diff --git a/Cargo.lock b/Cargo.lock index f17ceb2cf..e69c234c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5627,6 +5627,7 @@ dependencies = [ "static_assertions", "strum 0.26.2", "strum_macros 0.26.2", + "test-log", "thiserror", "tokio", "tokio-stream", diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 4ec6347da..6b14c6af9 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -318,10 +318,7 @@ where from: GenerationalNodeId, request: AttachRequest, ) -> Result<(), ShutdownError> { - let partition_table = self - .metadata - .partition_table() - .expect("partition table is loaded before run"); + let partition_table = self.metadata.partition_table_ref(); let networking = self.networking.clone(); let response = self.create_attachment_response(&partition_table, from, request.request_id); self.task_center.spawn( @@ -379,9 +376,7 @@ async fn signal_all_partitions_started( ) .await?; } else { - let partition_table = metadata - .partition_table() - .expect("valid partition table must be present"); + let partition_table = metadata.partition_table_ref(); let mut pending_partitions_wo_leader = partition_table.num_partitions(); diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index b0cdd1216..172819e50 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -59,6 +59,7 @@ restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } googletest = { workspace = true } +test-log = { workspace = true } tokio = { workspace = true, features = ["test-util"] } tracing-subscriber = { workspace = true } tracing-test = { workspace = true } diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index c3d0998ea..07f35dde3 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -8,34 +8,36 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use arc_swap::{ArcSwap, ArcSwapOption}; +use arc_swap::ArcSwap; +use enum_map::EnumMap; use std::ops::Deref; use std::sync::Arc; - +use strum::IntoEnumIterator; use tokio::sync::mpsc; use tokio::sync::oneshot; +use tokio::time::MissedTickBehavior; use tracing::{debug, info, trace, warn}; +use crate::cancellation_watcher; +use crate::is_cancellation_requested; +use crate::metadata_store::{MetadataStoreClient, ReadError}; +use crate::network::{MessageHandler, MessageRouterBuilder, NetworkError, NetworkSender}; +use crate::task_center; +use restate_types::config::Configuration; use restate_types::logs::metadata::Logs; use restate_types::metadata_store::keys::{ BIFROST_CONFIG_KEY, NODES_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEMA_INFORMATION_KEY, }; -use restate_types::net::metadata::{MetadataMessage, MetadataUpdate}; +use restate_types::net::metadata::{GetMetadataRequest, MetadataMessage, MetadataUpdate}; use restate_types::net::MessageEnvelope; use restate_types::nodes_config::NodesConfiguration; use restate_types::partition_table::FixedPartitionTable; use restate_types::schema::Schema; -use restate_types::GenerationalNodeId; +use restate_types::{GenerationalNodeId, NodeId}; use restate_types::{Version, Versioned}; -use crate::cancellation_watcher; -use crate::is_cancellation_requested; -use crate::metadata_store::{MetadataStoreClient, ReadError}; -use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender}; -use crate::task_center; - -use super::MetadataBuilder; use super::{Metadata, MetadataContainer, MetadataKind, MetadataWriter}; +use super::{MetadataBuilder, VersionInformation}; pub(super) type CommandSender = mpsc::UnboundedSender; pub(super) type CommandReceiver = mpsc::UnboundedReceiver; @@ -43,6 +45,14 @@ pub(super) type CommandReceiver = mpsc::UnboundedReceiver; #[derive(Debug, thiserror::Error)] pub enum SyncError {} +#[derive(Debug, thiserror::Error)] +enum UpdateError { + #[error("failed reading metadata from the metadata store: {0}")] + MetadataStore(#[from] ReadError), + #[error(transparent)] + Network(#[from] NetworkError), +} + #[derive(Debug)] pub enum TargetVersion { Latest, @@ -69,7 +79,7 @@ pub(super) enum Command { SyncMetadata( MetadataKind, TargetVersion, - oneshot::Sender>, + Option>>, ), } @@ -98,9 +108,7 @@ where MetadataKind::NodesConfiguration => self.send_nodes_config(peer, min_version), MetadataKind::PartitionTable => self.send_partition_table(peer, min_version), MetadataKind::Logs => self.send_logs(peer, min_version), - _ => { - todo!("Can't send metadata '{}' to peer", metadata_kind) - } + MetadataKind::Schema => self.send_schema(peer, min_version), }; } @@ -110,9 +118,8 @@ where } fn send_partition_table(&self, to: GenerationalNodeId, version: Option) { - if let Some(partition_table) = self.metadata.partition_table() { - self.send_metadata_internal(to, version, partition_table.deref(), "partition_table"); - } + let partition_table = self.metadata.partition_table_snapshot(); + self.send_metadata_internal(to, version, partition_table.deref(), "partition_table"); } fn send_logs(&self, to: GenerationalNodeId, version: Option) { @@ -122,6 +129,13 @@ where } } + fn send_schema(&self, to: GenerationalNodeId, version: Option) { + let schema = self.metadata.schema(); + if schema.version != Version::INVALID { + self.send_metadata_internal(to, version, schema.deref(), "schema"); + } + } + fn send_metadata_internal( &self, to: GenerationalNodeId, @@ -228,6 +242,7 @@ pub struct MetadataManager { inbound: CommandReceiver, networking: N, metadata_store_client: MetadataStoreClient, + update_tasks: EnumMap>, } impl MetadataManager @@ -244,6 +259,7 @@ where inbound: metadata_builder.receiver, networking, metadata_store_client, + update_tasks: EnumMap::default(), } } @@ -267,6 +283,13 @@ where pub async fn run(mut self) -> anyhow::Result<()> { debug!("Metadata manager started"); + let update_interval = Configuration::pinned() + .common + .metadata_update_interval + .into(); + let mut update_interval = tokio::time::interval(update_interval); + update_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { tokio::select! { biased; @@ -277,6 +300,11 @@ where Some(cmd) = self.inbound.recv() => { self.handle_command(cmd).await; } + _ = update_interval.tick() => { + if let Err(err) = self.check_for_observed_updates().await { + warn!("Failed checking for metadata updates: {err}"); + } + } } } Ok(()) @@ -287,8 +315,10 @@ where Command::UpdateMetadata(value, callback) => self.update_metadata(value, callback), Command::SyncMetadata(kind, target_version, callback) => { let result = self.sync_metadata(kind, target_version).await; - if callback.send(result).is_err() { - trace!("Couldn't send synce metadata reply back. System is probably shutting down."); + if let Some(callback) = callback { + if callback.send(result).is_err() { + trace!("Couldn't send sync metadata reply back. System is probably shutting down."); + } } } } @@ -389,26 +419,26 @@ where fn update_nodes_configuration(&mut self, config: NodesConfiguration) { let maybe_new_version = Self::update_internal(&self.metadata.inner.nodes_config, config); - self.notify_watches(maybe_new_version, MetadataKind::NodesConfiguration); + self.update_task_and_notify_watches(maybe_new_version, MetadataKind::NodesConfiguration); } fn update_partition_table(&mut self, partition_table: FixedPartitionTable) { let maybe_new_version = - Self::update_option_internal(&self.metadata.inner.partition_table, partition_table); + Self::update_internal(&self.metadata.inner.partition_table, partition_table); - self.notify_watches(maybe_new_version, MetadataKind::PartitionTable); + self.update_task_and_notify_watches(maybe_new_version, MetadataKind::PartitionTable); } fn update_logs(&mut self, logs: Logs) { let maybe_new_version = Self::update_internal(&self.metadata.inner.logs, logs); - self.notify_watches(maybe_new_version, MetadataKind::Logs); + self.update_task_and_notify_watches(maybe_new_version, MetadataKind::Logs); } fn update_schema(&mut self, schema: Schema) { let maybe_new_version = Self::update_internal(&self.metadata.inner.schema, schema); - self.notify_watches(maybe_new_version, MetadataKind::Schema); + self.update_task_and_notify_watches(maybe_new_version, MetadataKind::Schema); } fn update_internal(container: &ArcSwap, new_value: T) -> Version { @@ -430,31 +460,15 @@ where maybe_new_version } - fn update_option_internal(container: &ArcSwapOption, new_value: T) -> Version { - let current_value = container.load(); - let mut maybe_new_version = new_value.version(); - match current_value.as_deref() { - None => { - container.store(Some(Arc::new(new_value))); - } - Some(current_value) if new_value.version() > current_value.version() => { - container.store(Some(Arc::new(new_value))); - } - Some(current_value) => { - /* Do nothing, current is already newer */ - debug!( - "Ignoring update {} because we are at {}", - new_value.version(), - current_value.version(), - ); - maybe_new_version = current_value.version(); - } + fn update_task_and_notify_watches(&mut self, maybe_new_version: Version, kind: MetadataKind) { + // update tasks if they are no longer needed + if self.update_tasks[kind] + .as_ref() + .is_some_and(|task| maybe_new_version >= task.version) + { + self.update_tasks[kind] = None; } - maybe_new_version - } - - fn notify_watches(&mut self, maybe_new_version: Version, kind: MetadataKind) { // notify watches. self.metadata.inner.write_watches[kind] .sender @@ -467,6 +481,86 @@ where } }); } + + async fn check_for_observed_updates(&mut self) -> Result<(), UpdateError> { + for kind in MetadataKind::iter() { + if let Some(version_information) = self.metadata.observed_version(kind) { + if version_information.version + > self.update_tasks[kind] + .as_ref() + .map(|task| task.version) + .unwrap_or(Version::INVALID) + { + self.update_tasks[kind] = Some(UpdateTask::from(version_information)); + } + } + } + + for metadata_kind in MetadataKind::iter() { + let mut update_task = self.update_tasks[metadata_kind].take(); + + if let Some(mut task) = update_task { + match task.state { + UpdateTaskState::FromRemoteNode(node_id) => { + debug!( + "Send GetMetadataRequest to {} from {}", + node_id, + self.metadata.my_node_id() + ); + // todo: Move to dedicated task if this is blocking the MetadataManager too much + self.networking + .send( + node_id, + &MetadataMessage::GetMetadataRequest(GetMetadataRequest { + metadata_kind, + min_version: Some(task.version), + }), + ) + .await?; + // on the next tick try to sync if no update was received + task.state = UpdateTaskState::Sync; + update_task = Some(task); + } + UpdateTaskState::Sync => { + // todo: Move to dedicated task if this is blocking the MetadataManager too much + self.sync_metadata(metadata_kind, TargetVersion::Version(task.version)) + .await?; + // syncing will give us >= task.version so let's stop here + update_task = None; + } + } + + self.update_tasks[metadata_kind] = update_task; + } + } + + Ok(()) + } +} + +enum UpdateTaskState { + FromRemoteNode(NodeId), + Sync, +} + +struct UpdateTask { + version: Version, + state: UpdateTaskState, +} + +impl UpdateTask { + fn from(version_information: VersionInformation) -> Self { + let state = if let Some(node_id) = version_information.remote_node { + UpdateTaskState::FromRemoteNode(node_id) + } else { + UpdateTaskState::Sync + }; + + Self { + version: version_information.version, + state, + } + } } #[cfg(test)] diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs index 7b3666977..23f74f8bf 100644 --- a/crates/core/src/metadata/mod.rs +++ b/crates/core/src/metadata/mod.rs @@ -12,13 +12,14 @@ #![allow(dead_code)] mod manager; + pub use manager::{MetadataManager, TargetVersion}; use restate_types::live::{Live, Pinned}; use restate_types::schema::Schema; use std::sync::{Arc, OnceLock}; -use arc_swap::{ArcSwap, ArcSwapOption}; +use arc_swap::{ArcSwap, AsRaw}; use enum_map::EnumMap; use tokio::sync::{mpsc, oneshot, watch}; @@ -27,7 +28,7 @@ use restate_types::net::metadata::MetadataContainer; pub use restate_types::net::metadata::MetadataKind; use restate_types::nodes_config::NodesConfiguration; use restate_types::partition_table::FixedPartitionTable; -use restate_types::{GenerationalNodeId, Version, Versioned}; +use restate_types::{GenerationalNodeId, NodeId, Version, Versioned}; use crate::metadata::manager::Command; use crate::metadata_store::ReadError; @@ -98,16 +99,30 @@ impl Metadata { self.inner.nodes_config.load().version() } - pub fn partition_table(&self) -> Option> { + pub fn partition_table_snapshot(&self) -> Arc { self.inner.partition_table.load_full() } + #[inline(always)] + pub fn partition_table_ref(&self) -> Pinned { + Pinned::new(&self.inner.partition_table) + } + + pub fn updateable_partition_table(&self) -> Live { + Live::from(self.inner.partition_table.clone()) + } + /// Returns Version::INVALID if partition table has not been loaded yet. pub fn partition_table_version(&self) -> Version { - let c = self.inner.partition_table.load(); - match c.as_deref() { - Some(c) => c.version(), - None => Version::INVALID, + self.inner.partition_table.load().version() + } + + pub fn version(&self, metadata_kind: MetadataKind) -> Version { + match metadata_kind { + MetadataKind::NodesConfiguration => self.nodes_config_version(), + MetadataKind::Schema => self.schema_version(), + MetadataKind::PartitionTable => self.partition_table_version(), + MetadataKind::Logs => self.logs_version(), } } @@ -116,15 +131,14 @@ impl Metadata { &self, min_version: Version, ) -> Result, ShutdownError> { - if let Some(partition_table) = self.partition_table() { - if partition_table.version() >= min_version { - return Ok(partition_table); - } + let partition_table = self.partition_table_ref(); + if partition_table.version() >= min_version { + return Ok(partition_table.into_arc()); } self.wait_for_version(MetadataKind::PartitionTable, min_version) .await?; - Ok(self.partition_table().unwrap()) + Ok(self.partition_table_snapshot()) } pub fn logs(&self) -> Pinned { @@ -136,8 +150,8 @@ impl Metadata { self.inner.logs.load().version() } - pub fn schema(&self) -> Arc { - self.inner.schema.load_full() + pub fn schema(&self) -> Pinned { + Pinned::new(&self.inner.schema) } pub fn schema_version(&self) -> Version { @@ -190,23 +204,90 @@ impl Metadata { .send(Command::SyncMetadata( metadata_kind, target_version, - result_tx, + Some(result_tx), )) .map_err(|_| ShutdownError)?; result_rx.await.map_err(|_| ShutdownError)??; Ok(()) } + + /// Notifies the metadata manager about a newly observed metadata version for the given kind. + /// If the metadata can be retrieved from a node, then the [`NodeId`] can be included as well. + pub fn notify_observed_version( + &self, + metadata_kind: MetadataKind, + version: Version, + remote_location: Option, + urgency: Urgency, + ) { + // check whether the version is newer than what we know + if version > self.version(metadata_kind) { + match urgency { + Urgency::High => { + // send should only fail in case of shut down + let _ = self.sender.send(Command::SyncMetadata( + metadata_kind, + TargetVersion::Version(version), + None, + )); + } + Urgency::Normal => { + let mut guard = self.inner.observed_versions[metadata_kind].load(); + + // check whether it is even newer than the latest observed version + if version > guard.version { + // Create the arc outside of loop to avoid reallocations in case of contention; + // maybe this is guarding too much against the contended case. + let new_version_information = + Arc::new(VersionInformation::new(version, remote_location)); + + // maybe a simple Arc> works better? Needs a benchmark. + loop { + let cas_guard = self.inner.observed_versions[metadata_kind] + .compare_and_swap(&guard, Arc::clone(&new_version_information)); + + if std::ptr::eq(cas_guard.as_raw(), guard.as_raw()) { + break; + } + + guard = cas_guard; + + // stop trying to update the observed value if a newer one was reported before + if guard.version >= version { + break; + } + } + } + } + } + } + } + + /// Returns the [`VersionInformation`] for the metadata kind if a newer version than the local + /// version has been observed. + fn observed_version(&self, metadata_kind: MetadataKind) -> Option { + let guard = self.inner.observed_versions[metadata_kind].load(); + + if guard.version > self.version(metadata_kind) { + Some((**guard).clone()) + } else { + None + } + } } #[derive(Default)] struct MetadataInner { my_node_id: OnceLock, nodes_config: Arc>, - partition_table: ArcSwapOption, + partition_table: Arc>, logs: Arc>, schema: Arc>, write_watches: EnumMap, + // might be subject to false sharing if independent sources want to update different metadata + // kinds concurrently. + observed_versions: EnumMap>, } /// Can send updates to metadata manager. This should be accessible by the rpc handler layer to @@ -285,3 +366,36 @@ where metadata_manager.run(), ) } + +#[derive(Debug, Clone)] +struct VersionInformation { + version: Version, + remote_node: Option, +} + +impl Default for VersionInformation { + fn default() -> Self { + Self { + version: Version::INVALID, + remote_node: None, + } + } +} + +impl VersionInformation { + fn new(version: Version, remote_location: Option) -> Self { + Self { + version, + remote_node: remote_location, + } + } +} + +/// Defines how urgent it is to react to observed metadata versions. +#[derive(Debug)] +pub enum Urgency { + /// Immediately sync data from the metadata store + High, + /// Try to fetch metadata from a remote node if available on the next update interval + Normal, +} diff --git a/crates/core/src/network/connection.rs b/crates/core/src/network/connection.rs index fdaeff6ef..fbc719ed9 100644 --- a/crates/core/src/network/connection.rs +++ b/crates/core/src/network/connection.rs @@ -8,27 +8,35 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use enum_map::EnumMap; +use std::ops::Index; use std::sync::Arc; use std::sync::Weak; use std::time::Instant; - use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tracing::instrument; +use super::metric_definitions::CONNECTION_SEND_DURATION; +use super::metric_definitions::MESSAGE_SENT; +use super::NetworkError; +use super::ProtocolError; +use crate::network::connection_manager::MetadataVersions; +use crate::Metadata; use restate_types::live::Live; +use restate_types::logs::metadata::Logs; use restate_types::net::codec::Targeted; use restate_types::net::codec::{serialize_message, WireEncode}; +use restate_types::net::metadata::MetadataKind; use restate_types::net::ProtocolVersion; use restate_types::nodes_config::NodesConfiguration; +use restate_types::partition_table::FixedPartitionTable; use restate_types::protobuf::node::message; +use restate_types::protobuf::node::message::Body; use restate_types::protobuf::node::Header; use restate_types::protobuf::node::Message; -use restate_types::GenerationalNodeId; - -use super::metric_definitions::CONNECTION_SEND_DURATION; -use super::metric_definitions::MESSAGE_SENT; -use super::NetworkError; -use super::ProtocolError; +use restate_types::schema::Schema; +use restate_types::{GenerationalNodeId, Version, Versioned}; /// A single streaming connection with a channel to the peer. A connection can be /// opened by either ends of the connection and has no direction. Any connection @@ -43,8 +51,7 @@ pub(crate) struct Connection { pub(crate) peer: GenerationalNodeId, pub(crate) protocol_version: ProtocolVersion, pub(crate) sender: mpsc::Sender, - pub(crate) created: std::time::Instant, - updateable_nodes_config: Live, + pub(crate) created: Instant, } impl Connection { @@ -52,15 +59,13 @@ impl Connection { peer: GenerationalNodeId, protocol_version: ProtocolVersion, sender: mpsc::Sender, - updateable_nodes_config: Live, ) -> Self { Self { cid: rand::random(), peer, protocol_version, sender, - created: std::time::Instant::now(), - updateable_nodes_config, + created: Instant::now(), } } @@ -80,33 +85,17 @@ impl Connection { /// A handle that sends messages through that connection. This hides the /// wire protocol from the user and guarantees order of messages. - pub fn sender(self: &Arc) -> ConnectionSender { + pub fn sender(self: &Arc, metadata: &Metadata) -> ConnectionSender { ConnectionSender { - peer: self.peer, connection: Arc::downgrade(self), - protocol_version: self.protocol_version, - nodes_config: self.updateable_nodes_config.clone(), + nodes_config: metadata.updateable_nodes_config(), + schema: metadata.updateable_schema(), + logs: metadata.updateable_logs_metadata(), + partition_table: metadata.updateable_partition_table(), + metadata_versions: MetadataVersions::default(), } } -} - -impl PartialEq for Connection { - fn eq(&self, other: &Self) -> bool { - self.cid == other.cid && self.peer == other.peer - } -} - -/// A handle to send messages through a connection. It's safe and cheap to hold -/// and clone objects of this even if the connection has been dropped. -#[derive(Clone)] -pub struct ConnectionSender { - peer: GenerationalNodeId, - connection: Weak, - protocol_version: ProtocolVersion, - nodes_config: Live, -} -impl ConnectionSender { /// Send a message on this connection. This returns Ok(()) when the message is: /// - Successfully serialized to the wire format based on the negotiated protocol /// - Serialized message was enqueued on the send buffer of the socket @@ -122,26 +111,149 @@ impl ConnectionSender { /// This doesn't auto-retry connection resets or send errors, this is up to the user /// for retrying externally. #[instrument(skip_all, fields(peer_node_id = %self.peer, target_service = ?message.target(), msg = ?message.kind()))] - pub async fn send(&mut self, message: M) -> Result<(), NetworkError> + pub async fn send( + &self, + message: M, + metadata_versions: HeaderMetadataVersions, + ) -> Result<(), NetworkError> where M: WireEncode + Targeted, { let send_start = Instant::now(); - let header = Header::new(self.nodes_config.live_load().version()); - let body = - serialize_message(message, self.protocol_version).map_err(ProtocolError::Codec)?; + let (header, body) = self.create_message(message, metadata_versions)?; let res = self - .connection - .upgrade() - .ok_or(NetworkError::ConnectionClosed)? .sender .send(Message::new(header, body)) .await .map_err(|_| NetworkError::ConnectionClosed); - MESSAGE_SENT.increment(1); - CONNECTION_SEND_DURATION.record(send_start.elapsed()); + + if res.is_ok() { + MESSAGE_SENT.increment(1); + CONNECTION_SEND_DURATION.record(send_start.elapsed()); + } + + res + } + + fn create_message( + &self, + message: M, + metadata_versions: HeaderMetadataVersions, + ) -> Result<(Header, Body), NetworkError> + where + M: WireEncode + Targeted, + { + let header = Header::new( + metadata_versions[MetadataKind::NodesConfiguration] + .expect("nodes configuration version must be set"), + metadata_versions[MetadataKind::Logs], + metadata_versions[MetadataKind::Schema], + metadata_versions[MetadataKind::PartitionTable], + ); + let body = + serialize_message(message, self.protocol_version).map_err(ProtocolError::Codec)?; + Ok((header, body)) + } + + /// Tries sending a message on this connection. If there is no capacity, it will fail. Apart + /// from this, the method behaves similarly to [`Connection::send`]. + #[instrument(skip_all, fields(peer_node_id = %self.peer, target_service = ?message.target(), msg = ?message.kind()))] + pub fn try_send( + &self, + message: M, + metadata_versions: HeaderMetadataVersions, + ) -> Result<(), NetworkError> + where + M: WireEncode + Targeted, + { + let send_start = Instant::now(); + let (header, body) = self.create_message(message, metadata_versions)?; + let res = self + .sender + .try_send(Message::new(header, body)) + .map_err(|err| match err { + TrySendError::Full(_) => NetworkError::Full, + TrySendError::Closed(_) => NetworkError::ConnectionClosed, + }); + + if res.is_ok() { + MESSAGE_SENT.increment(1); + CONNECTION_SEND_DURATION.record(send_start.elapsed()); + } + res } } +struct HeaderMetadataVersions { + versions: EnumMap>, +} + +impl Index for HeaderMetadataVersions { + type Output = Option; + + fn index(&self, index: MetadataKind) -> &Self::Output { + &self.versions[index] + } +} + +impl PartialEq for Connection { + fn eq(&self, other: &Self) -> bool { + self.cid == other.cid && self.peer == other.peer + } +} + +/// A handle to send messages through a connection. It's safe to hold and clone objects of this +/// even if the connection has been dropped. Cloning and holding comes at the cost of caching +/// all existing metadata which is not for free. +#[derive(Clone)] +pub struct ConnectionSender { + connection: Weak, + nodes_config: Live, + schema: Live, + logs: Live, + partition_table: Live, + metadata_versions: MetadataVersions, +} + +impl ConnectionSender { + /// See [`Connection::send`]. + pub async fn send(&mut self, message: M) -> Result<(), NetworkError> + where + M: WireEncode + Targeted, + { + self.connection + .upgrade() + .ok_or(NetworkError::ConnectionClosed)? + .send(message, self.header_metadata_versions()) + .await + } + + /// See [`Connection::try_send`]. + pub fn try_send(&mut self, message: M) -> Result<(), NetworkError> + where + M: WireEncode + Targeted, + { + self.connection + .upgrade() + .ok_or(NetworkError::ConnectionClosed)? + .try_send(message, self.header_metadata_versions()) + } + + fn header_metadata_versions(&mut self) -> HeaderMetadataVersions { + let mut version_updates = self.metadata_versions.update( + None, + Some(self.partition_table.live_load().version()), + Some(self.schema.live_load().version()), + Some(self.logs.live_load().version()), + ); + version_updates[MetadataKind::NodesConfiguration] = + Some(self.nodes_config.live_load().version()); + + HeaderMetadataVersions { + versions: version_updates, + } + } +} + static_assertions::assert_impl_all!(ConnectionSender: Send, Sync); diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs index 426b81919..ba2389195 100644 --- a/crates/core/src/network/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -1,4 +1,4 @@ -// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// Copyright (c) 2024-2024 - Restate Software, Inc., Restate GmbH. // All rights reserved. // // Use of this software is governed by the Business Source License @@ -8,26 +8,26 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::collections::{hash_map, HashMap}; -use std::sync::{Arc, Mutex, Weak}; -use std::time::Instant; - +use enum_map::EnumMap; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; use rand::seq::SliceRandom; use restate_types::net::codec::try_unwrap_binary_message; +use std::collections::{hash_map, HashMap}; +use std::ops::{Index, IndexMut}; +use std::sync::{Arc, Mutex, Weak}; +use std::time::Instant; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tracing::{debug, info, trace, warn, Instrument, Span}; -use restate_types::live::Pinned; use restate_types::net::metadata::MetadataKind; use restate_types::net::AdvertisedAddress; use restate_types::nodes_config::NodesConfiguration; use restate_types::protobuf::node::message::{self, ConnectionControl}; use restate_types::protobuf::node::{Header, Hello, Message, Welcome}; -use restate_types::{GenerationalNodeId, NodeId, PlainNodeId}; +use restate_types::{GenerationalNodeId, NodeId, PlainNodeId, Version}; use super::connection::{Connection, ConnectionSender}; use super::error::{NetworkError, ProtocolError}; @@ -38,9 +38,10 @@ use super::metric_definitions::{ }; use super::protobuf::node_svc::node_svc_client::NodeSvcClient; use super::{Handler, MessageRouter}; +use crate::metadata::Urgency; use crate::network::net_util::create_tonic_channel_from_advertised_address; +use crate::Metadata; use crate::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind}; -use crate::{Metadata, TargetVersion}; // todo: make this configurable const SEND_QUEUE_SIZE: usize = 1; @@ -79,11 +80,11 @@ impl Default for ConnectionManagerInner { fn default() -> Self { metric_definitions::describe_metrics(); Self { - router: Default::default(), - connections: Default::default(), - connection_by_gen_id: Default::default(), - observed_generations: Default::default(), - channel_cache: Default::default(), + router: MessageRouter::default(), + connections: HashMap::default(), + connection_by_gen_id: HashMap::default(), + observed_generations: HashMap::default(), + channel_cache: HashMap::default(), } } } @@ -95,11 +96,11 @@ pub struct ConnectionManager { } impl ConnectionManager { + /// Creates the connection manager. pub(super) fn new(metadata: Metadata) -> Self { - Self { - metadata, - inner: Arc::new(Mutex::new(ConnectionManagerInner::default())), - } + let inner = Arc::new(Mutex::new(ConnectionManagerInner::default())); + + Self { metadata, inner } } /// Updates the message router. Note that this only impacts new connections. /// In general, this should be called once on application start after @@ -176,26 +177,22 @@ impl ConnectionManager { selected_protocol_version ); - let nodes_config = self - .verify_node_id(peer_node_id, header, nodes_config) - .await?; + self.verify_node_id(peer_node_id, header, &nodes_config)?; let (tx, rx) = mpsc::channel(SEND_QUEUE_SIZE); // Enqueue the welcome message let welcome = Welcome::new(my_node_id, selected_protocol_version); - let welcome = Message::new(Header::new(nodes_config.version()), welcome); + let welcome = Message::new( + Header::new(nodes_config.version(), None, None, None), + welcome, + ); tx.try_send(welcome) .expect("channel accept Welcome message"); INCOMING_CONNECTION.increment(1); - let connection = Connection::new( - peer_node_id, - selected_protocol_version, - tx, - self.metadata.updateable_nodes_config(), - ); + let connection = Connection::new(peer_node_id, selected_protocol_version, tx); // Register the connection. let _ = self.start_connection_reactor(connection, incoming)?; // For uniformity with outbound connections, we map all responses to Ok, we never rely on @@ -206,42 +203,38 @@ impl ConnectionManager { Ok(Box::pin(transformed)) } - async fn verify_node_id( + fn verify_node_id( &self, peer_node_id: GenerationalNodeId, header: Header, - mut nodes_config: Pinned, - ) -> Result, NetworkError> { + nodes_config: &NodesConfiguration, + ) -> Result<(), NetworkError> { if let Err(e) = nodes_config.find_node_by_id(peer_node_id) { // If nodeId is unrecognized and peer is at higher nodes configuration version, // then we have to update our NodesConfiguration - let peer_is_in_the_future = header - .my_nodes_config_version - .as_ref() - .is_some_and(|v| v.value > nodes_config.version().into()); - - if peer_is_in_the_future { - // don't keep pinned nodes configuration beyond await point - drop(nodes_config); - // todo: Replace with notifying metadata manager about newer version - self.metadata - .sync( - MetadataKind::NodesConfiguration, - TargetVersion::from(header.my_nodes_config_version.map(Into::into)), - ) - .await?; - nodes_config = self.metadata.nodes_config_ref(); + if let Some(other_nodes_config_version) = header.my_nodes_config_version.map(Into::into) + { + let peer_is_in_the_future = other_nodes_config_version > nodes_config.version(); - if let Err(e) = nodes_config.find_node_by_id(peer_node_id) { - warn!("Could not find remote node {} after syncing nodes configuration. Local version '{}', remote version '{:?}'.", peer_node_id, nodes_config.version(), header.my_nodes_config_version.expect("must be present")); - return Err(NetworkError::UnknownNode(e)); + if peer_is_in_the_future { + self.metadata.notify_observed_version( + MetadataKind::NodesConfiguration, + other_nodes_config_version, + None, + Urgency::High, + ); + debug!("Remote node '{}' with newer nodes configuration '{}' tried to connect. Trying to fetch newer version before accepting connection.", peer_node_id, other_nodes_config_version); + } else { + info!("Unknown remote node '{}' tried to connect to cluster. Rejecting connection.", peer_node_id); } } else { - return Err(NetworkError::UnknownNode(e)); + info!("Unknown remote node '{}' w/o specifying its node configuration tried to connect to cluster. Rejecting connection.", peer_node_id); } + + return Err(NetworkError::UnknownNode(e)); } - Ok(nodes_config) + Ok(()) } /// Always attempts to create a new connection with peer @@ -250,7 +243,7 @@ impl ConnectionManager { node_id: GenerationalNodeId, ) -> Result { let connection = self.connect(node_id).await?; - Ok(connection.sender()) + Ok(connection.sender(&self.metadata)) } /// Gets an existing connection or creates a new one if no active connection exists. If @@ -267,12 +260,12 @@ impl ConnectionManager { }; if let Some(connection) = maybe_connection { - return Ok(connection.sender()); + return Ok(connection.sender(&self.metadata)); } // We have no connection, or the connection we picked is stale. We attempt to create a // new connection anyway. let connection = self.connect(node_id).await?; - Ok(connection.sender()) + Ok(connection.sender(&self.metadata)) } async fn connect(&self, node_id: GenerationalNodeId) -> Result, NetworkError> { @@ -308,12 +301,7 @@ impl ConnectionManager { node_id: GenerationalNodeId, ) -> Result, NetworkError> { let (tx, rx) = mpsc::channel(SEND_QUEUE_SIZE); - let connection = Connection::new( - node_id, - restate_types::net::CURRENT_PROTOCOL_VERSION, - tx, - self.metadata.updateable_nodes_config(), - ); + let connection = Connection::new(node_id, restate_types::net::CURRENT_PROTOCOL_VERSION, tx); let transformed = ReceiverStream::new(rx).map(Ok); let incoming = Box::pin(transformed); @@ -328,7 +316,6 @@ impl ConnectionManager { channel: Channel, ) -> Result, NetworkError> { let mut client = NodeSvcClient::new(channel); - let nodes_config_version = self.metadata.nodes_config_version(); let cluster_name = self.metadata.nodes_config_ref().cluster_name().to_owned(); let my_node_id = self.metadata.my_node_id(); @@ -336,7 +323,10 @@ impl ConnectionManager { let hello = Hello::new(my_node_id, cluster_name); // perform handshake. - let hello = Message::new(Header::new(nodes_config_version), hello); + let hello = Message::new( + Header::new(self.metadata.nodes_config_version(), None, None, None), + hello, + ); // Prime the channel with the hello message before connecting. tx.send(hello).await.expect("Channel accept hello message"); @@ -381,7 +371,6 @@ impl ConnectionManager { .expect("must be generational id"), protocol_version, tx, - self.metadata.updateable_nodes_config(), ); self.start_connection_reactor(connection, transformed) @@ -440,7 +429,14 @@ impl ConnectionManager { TaskKind::ConnectionReactor, "network-connection-reactor", None, - run_reactor(self.inner.clone(), connection.clone(), router, incoming).instrument(span), + run_reactor( + self.inner.clone(), + connection.clone(), + router, + incoming, + self.metadata.clone(), + ) + .instrument(span), )?; debug!( peer_node_id = %peer_node_id, @@ -466,6 +462,7 @@ async fn run_reactor( connection: Arc, router: MessageRouter, mut incoming: S, + metadata: Metadata, ) -> anyhow::Result<()> where S: Stream> + Unpin + Send, @@ -475,6 +472,8 @@ where tracing::field::display(current_task_id().unwrap()), ); let mut cancellation = std::pin::pin!(cancellation_watcher()); + let mut seen_versions = MetadataVersions::default(); + // Receive loop loop { // read a message from the stream @@ -503,8 +502,25 @@ where MESSAGE_RECEIVED.increment(1); let processing_started = Instant::now(); // header is optional on non-hello messages. - if let Some(_header) = msg.header { - // todo: if header contains newer config or metadata versions, notify metadata(). + if let Some(header) = msg.header { + seen_versions + .update( + header.my_nodes_config_version.map(Into::into), + header.my_partition_table_version.map(Into::into), + header.my_schema_version.map(Into::into), + header.my_logs_version.map(Into::into), + ) + .into_iter() + .for_each(|(kind, version)| { + if let Some(version) = version { + metadata.notify_observed_version( + kind, + version, + Some(NodeId::from(connection.peer)), + Urgency::Normal, + ); + } + }) }; // body are not allowed to be empty. @@ -623,21 +639,88 @@ fn on_connection_terminated(inner_manager: &Mutex) { guard.drop_connection(task_id); } +#[derive(Debug, Clone, PartialEq)] +pub struct MetadataVersions { + versions: EnumMap, +} + +impl Default for MetadataVersions { + fn default() -> Self { + Self { + versions: EnumMap::from_fn(|_| Version::INVALID), + } + } +} + +impl MetadataVersions { + pub fn update( + &mut self, + nodes_config_version: Option, + partition_table_version: Option, + schema_version: Option, + logs_version: Option, + ) -> EnumMap> { + let mut result = EnumMap::default(); + result[MetadataKind::NodesConfiguration] = + self.update_internal(MetadataKind::NodesConfiguration, nodes_config_version); + result[MetadataKind::PartitionTable] = + self.update_internal(MetadataKind::PartitionTable, partition_table_version); + result[MetadataKind::Schema] = self.update_internal(MetadataKind::Schema, schema_version); + result[MetadataKind::Logs] = self.update_internal(MetadataKind::Logs, logs_version); + + result + } + + fn update_internal( + &mut self, + metadata_kind: MetadataKind, + version: Option, + ) -> Option { + if let Some(version) = version { + if version > self.versions[metadata_kind] { + self.versions[metadata_kind] = version; + return Some(version); + } + } + None + } +} + +impl Index for MetadataVersions { + type Output = Version; + + fn index(&self, index: MetadataKind) -> &Self::Output { + &self.versions[index] + } +} + +impl IndexMut for MetadataVersions { + fn index_mut(&mut self, index: MetadataKind) -> &mut Self::Output { + &mut self.versions[index] + } +} + #[cfg(test)] mod tests { use crate::network::handshake::HANDSHAKE_TIMEOUT; use super::*; + use crate::{MetadataBuilder, MockNetworkSender, TestCoreEnv, TestCoreEnvBuilder}; use googletest::prelude::*; - - use crate::TestCoreEnv; - use restate_test_util::assert_eq; + use restate_test_util::{assert_eq, let_assert}; + use restate_types::net::codec::{serialize_message, Targeted, WireDecode, WireEncode}; + use restate_types::net::metadata::{GetMetadataRequest, MetadataMessage}; + use restate_types::net::partition_processor_manager::GetProcessorsState; use restate_types::net::{ - ProtocolVersion, CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_PROTOCOL_VERSION, + ProtocolVersion, RequestId, CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_PROTOCOL_VERSION, }; - use restate_types::nodes_config::NodesConfigError; + use restate_types::nodes_config::{NodeConfig, NodesConfigError, Role}; use restate_types::protobuf::node::message; + use restate_types::protobuf::node::message::Body; + use restate_types::Version; + use test_log::test; + use tonic::Status; // Test handshake with a client #[tokio::test] @@ -647,33 +730,10 @@ mod tests { .tc .run_in_scope("test", None, async { let metadata = crate::metadata(); - let (tx, rx) = mpsc::channel(1); let connections = ConnectionManager::new(metadata.clone()); - let hello = Hello::new( - metadata.my_node_id(), - metadata.nodes_config_ref().cluster_name().to_owned(), - ); - let hello = Message::new(Header::new(metadata.nodes_config_version()), hello); - tx.send(Ok(hello)) - .await - .expect("Channel accept hello message"); + let _ = establish_connection(metadata.my_node_id(), &metadata, &connections).await; - let incoming = ReceiverStream::new(rx); - let mut output_stream = connections - .accept_incoming_connection(incoming) - .await - .expect("handshake"); - let msg = output_stream - .next() - .await - .expect("welcome message") - .expect("ok"); - let welcome = match msg.body { - Some(message::Body::Welcome(welcome)) => welcome, - _ => panic!("unexpected message"), - }; - assert_eq!(welcome.my_node_id, Some(metadata.my_node_id().into())); Ok(()) }) .await @@ -722,7 +782,10 @@ mod tests { my_node_id: Some(my_node_id.into()), cluster_name: metadata.nodes_config_ref().cluster_name().to_owned(), }; - let hello = Message::new(Header::new(metadata.nodes_config_version()), hello); + let hello = Message::new( + Header::new(metadata.nodes_config_version(), None, None, None), + hello, + ); tx.send(Ok(hello)) .await .expect("Channel accept hello message"); @@ -747,7 +810,10 @@ mod tests { my_node_id: Some(my_node_id.into()), cluster_name: "Random-cluster".to_owned(), }; - let hello = Message::new(Header::new(metadata.nodes_config_version()), hello); + let hello = Message::new( + Header::new(metadata.nodes_config_version(), None, None, None), + hello, + ); tx.send(Ok(hello)).await?; let connections = ConnectionManager::new(metadata); @@ -785,7 +851,10 @@ mod tests { my_node_id, metadata.nodes_config_ref().cluster_name().to_owned(), ); - let hello = Message::new(Header::new(metadata.nodes_config_version()), hello); + let hello = Message::new( + Header::new(metadata.nodes_config_version(), None, None, None), + hello, + ); tx.send(Ok(hello)) .await .expect("Channel accept hello message"); @@ -798,7 +867,6 @@ mod tests { .await .err() .unwrap(); - println!("{:?}", err); assert!(matches!( err, @@ -815,7 +883,10 @@ mod tests { my_node_id, metadata.nodes_config_ref().cluster_name().to_owned(), ); - let hello = Message::new(Header::new(metadata.nodes_config_version()), hello); + let hello = Message::new( + Header::new(metadata.nodes_config_version(), None, None, None), + hello, + ); tx.send(Ok(hello)) .await .expect("Channel accept hello message"); @@ -836,4 +907,167 @@ mod tests { }) .await } + + #[test(tokio::test(start_paused = true))] + async fn fetching_metadata_updates_through_message_headers() -> Result<()> { + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + + let node_id = GenerationalNodeId::new(42, 42); + let node_config = NodeConfig::new( + "42".to_owned(), + node_id, + AdvertisedAddress::Uds("foobar1".into()), + Role::Worker.into(), + ); + nodes_config.upsert_node(node_config); + + let (network_tx, mut network_rx) = mpsc::unbounded_channel(); + let metadata_builder = MetadataBuilder::default(); + + let test_env = TestCoreEnvBuilder::new( + MockNetworkSender::from_sender(network_tx, metadata_builder.to_metadata()), + metadata_builder, + ) + .with_nodes_config(nodes_config) + .build() + .await; + + test_env + .tc + .run_in_scope("test", None, async { + let metadata = crate::metadata(); + let connections = ConnectionManager::new(metadata.clone()); + + let (connection, _rx) = + establish_connection(node_id, &metadata, &connections).await; + + let request = GetProcessorsState { + request_id: RequestId::default(), + }; + let partition_table_version = metadata.partition_table_version().next(); + let header = Header::new( + metadata.nodes_config_version(), + None, + None, + Some(partition_table_version), + ); + + connection.send(request, header).await?; + + let (target, message) = network_rx.recv().await.expect("some message"); + assert_eq!(NodeId::from(target), node_id); + assert_get_metadata_request( + message, + connection.protocol_version, + MetadataKind::PartitionTable, + partition_table_version, + ); + + Ok(()) + }) + .await + } + + fn assert_get_metadata_request( + message: Message, + protocol_version: ProtocolVersion, + metadata_kind: MetadataKind, + version: Version, + ) { + let metadata_message = + decode_metadata_message(message, protocol_version).expect("valid message"); + assert_that!( + metadata_message, + pat!(MetadataMessage::GetMetadataRequest(pat!( + GetMetadataRequest { + metadata_kind: eq(metadata_kind), + min_version: eq(Some(version)) + } + ))) + ); + } + + fn decode_metadata_message( + message: Message, + protocol_version: ProtocolVersion, + ) -> Result { + let_assert!(Some(Body::Encoded(mut binary_message)) = message.body); + + let metadata_message = + MetadataMessage::decode(&mut binary_message.payload, protocol_version)?; + Ok(metadata_message) + } + + async fn establish_connection( + node_id: GenerationalNodeId, + metadata: &Metadata, + connections: &ConnectionManager, + ) -> ( + TestConnection, + BoxStream<'static, std::result::Result>, + ) { + let (tx, rx) = mpsc::channel(1); + + let hello = Hello::new( + node_id, + metadata.nodes_config_ref().cluster_name().to_owned(), + ); + let hello = Message::new( + Header::new(metadata.nodes_config_version(), None, None, None), + hello, + ); + tx.send(Ok(hello)) + .await + .expect("Channel accept hello message"); + + let incoming = ReceiverStream::new(rx); + let mut output_stream = connections + .accept_incoming_connection(incoming) + .await + .expect("handshake"); + let msg = output_stream + .next() + .await + .expect("welcome message") + .expect("ok"); + let welcome = match msg.body { + Some(message::Body::Welcome(welcome)) => welcome, + _ => panic!("unexpected message"), + }; + assert_eq!(welcome.my_node_id, Some(metadata.my_node_id().into())); + + ( + TestConnection::new(welcome.protocol_version(), tx), + output_stream, + ) + } + + struct TestConnection { + protocol_version: ProtocolVersion, + tx: mpsc::Sender>, + } + + impl TestConnection { + fn new( + protocol_version: ProtocolVersion, + tx: mpsc::Sender>, + ) -> Self { + Self { + protocol_version, + tx, + } + } + + async fn send(&self, message: M, header: Header) -> Result<()> + where + M: WireEncode + Targeted, + { + let body = serialize_message(message, self.protocol_version)?; + let message = Message::new(header, body); + + self.tx.send(Ok(message)).await?; + + Ok(()) + } + } } diff --git a/crates/core/src/network/error.rs b/crates/core/src/network/error.rs index b4fb689ae..ae7877377 100644 --- a/crates/core/src/network/error.rs +++ b/crates/core/src/network/error.rs @@ -44,6 +44,8 @@ pub enum NetworkError { Unavailable(String), #[error("failed syncing metadata: {0}")] Metadata(#[from] SyncError), + #[error("network channel is full and sending would block")] + Full, } #[derive(Debug, thiserror::Error)] diff --git a/crates/core/src/test_env.rs b/crates/core/src/test_env.rs index 790b5ef07..91d08f83d 100644 --- a/crates/core/src/test_env.rs +++ b/crates/core/src/test_env.rs @@ -73,7 +73,8 @@ impl NetworkSender for MockNetworkSender { }, }; - let header = Header::new(metadata().nodes_config_version()); + let metadata = metadata(); + let header = Header::new(metadata.nodes_config_version(), None, None, None); let body = serialize_message(message, CURRENT_PROTOCOL_VERSION).map_err(ProtocolError::Codec)?; sender @@ -171,8 +172,7 @@ impl TestCoreEnvBuilder where N: NetworkSender + 'static, { - pub fn new(network_sender: N) -> Self { - let metadata_builder = MetadataBuilder::default(); + pub fn new(network_sender: N, metadata_builder: MetadataBuilder) -> Self { TestCoreEnvBuilder::new_with_network_tx_rx(network_sender, None, metadata_builder) } diff --git a/crates/ingress-dispatcher/src/dispatcher.rs b/crates/ingress-dispatcher/src/dispatcher.rs index 0fc0ff2c1..856c3ff24 100644 --- a/crates/ingress-dispatcher/src/dispatcher.rs +++ b/crates/ingress-dispatcher/src/dispatcher.rs @@ -321,8 +321,7 @@ mod tests { // Let's check we correct have generated a bifrost write let partition_id = node_env .metadata - .partition_table() - .unwrap() + .partition_table_snapshot() .find_partition_id(invocation_id.partition_key())?; let log_id = LogId::from(partition_id); let log_record = bifrost.read(log_id, Lsn::OLDEST).await?.unwrap(); @@ -415,8 +414,7 @@ mod tests { // Let's check the command was written to bifrost let partition_id = node_env .metadata - .partition_table() - .unwrap() + .partition_table_snapshot() .find_partition_id(invocation_id.partition_key())?; let bifrost_messages = bifrost.read_all(LogId::from(partition_id)).await?; diff --git a/crates/types/protobuf/restate/node.proto b/crates/types/protobuf/restate/node.proto index 5b960143c..57a42f427 100644 --- a/crates/types/protobuf/restate/node.proto +++ b/crates/types/protobuf/restate/node.proto @@ -17,7 +17,12 @@ package restate.node; // # Wire Protocol Of Streaming Connections // ------------------------------------- // -message Header { restate.common.Version my_nodes_config_version = 1; } +message Header { + restate.common.Version my_nodes_config_version = 1; + optional restate.common.Version my_logs_version = 2; + optional restate.common.Version my_schema_version = 3; + optional restate.common.Version my_partition_table_version = 4; +} // First message sent to an ingress after starting the connection. The message // must be sent before any other message. diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index f19e9bd88..7d890b1cc 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -8,15 +8,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize}; -use std::path::PathBuf; -use std::str::FromStr; - use enumset::EnumSet; -use humantime::Duration; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use serde_with::serde_as; +use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize}; +use std::path::PathBuf; +use std::str::FromStr; use restate_serde_util::NonZeroByteCount; @@ -93,7 +91,7 @@ pub struct CommonOptions { /// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format. #[serde_as(as = "serde_with::DisplayFromStr")] #[cfg_attr(feature = "schemars", schemars(with = "String"))] - pub shutdown_timeout: Duration, + pub shutdown_timeout: humantime::Duration, /// # Default async runtime thread pool /// @@ -228,6 +226,14 @@ pub struct CommonOptions { /// RocksDb base settings and memory limits that get applied on every database #[serde(flatten)] pub rocksdb: RocksDbOptions, + + /// # Metadata update interval + /// + /// The interval at which each node checks for metadata updates it has observed from different + /// nodes or other sources. + #[serde(with = "serde_with::As::")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub metadata_update_interval: humantime::Duration, } static HOSTNAME: Lazy = Lazy::new(|| { @@ -353,6 +359,7 @@ impl Default for CommonOptions { rocksdb_enable_stall_on_memory_limit: false, rocksdb_perf_level: PerfStatsLevel::EnableCount, rocksdb: Default::default(), + metadata_update_interval: std::time::Duration::from_secs(3).into(), } } } diff --git a/crates/types/src/net/metadata.rs b/crates/types/src/net/metadata.rs index acf2f3449..7cd5b2799 100644 --- a/crates/types/src/net/metadata.rs +++ b/crates/types/src/net/metadata.rs @@ -52,6 +52,7 @@ define_message! { Serialize, Deserialize, strum_macros::Display, + strum_macros::EnumCount, )] pub enum MetadataKind { NodesConfiguration, diff --git a/crates/types/src/partition_table.rs b/crates/types/src/partition_table.rs index 4a84c721f..4076dd11e 100644 --- a/crates/types/src/partition_table.rs +++ b/crates/types/src/partition_table.rs @@ -43,6 +43,15 @@ pub struct FixedPartitionTable { num_partitions: u64, } +impl Default for FixedPartitionTable { + fn default() -> Self { + Self { + version: Version::INVALID, + num_partitions: 0, + } + } +} + impl FixedPartitionTable { const PARTITION_KEY_RANGE_END: u128 = 1 << 64; diff --git a/crates/types/src/protobuf.rs b/crates/types/src/protobuf.rs index 851eeb4f5..963147792 100644 --- a/crates/types/src/protobuf.rs +++ b/crates/types/src/protobuf.rs @@ -81,9 +81,17 @@ pub mod node { } impl Header { - pub fn new(nodes_config_version: crate::Version) -> Self { + pub fn new( + nodes_config_version: crate::Version, + logs_version: Option, + schema_version: Option, + partition_table_version: Option, + ) -> Self { Self { my_nodes_config_version: Some(nodes_config_version.into()), + my_logs_version: logs_version.map(Into::into), + my_schema_version: schema_version.map(Into::into), + my_partition_table_version: partition_table_version.map(Into::into), } } } diff --git a/crates/worker/src/ingress_integration.rs b/crates/worker/src/ingress_integration.rs index 0585ed0a6..857b85742 100644 --- a/crates/worker/src/ingress_integration.rs +++ b/crates/worker/src/ingress_integration.rs @@ -42,8 +42,7 @@ impl InvocationStorageReaderImpl { impl InvocationStorageReader for InvocationStorageReaderImpl { async fn get_output(&self, query: InvocationQuery) -> Result { let partition_id = metadata() - .partition_table() - .ok_or_else(|| anyhow!("Can't find partition table"))? + .partition_table_ref() .find_partition_id(query.partition_key())?; let mut partition_storage = self .partition_store_manager