Skip to content

Commit

Permalink
Sync nodes configuration if node from the future connects
Browse files Browse the repository at this point in the history
This commit makes the server sync the nodes configuration if a remote
node from the future tries to connect to it.

This fixes #1696.
  • Loading branch information
tillrohrmann committed Jul 19, 2024
1 parent b210cfc commit 2e1e0a2
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 47 deletions.
11 changes: 9 additions & 2 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use restate_types::partition_table::{FixedPartitionTable, KeyRange};

use restate_bifrost::Bifrost;
use restate_core::network::{MessageRouterBuilder, NetworkSender};
use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskCenter, TaskKind};
use restate_core::{
cancellation_watcher, Metadata, ShutdownError, TargetVersion, TaskCenter, TaskKind,
};
use restate_types::cluster::cluster_state::RunMode;
use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState};
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -370,7 +372,12 @@ async fn signal_all_partitions_started(
|| metadata.partition_table_version() == Version::INVALID
{
// syncing of PartitionTable since we obviously don't have up-to-date information
metadata.sync(MetadataKind::PartitionTable).await?;
metadata
.sync(
MetadataKind::PartitionTable,
TargetVersion::Version(cluster_state.partition_table_version.max(Version::MIN)),
)
.await?;
} else {
let partition_table = metadata
.partition_table()
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use enum_map::EnumMap;
use smallvec::SmallVec;
use tracing::instrument;

use restate_core::{Metadata, MetadataKind};
use restate_core::{Metadata, MetadataKind, TargetVersion};
use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment};
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
Expand Down Expand Up @@ -377,7 +377,7 @@ impl BifrostInner {
pub async fn sync_metadata(&self) -> Result<()> {
self.fail_if_shutting_down()?;
self.metadata
.sync(MetadataKind::Logs)
.sync(MetadataKind::Logs, TargetVersion::Latest)
.await
.map_err(Arc::new)?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub mod worker_api;

pub use metadata::{
spawn_metadata_manager, Metadata, MetadataBuilder, MetadataKind, MetadataManager,
MetadataWriter, SyncError,
MetadataWriter, SyncError, TargetVersion,
};
pub use task_center::*;
pub use task_center_types::*;
Expand Down
61 changes: 57 additions & 4 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,34 @@ pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;
#[derive(Debug, thiserror::Error)]
pub enum SyncError {}

#[derive(Debug)]
pub enum TargetVersion {
Latest,
Version(Version),
}

impl Default for TargetVersion {
fn default() -> Self {
Self::Latest
}
}

impl From<Option<Version>> for TargetVersion {
fn from(value: Option<Version>) -> Self {
match value {
Some(version) => TargetVersion::Version(version),
None => TargetVersion::Latest,
}
}
}

pub(super) enum Command {
UpdateMetadata(MetadataContainer, Option<oneshot::Sender<()>>),
SyncMetadata(MetadataKind, oneshot::Sender<Result<(), ReadError>>),
SyncMetadata(
MetadataKind,
TargetVersion,
oneshot::Sender<Result<(), ReadError>>,
),
}

/// A handler for processing network messages targeting metadata manager
Expand Down Expand Up @@ -260,8 +285,8 @@ where
async fn handle_command(&mut self, cmd: Command) {
match cmd {
Command::UpdateMetadata(value, callback) => self.update_metadata(value, callback),
Command::SyncMetadata(kind, callback) => {
let result = self.sync_metadata(kind).await;
Command::SyncMetadata(kind, target_version, callback) => {
let result = self.sync_metadata(kind, target_version).await;
if callback.send(result).is_err() {
trace!("Couldn't send synce metadata reply back. System is probably shutting down.");
}
Expand Down Expand Up @@ -290,7 +315,15 @@ where
}
}

async fn sync_metadata(&mut self, metadata_kind: MetadataKind) -> Result<(), ReadError> {
async fn sync_metadata(
&mut self,
metadata_kind: MetadataKind,
target_version: TargetVersion,
) -> Result<(), ReadError> {
if self.has_target_version(metadata_kind, target_version) {
return Ok(());
}

match metadata_kind {
MetadataKind::NodesConfiguration => {
if let Some(nodes_config) = self
Expand Down Expand Up @@ -333,6 +366,26 @@ where
Ok(())
}

fn has_target_version(
&self,
metadata_kind: MetadataKind,
target_version: TargetVersion,
) -> bool {
match target_version {
TargetVersion::Latest => false,
TargetVersion::Version(target_version) => {
let version = match metadata_kind {
MetadataKind::NodesConfiguration => self.metadata.nodes_config_version(),
MetadataKind::Schema => self.metadata.schema_version(),
MetadataKind::PartitionTable => self.metadata.partition_table_version(),
MetadataKind::Logs => self.metadata.logs_version(),
};

version >= target_version
}
}
}

fn update_nodes_configuration(&mut self, config: NodesConfiguration) {
let maybe_new_version = Self::update_internal(&self.metadata.inner.nodes_config, config);

Expand Down
20 changes: 16 additions & 4 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#![allow(dead_code)]

mod manager;
pub use manager::MetadataManager;
pub use manager::{MetadataManager, TargetVersion};
use restate_types::live::{Live, Pinned};
use restate_types::schema::Schema;

Expand Down Expand Up @@ -175,11 +175,23 @@ impl Metadata {
self.inner.write_watches[metadata_kind].receive.clone()
}

/// Syncs the given metadata_kind from the underlying metadata store.
pub async fn sync(&self, metadata_kind: MetadataKind) -> Result<(), SyncError> {
/// Syncs the given metadata_kind from the underlying metadata store if the current version is
/// lower than target version.
///
/// Note: If the target version does not exist, then a lower version will be available after
/// this call completes.
pub async fn sync(
&self,
metadata_kind: MetadataKind,
target_version: TargetVersion,
) -> Result<(), SyncError> {
let (result_tx, result_rx) = oneshot::channel();
self.sender
.send(Command::SyncMetadata(metadata_kind, result_tx))
.send(Command::SyncMetadata(
metadata_kind,
target_version,
result_tx,
))
.map_err(|_| ShutdownError)?;
result_rx.await.map_err(|_| ShutdownError)??;

Expand Down
75 changes: 46 additions & 29 deletions crates/core/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tracing::{debug, info, trace, warn, Instrument, Span};

use restate_types::live::Pinned;
use restate_types::net::metadata::MetadataKind;
use restate_types::net::AdvertisedAddress;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::node::message::{self, ConnectionControl};
use restate_types::protobuf::node::{Header, Hello, Message, Welcome};
use restate_types::{GenerationalNodeId, NodeId, PlainNodeId};
Expand All @@ -36,8 +39,8 @@ use super::metric_definitions::{
};
use super::protobuf::node_svc::node_svc_client::NodeSvcClient;
use super::{Handler, MessageRouter};
use crate::Metadata;
use crate::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind};
use crate::{Metadata, TargetVersion};

// todo: make this configurable
const SEND_QUEUE_SIZE: usize = 1;
Expand Down Expand Up @@ -136,7 +139,7 @@ impl ConnectionManager {
// The client can retry with an exponential backoff on handshake timeout.
debug!("Accepting incoming connection");
let (header, hello) = wait_for_hello(&mut incoming).await?;
let nodes_config = self.metadata.nodes_config_snapshot();
let nodes_config = self.metadata.nodes_config_ref();
let my_node_id = self.metadata.my_node_id();
// NodeId **must** be generational at this layer
let peer_node_id = hello
Expand Down Expand Up @@ -176,33 +179,9 @@ impl ConnectionManager {
selected_protocol_version
);

// If nodeId is unrecognized and peer is at higher nodes configuration version,
// TODO: issue a sync to the higher version
let peer_is_in_the_future = header
.my_nodes_config_version
.as_ref()
.is_some_and(|v| v.value > nodes_config.version().into());

if let Err(e) = nodes_config.find_node_by_id(peer_node_id) {
if peer_is_in_the_future {
info!(
"Rejecting a connection from an unrecognized node v{}, the peer is at a higher \
nodes configuration version {:?}, mine is {}",
peer_node_id,
header.my_nodes_config_version,
nodes_config.version()
);
// TODO: notify metadata about higher version.
// let _ = self
// .metadata
// .notify_observed_version(
// MetadataKind::NodesConfiguration,
// header.my_nodes_config_version.unwrap().into(),
// )
// .await?;
}
return Err(NetworkError::UnknownNode(e));
}
let nodes_config = self
.verify_node_id(peer_node_id, header, nodes_config)
.await?;

let (tx, rx) = mpsc::channel(SEND_QUEUE_SIZE);
// Enqueue the welcome message
Expand Down Expand Up @@ -230,6 +209,44 @@ impl ConnectionManager {
Ok(Box::pin(transformed))
}

async fn verify_node_id(
&self,
peer_node_id: GenerationalNodeId,
header: Header,
mut nodes_config: Pinned<NodesConfiguration>,
) -> Result<Pinned<NodesConfiguration>, NetworkError> {
if let Err(e) = nodes_config.find_node_by_id(peer_node_id) {
// If nodeId is unrecognized and peer is at higher nodes configuration version,
// then we have to update our NodesConfiguration
let peer_is_in_the_future = header
.my_nodes_config_version
.as_ref()
.is_some_and(|v| v.value > nodes_config.version().into());

if peer_is_in_the_future {
// don't keep pinned nodes configuration beyond await point
drop(nodes_config);
// todo: Replace with notifying metadata manager about newer version
self.metadata
.sync(
MetadataKind::NodesConfiguration,
TargetVersion::from(header.my_nodes_config_version.clone().map(Into::into)),
)
.await?;
nodes_config = self.metadata.nodes_config_ref();

if let Err(e) = nodes_config.find_node_by_id(peer_node_id) {
warn!("Could not find remote node {} after syncing nodes configuration. Local version '{}', remote version '{:?}'.", peer_node_id, nodes_config.version(), header.my_nodes_config_version.expect("must be present"));
return Err(NetworkError::UnknownNode(e));
}
} else {
return Err(NetworkError::UnknownNode(e));
}
}

Ok(nodes_config)
}

/// Always attempts to create a new connection with peer
pub async fn enforced_new_node_sender(
&self,
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/network/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use restate_types::net::{CodecError, MIN_SUPPORTED_PROTOCOL_VERSION};
use restate_types::nodes_config::NodesConfigError;
use restate_types::NodeId;

use crate::ShutdownError;
use crate::{ShutdownError, SyncError};

#[derive(Debug, thiserror::Error)]
pub enum RouterError {
Expand Down Expand Up @@ -42,6 +42,8 @@ pub enum NetworkError {
ConnectionClosed,
#[error("cannot send messages to this node: {0}")]
Unavailable(String),
#[error("failed syncing metadata: {0}")]
Metadata(#[from] SyncError),
}

#[derive(Debug, thiserror::Error)]
Expand Down
16 changes: 12 additions & 4 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use restate_bifrost::BifrostService;
use restate_core::metadata_store::{MetadataStoreClientError, ReadWriteError};
use restate_core::network::MessageRouterBuilder;
use restate_core::network::Networking;
use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager};
use restate_core::{
spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager, TargetVersion,
};
use restate_core::{task_center, TaskKind};
use restate_metadata_store::local::LocalMetadataStoreService;
use restate_metadata_store::MetadataStoreClient;
Expand Down Expand Up @@ -296,8 +298,12 @@ impl Node {
metadata_writer.update(logs).await?;
} else {
// otherwise, just sync the required metadata
metadata.sync(MetadataKind::PartitionTable).await?;
metadata.sync(MetadataKind::Logs).await?;
metadata
.sync(MetadataKind::PartitionTable, TargetVersion::Latest)
.await?;
metadata
.sync(MetadataKind::Logs, TargetVersion::Latest)
.await?;

// safety check until we can tolerate missing partition table and logs configuration
if metadata.partition_table_version() == Version::INVALID
Expand All @@ -312,7 +318,9 @@ impl Node {
}

// fetch the latest schema information
metadata.sync(MetadataKind::Schema).await?;
metadata
.sync(MetadataKind::Schema, TargetVersion::Latest)
.await?;

let nodes_config = metadata.nodes_config_ref();

Expand Down

0 comments on commit 2e1e0a2

Please sign in to comment.