Request metadata updates from peers #1700

Merged
merged 6 commits on Jul 25, 2024
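
This PR adds a periodic update check to the MetadataManager run loop: on a configurable metadata_update_interval tick, the manager looks for metadata versions it has observed from peers but does not yet hold, first requests the newer value directly from the peer that advertised it, and falls back to syncing from the metadata store on the next tick. The sketch below only illustrates the select-loop-plus-interval pattern with plain tokio; `Command`, `check_for_observed_updates`, and the driver in `main` are simplified stand-ins, not the restate-core types.

```rust
use std::time::Duration;
use tokio::{sync::mpsc, time::MissedTickBehavior};

#[derive(Debug)]
enum Command {
    Shutdown,
}

// Hypothetical stand-in for the real check, which asks peers for newer
// metadata or falls back to syncing from the metadata store.
async fn check_for_observed_updates() -> Result<(), std::io::Error> {
    Ok(())
}

async fn run(mut inbound: mpsc::UnboundedReceiver<Command>, update_interval: Duration) {
    let mut ticker = tokio::time::interval(update_interval);
    // If ticks were missed (e.g. under load), fire once after the delay
    // instead of bursting to catch up.
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            biased;
            Some(Command::Shutdown) = inbound.recv() => break,
            _ = ticker.tick() => {
                if let Err(err) = check_for_observed_updates().await {
                    eprintln!("failed checking for metadata updates: {err}");
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // Assumes tokio with the "rt-multi-thread", "macros", "sync" and "time" features.
    let (tx, rx) = mpsc::unbounded_channel();
    let worker = tokio::spawn(run(rx, Duration::from_millis(100)));
    tokio::time::sleep(Duration::from_millis(350)).await;
    tx.send(Command::Shutdown).expect("worker still running");
    worker.await.expect("worker task panicked");
}
```
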
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 2 additions & 7 deletions crates/admin/src/cluster_controller/service.rs
@@ -318,10 +318,7 @@ where
from: GenerationalNodeId,
request: AttachRequest,
) -> Result<(), ShutdownError> {
let partition_table = self
.metadata
.partition_table()
.expect("partition table is loaded before run");
let partition_table = self.metadata.partition_table_ref();
let networking = self.networking.clone();
let response = self.create_attachment_response(&partition_table, from, request.request_id);
self.task_center.spawn(
@@ -379,9 +376,7 @@ async fn signal_all_partitions_started(
)
.await?;
} else {
let partition_table = metadata
.partition_table()
.expect("valid partition table must be present");
let partition_table = metadata.partition_table_ref();

let mut pending_partitions_wo_leader = partition_table.num_partitions();

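
The cluster controller change above relies on the partition table now always being present in Metadata (backed by an ArcSwap with a default value instead of an ArcSwapOption, as the manager.rs diff below shows), so callers use partition_table_ref() rather than unwrapping an Option with expect(). A minimal sketch of that pattern, assuming only the arc_swap crate and hypothetical PartitionTable/Metadata stand-ins rather than the restate types:

```rust
use arc_swap::ArcSwap;
use std::sync::Arc;

#[derive(Default, Debug)]
struct PartitionTable {
    version: u32,
    num_partitions: u64,
}

struct Metadata {
    // The ArcSwap always holds a value (a default until the real table is
    // loaded), so readers never need `.expect("… is loaded")`.
    partition_table: ArcSwap<PartitionTable>,
}

impl Metadata {
    fn new() -> Self {
        Self {
            partition_table: ArcSwap::from_pointee(PartitionTable::default()),
        }
    }

    // Cheap read: returns a guard that derefs to the current table.
    fn partition_table_ref(&self) -> arc_swap::Guard<Arc<PartitionTable>> {
        self.partition_table.load()
    }

    fn update_partition_table(&self, new: PartitionTable) {
        self.partition_table.store(Arc::new(new));
    }
}

fn main() {
    let metadata = Metadata::new();
    println!("{:?}", *metadata.partition_table_ref());
    metadata.update_partition_table(PartitionTable { version: 1, num_partitions: 64 });
    println!("{:?}", *metadata.partition_table_ref());
}
```
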
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -59,6 +59,7 @@ restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }

googletest = { workspace = true }
test-log = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }
190 changes: 142 additions & 48 deletions crates/core/src/metadata/manager.rs
@@ -8,41 +8,51 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use arc_swap::{ArcSwap, ArcSwapOption};
use arc_swap::ArcSwap;
use enum_map::EnumMap;
use std::ops::Deref;
use std::sync::Arc;

use strum::IntoEnumIterator;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::MissedTickBehavior;
use tracing::{debug, info, trace, warn};

use crate::cancellation_watcher;
use crate::is_cancellation_requested;
use crate::metadata_store::{MetadataStoreClient, ReadError};
use crate::network::{MessageHandler, MessageRouterBuilder, NetworkError, NetworkSender};
use crate::task_center;
use restate_types::config::Configuration;
use restate_types::logs::metadata::Logs;
use restate_types::metadata_store::keys::{
BIFROST_CONFIG_KEY, NODES_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEMA_INFORMATION_KEY,
};
use restate_types::net::metadata::{MetadataMessage, MetadataUpdate};
use restate_types::net::metadata::{GetMetadataRequest, MetadataMessage, MetadataUpdate};
use restate_types::net::MessageEnvelope;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::partition_table::FixedPartitionTable;
use restate_types::schema::Schema;
use restate_types::GenerationalNodeId;
use restate_types::{GenerationalNodeId, NodeId};
use restate_types::{Version, Versioned};

use crate::cancellation_watcher;
use crate::is_cancellation_requested;
use crate::metadata_store::{MetadataStoreClient, ReadError};
use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender};
use crate::task_center;

use super::MetadataBuilder;
use super::{Metadata, MetadataContainer, MetadataKind, MetadataWriter};
use super::{MetadataBuilder, VersionInformation};

pub(super) type CommandSender = mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;

#[derive(Debug, thiserror::Error)]
pub enum SyncError {}

#[derive(Debug, thiserror::Error)]
enum UpdateError {
#[error("failed reading metadata from the metadata store: {0}")]
MetadataStore(#[from] ReadError),
#[error(transparent)]
Network(#[from] NetworkError),
}

#[derive(Debug)]
pub enum TargetVersion {
Latest,
@@ -69,7 +79,7 @@ pub(super) enum Command {
SyncMetadata(
MetadataKind,
TargetVersion,
oneshot::Sender<Result<(), ReadError>>,
Option<oneshot::Sender<Result<(), ReadError>>>,
),
}

@@ -98,9 +108,7 @@ where
MetadataKind::NodesConfiguration => self.send_nodes_config(peer, min_version),
MetadataKind::PartitionTable => self.send_partition_table(peer, min_version),
MetadataKind::Logs => self.send_logs(peer, min_version),
_ => {
todo!("Can't send metadata '{}' to peer", metadata_kind)
}
MetadataKind::Schema => self.send_schema(peer, min_version),
};
}

@@ -110,9 +118,8 @@
}

fn send_partition_table(&self, to: GenerationalNodeId, version: Option<Version>) {
if let Some(partition_table) = self.metadata.partition_table() {
self.send_metadata_internal(to, version, partition_table.deref(), "partition_table");
}
let partition_table = self.metadata.partition_table_snapshot();
self.send_metadata_internal(to, version, partition_table.deref(), "partition_table");
}

fn send_logs(&self, to: GenerationalNodeId, version: Option<Version>) {
@@ -122,6 +129,13 @@
}
}

fn send_schema(&self, to: GenerationalNodeId, version: Option<Version>) {
let schema = self.metadata.schema();
if schema.version != Version::INVALID {
self.send_metadata_internal(to, version, schema.deref(), "schema");
}
}

fn send_metadata_internal<T>(
&self,
to: GenerationalNodeId,
@@ -228,6 +242,7 @@ pub struct MetadataManager<N> {
inbound: CommandReceiver,
networking: N,
metadata_store_client: MetadataStoreClient,
update_tasks: EnumMap<MetadataKind, Option<UpdateTask>>,
}

impl<N> MetadataManager<N>
@@ -244,6 +259,7 @@ where
inbound: metadata_builder.receiver,
networking,
metadata_store_client,
update_tasks: EnumMap::default(),
}
}

@@ -267,6 +283,13 @@
pub async fn run(mut self) -> anyhow::Result<()> {
debug!("Metadata manager started");

let update_interval = Configuration::pinned()
.common
.metadata_update_interval
.into();
let mut update_interval = tokio::time::interval(update_interval);
update_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
tokio::select! {
biased;
@@ -277,6 +300,11 @@ where
Some(cmd) = self.inbound.recv() => {
self.handle_command(cmd).await;
}
_ = update_interval.tick() => {
if let Err(err) = self.check_for_observed_updates().await {
warn!("Failed checking for metadata updates: {err}");
}
}
}
}
Ok(())
@@ -287,8 +315,10 @@ where
Command::UpdateMetadata(value, callback) => self.update_metadata(value, callback),
Command::SyncMetadata(kind, target_version, callback) => {
let result = self.sync_metadata(kind, target_version).await;
if callback.send(result).is_err() {
trace!("Couldn't send synce metadata reply back. System is probably shutting down.");
if let Some(callback) = callback {
if callback.send(result).is_err() {
trace!("Couldn't send sync metadata reply back. System is probably shutting down.");
}
}
}
}
@@ -389,26 +419,26 @@ where
fn update_nodes_configuration(&mut self, config: NodesConfiguration) {
let maybe_new_version = Self::update_internal(&self.metadata.inner.nodes_config, config);

self.notify_watches(maybe_new_version, MetadataKind::NodesConfiguration);
self.update_task_and_notify_watches(maybe_new_version, MetadataKind::NodesConfiguration);
}

fn update_partition_table(&mut self, partition_table: FixedPartitionTable) {
let maybe_new_version =
Self::update_option_internal(&self.metadata.inner.partition_table, partition_table);
Self::update_internal(&self.metadata.inner.partition_table, partition_table);

self.notify_watches(maybe_new_version, MetadataKind::PartitionTable);
self.update_task_and_notify_watches(maybe_new_version, MetadataKind::PartitionTable);
}

fn update_logs(&mut self, logs: Logs) {
let maybe_new_version = Self::update_internal(&self.metadata.inner.logs, logs);

self.notify_watches(maybe_new_version, MetadataKind::Logs);
self.update_task_and_notify_watches(maybe_new_version, MetadataKind::Logs);
}

fn update_schema(&mut self, schema: Schema) {
let maybe_new_version = Self::update_internal(&self.metadata.inner.schema, schema);

self.notify_watches(maybe_new_version, MetadataKind::Schema);
self.update_task_and_notify_watches(maybe_new_version, MetadataKind::Schema);
}

fn update_internal<T: Versioned>(container: &ArcSwap<T>, new_value: T) -> Version {
Expand All @@ -430,31 +460,15 @@ where
maybe_new_version
}

fn update_option_internal<T: Versioned>(container: &ArcSwapOption<T>, new_value: T) -> Version {
let current_value = container.load();
let mut maybe_new_version = new_value.version();
match current_value.as_deref() {
None => {
container.store(Some(Arc::new(new_value)));
}
Some(current_value) if new_value.version() > current_value.version() => {
container.store(Some(Arc::new(new_value)));
}
Some(current_value) => {
/* Do nothing, current is already newer */
debug!(
"Ignoring update {} because we are at {}",
new_value.version(),
current_value.version(),
);
maybe_new_version = current_value.version();
}
fn update_task_and_notify_watches(&mut self, maybe_new_version: Version, kind: MetadataKind) {
// update tasks if they are no longer needed
if self.update_tasks[kind]
.as_ref()
.is_some_and(|task| maybe_new_version >= task.version)
{
self.update_tasks[kind] = None;
}

maybe_new_version
}

fn notify_watches(&mut self, maybe_new_version: Version, kind: MetadataKind) {
// notify watches.
self.metadata.inner.write_watches[kind]
.sender
@@ -467,6 +481,86 @@ where
}
});
}

async fn check_for_observed_updates(&mut self) -> Result<(), UpdateError> {
for kind in MetadataKind::iter() {
if let Some(version_information) = self.metadata.observed_version(kind) {
if version_information.version
> self.update_tasks[kind]
.as_ref()
.map(|task| task.version)
.unwrap_or(Version::INVALID)
{
self.update_tasks[kind] = Some(UpdateTask::from(version_information));
}
}
}

for metadata_kind in MetadataKind::iter() {
let mut update_task = self.update_tasks[metadata_kind].take();

if let Some(mut task) = update_task {
match task.state {
UpdateTaskState::FromRemoteNode(node_id) => {
debug!(
"Send GetMetadataRequest to {} from {}",
node_id,
self.metadata.my_node_id()
);
// todo: Move to dedicated task if this is blocking the MetadataManager too much
self.networking
.send(
node_id,
&MetadataMessage::GetMetadataRequest(GetMetadataRequest {
metadata_kind,
min_version: Some(task.version),
}),
)
.await?;
// on the next tick try to sync if no update was received
task.state = UpdateTaskState::Sync;
update_task = Some(task);
}
UpdateTaskState::Sync => {
// todo: Move to dedicated task if this is blocking the MetadataManager too much
self.sync_metadata(metadata_kind, TargetVersion::Version(task.version))
.await?;
// syncing will give us >= task.version so let's stop here
update_task = None;
}
}

self.update_tasks[metadata_kind] = update_task;
}
}

Ok(())
}
}

enum UpdateTaskState {
FromRemoteNode(NodeId),
Sync,
}

struct UpdateTask {
version: Version,
state: UpdateTaskState,
}

impl UpdateTask {
fn from(version_information: VersionInformation) -> Self {
let state = if let Some(node_id) = version_information.remote_node {
UpdateTaskState::FromRemoteNode(node_id)
} else {
UpdateTaskState::Sync
};

Self {
version: version_information.version,
state,
}
}
}

#[cfg(test)]
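
The heart of the new behavior in manager.rs is the two-step UpdateTask: when a newer version is observed for a metadata kind, the first tick sends a GetMetadataRequest to the node that reported it, and if the task is still pending on the following tick, the manager falls back to sync_metadata against the metadata store, which is guaranteed to reach at least the requested version. The sketch below replays that state machine with simplified stand-in Version and NodeId types; it is not the restate-core implementation.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Version(u32);

#[derive(Clone, Copy, Debug)]
struct NodeId(u64);

enum UpdateTaskState {
    // First attempt: request the metadata directly from the peer that advertised it.
    FromRemoteNode(NodeId),
    // Second attempt: sync from the metadata store.
    Sync,
}

struct UpdateTask {
    version: Version,
    state: UpdateTaskState,
}

// Runs one tick of an update task and returns the task to keep for the next
// tick, or None once it is finished.
fn tick(task: UpdateTask) -> Option<UpdateTask> {
    match task.state {
        UpdateTaskState::FromRemoteNode(node) => {
            println!("send GetMetadataRequest(min_version = {:?}) to {:?}", task.version, node);
            // If no update arrives in the meantime, the next tick falls back
            // to a metadata-store sync.
            Some(UpdateTask {
                version: task.version,
                state: UpdateTaskState::Sync,
            })
        }
        UpdateTaskState::Sync => {
            println!("sync metadata from the store up to at least {:?}", task.version);
            // Syncing yields a version >= task.version, so the task is done.
            None
        }
    }
}

fn main() {
    let mut task = Some(UpdateTask {
        version: Version(7),
        state: UpdateTaskState::FromRemoteNode(NodeId(3)),
    });
    while let Some(t) = task {
        task = tick(t);
    }
}
```

Asking the reporting peer first presumably keeps routine catch-ups off the metadata store, leaving the store sync as the fallback path only.
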