Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync nodes configuration if node from the future connects #1699

Merged
merged 2 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use restate_types::partition_table::{FixedPartitionTable, KeyRange};

use restate_bifrost::Bifrost;
use restate_core::network::{MessageRouterBuilder, NetworkSender};
use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskCenter, TaskKind};
use restate_core::{
cancellation_watcher, Metadata, ShutdownError, TargetVersion, TaskCenter, TaskKind,
};
use restate_types::cluster::cluster_state::RunMode;
use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState};
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -370,7 +372,12 @@ async fn signal_all_partitions_started(
|| metadata.partition_table_version() == Version::INVALID
{
// syncing of PartitionTable since we obviously don't have up-to-date information
metadata.sync(MetadataKind::PartitionTable).await?;
metadata
.sync(
MetadataKind::PartitionTable,
TargetVersion::Version(cluster_state.partition_table_version.max(Version::MIN)),
)
.await?;
} else {
let partition_table = metadata
.partition_table()
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use enum_map::EnumMap;
use smallvec::SmallVec;
use tracing::instrument;

use restate_core::{Metadata, MetadataKind};
use restate_core::{Metadata, MetadataKind, TargetVersion};
use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment};
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
Expand Down Expand Up @@ -377,7 +377,7 @@ impl BifrostInner {
pub async fn sync_metadata(&self) -> Result<()> {
self.fail_if_shutting_down()?;
self.metadata
.sync(MetadataKind::Logs)
.sync(MetadataKind::Logs, TargetVersion::Latest)
.await
.map_err(Arc::new)?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub mod worker_api;

pub use metadata::{
spawn_metadata_manager, Metadata, MetadataBuilder, MetadataKind, MetadataManager,
MetadataWriter, SyncError,
MetadataWriter, SyncError, TargetVersion,
};
pub use task_center::*;
pub use task_center_types::*;
Expand Down
61 changes: 57 additions & 4 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,34 @@ pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;
/// Sync error type declared alongside the metadata manager.
///
/// The enum declares no variants here, so no value of this type can be constructed.
/// NOTE(review): presumably a placeholder for sync-specific failures — confirm.
#[derive(Debug, thiserror::Error)]
pub enum SyncError {}

/// Version of a metadata kind that a sync request asks for.
///
/// The default value is [`TargetVersion::Latest`].
#[derive(Debug, Default)]
pub enum TargetVersion {
    /// No specific version is requested.
    #[default]
    Latest,
    /// A specific version is requested.
    Version(Version),
}

impl From<Option<Version>> for TargetVersion {
    /// Maps `Some(version)` to [`TargetVersion::Version`] and `None` to
    /// [`TargetVersion::Latest`].
    fn from(value: Option<Version>) -> Self {
        value.map_or(Self::Latest, Self::Version)
    }
}

pub(super) enum Command {
UpdateMetadata(MetadataContainer, Option<oneshot::Sender<()>>),
SyncMetadata(MetadataKind, oneshot::Sender<Result<(), ReadError>>),
SyncMetadata(
MetadataKind,
TargetVersion,
oneshot::Sender<Result<(), ReadError>>,
),
}

/// A handler for processing network messages targeting metadata manager
Expand Down Expand Up @@ -260,8 +285,8 @@ where
async fn handle_command(&mut self, cmd: Command) {
match cmd {
Command::UpdateMetadata(value, callback) => self.update_metadata(value, callback),
Command::SyncMetadata(kind, callback) => {
let result = self.sync_metadata(kind).await;
Command::SyncMetadata(kind, target_version, callback) => {
let result = self.sync_metadata(kind, target_version).await;
if callback.send(result).is_err() {
trace!("Couldn't send synce metadata reply back. System is probably shutting down.");
}
Expand Down Expand Up @@ -290,7 +315,15 @@ where
}
}

async fn sync_metadata(&mut self, metadata_kind: MetadataKind) -> Result<(), ReadError> {
async fn sync_metadata(
&mut self,
metadata_kind: MetadataKind,
target_version: TargetVersion,
) -> Result<(), ReadError> {
if self.has_target_version(metadata_kind, target_version) {
return Ok(());
}

match metadata_kind {
MetadataKind::NodesConfiguration => {
if let Some(nodes_config) = self
Expand Down Expand Up @@ -333,6 +366,26 @@ where
Ok(())
}

/// Reports whether the locally held version of `metadata_kind` already satisfies
/// `target_version`.
///
/// Returns `false` for [`TargetVersion::Latest`]. For [`TargetVersion::Version`] it returns
/// `true` when the local version is greater than or equal to the requested one.
fn has_target_version(
    &self,
    metadata_kind: MetadataKind,
    target_version: TargetVersion,
) -> bool {
    let requested = match target_version {
        // `Latest` is never considered satisfied by the local state.
        TargetVersion::Latest => return false,
        TargetVersion::Version(requested) => requested,
    };

    // Look up the local version of the requested metadata kind.
    let current = match metadata_kind {
        MetadataKind::NodesConfiguration => self.metadata.nodes_config_version(),
        MetadataKind::Schema => self.metadata.schema_version(),
        MetadataKind::PartitionTable => self.metadata.partition_table_version(),
        MetadataKind::Logs => self.metadata.logs_version(),
    };

    current >= requested
}

fn update_nodes_configuration(&mut self, config: NodesConfiguration) {
let maybe_new_version = Self::update_internal(&self.metadata.inner.nodes_config, config);

Expand Down
20 changes: 16 additions & 4 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#![allow(dead_code)]

mod manager;
pub use manager::MetadataManager;
pub use manager::{MetadataManager, TargetVersion};
use restate_types::live::{Live, Pinned};
use restate_types::schema::Schema;

Expand Down Expand Up @@ -175,11 +175,23 @@ impl Metadata {
self.inner.write_watches[metadata_kind].receive.clone()
}

/// Syncs the given metadata_kind from the underlying metadata store.
pub async fn sync(&self, metadata_kind: MetadataKind) -> Result<(), SyncError> {
/// Syncs the given `metadata_kind` from the underlying metadata store if the current version
/// is lower than the target version.
///
/// Note: If the target version does not exist in the metadata store, the version available
/// after this call completes may still be lower than the target version.
pub async fn sync(
&self,
metadata_kind: MetadataKind,
target_version: TargetVersion,
) -> Result<(), SyncError> {
let (result_tx, result_rx) = oneshot::channel();
self.sender
.send(Command::SyncMetadata(metadata_kind, result_tx))
.send(Command::SyncMetadata(
metadata_kind,
target_version,
result_tx,
))
.map_err(|_| ShutdownError)?;
result_rx.await.map_err(|_| ShutdownError)??;

Expand Down
75 changes: 46 additions & 29 deletions crates/core/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tracing::{debug, info, trace, warn, Instrument, Span};

use restate_types::live::Pinned;
use restate_types::net::metadata::MetadataKind;
use restate_types::net::AdvertisedAddress;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::node::message::{self, ConnectionControl};
use restate_types::protobuf::node::{Header, Hello, Message, Welcome};
use restate_types::{GenerationalNodeId, NodeId, PlainNodeId};
Expand All @@ -36,8 +39,8 @@ use super::metric_definitions::{
};
use super::protobuf::node_svc::node_svc_client::NodeSvcClient;
use super::{Handler, MessageRouter};
use crate::Metadata;
use crate::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind};
use crate::{Metadata, TargetVersion};

// todo: make this configurable
const SEND_QUEUE_SIZE: usize = 1;
Expand Down Expand Up @@ -136,7 +139,7 @@ impl ConnectionManager {
// The client can retry with an exponential backoff on handshake timeout.
debug!("Accepting incoming connection");
let (header, hello) = wait_for_hello(&mut incoming).await?;
let nodes_config = self.metadata.nodes_config_snapshot();
let nodes_config = self.metadata.nodes_config_ref();
let my_node_id = self.metadata.my_node_id();
// NodeId **must** be generational at this layer
let peer_node_id = hello
Expand Down Expand Up @@ -176,33 +179,9 @@ impl ConnectionManager {
selected_protocol_version
);

// If nodeId is unrecognized and peer is at higher nodes configuration version,
// TODO: issue a sync to the higher version
let peer_is_in_the_future = header
.my_nodes_config_version
.as_ref()
.is_some_and(|v| v.value > nodes_config.version().into());

if let Err(e) = nodes_config.find_node_by_id(peer_node_id) {
if peer_is_in_the_future {
info!(
"Rejecting a connection from an unrecognized node v{}, the peer is at a higher \
nodes configuration version {:?}, mine is {}",
peer_node_id,
header.my_nodes_config_version,
nodes_config.version()
);
// TODO: notify metadata about higher version.
// let _ = self
// .metadata
// .notify_observed_version(
// MetadataKind::NodesConfiguration,
// header.my_nodes_config_version.unwrap().into(),
// )
// .await?;
}
return Err(NetworkError::UnknownNode(e));
}
let nodes_config = self
.verify_node_id(peer_node_id, header, nodes_config)
.await?;

let (tx, rx) = mpsc::channel(SEND_QUEUE_SIZE);
// Enqueue the welcome message
Expand Down Expand Up @@ -230,6 +209,44 @@ impl ConnectionManager {
Ok(Box::pin(transformed))
}

/// Checks that `peer_node_id` is present in `nodes_config`.
///
/// If the peer is unknown but its header advertises a nodes configuration version higher
/// than the local one, the nodes configuration is synced and the lookup is repeated once.
///
/// Returns the nodes configuration in which the peer was found, which is a freshly pinned
/// one if a sync happened.
///
/// # Errors
///
/// Returns [`NetworkError::UnknownNode`] if the peer is not found and is not ahead of the
/// local version, or is still not found after syncing. Errors from the sync itself are
/// propagated.
async fn verify_node_id(
    &self,
    peer_node_id: GenerationalNodeId,
    header: Header,
    mut nodes_config: Pinned<NodesConfiguration>,
) -> Result<Pinned<NodesConfiguration>, NetworkError> {
    if let Err(unknown_node) = nodes_config.find_node_by_id(peer_node_id) {
        // An unrecognized node id is only tolerated if the peer is at a higher nodes
        // configuration version; in that case our NodesConfiguration has to be updated.
        let peer_is_in_the_future = match header.my_nodes_config_version.as_ref() {
            Some(peer_version) => peer_version.value > nodes_config.version().into(),
            None => false,
        };

        if !peer_is_in_the_future {
            return Err(NetworkError::UnknownNode(unknown_node));
        }

        // don't keep pinned nodes configuration beyond await point
        drop(nodes_config);

        // todo: Replace with notifying metadata manager about newer version
        let target_version =
            TargetVersion::from(header.my_nodes_config_version.clone().map(Into::into));
        self.metadata
            .sync(MetadataKind::NodesConfiguration, target_version)
            .await?;
        nodes_config = self.metadata.nodes_config_ref();

        if let Err(still_unknown) = nodes_config.find_node_by_id(peer_node_id) {
            warn!("Could not find remote node {} after syncing nodes configuration. Local version '{}', remote version '{:?}'.", peer_node_id, nodes_config.version(), header.my_nodes_config_version.expect("must be present"));
            return Err(NetworkError::UnknownNode(still_unknown));
        }
    }

    Ok(nodes_config)
}

/// Always attempts to create a new connection with peer
pub async fn enforced_new_node_sender(
&self,
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/network/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use restate_types::net::{CodecError, MIN_SUPPORTED_PROTOCOL_VERSION};
use restate_types::nodes_config::NodesConfigError;
use restate_types::NodeId;

use crate::ShutdownError;
use crate::{ShutdownError, SyncError};

#[derive(Debug, thiserror::Error)]
pub enum RouterError {
Expand Down Expand Up @@ -42,6 +42,8 @@ pub enum NetworkError {
ConnectionClosed,
#[error("cannot send messages to this node: {0}")]
Unavailable(String),
#[error("failed syncing metadata: {0}")]
Metadata(#[from] SyncError),
}

#[derive(Debug, thiserror::Error)]
Expand Down
16 changes: 12 additions & 4 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use restate_bifrost::BifrostService;
use restate_core::metadata_store::{MetadataStoreClientError, ReadWriteError};
use restate_core::network::MessageRouterBuilder;
use restate_core::network::Networking;
use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager};
use restate_core::{
spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager, TargetVersion,
};
use restate_core::{task_center, TaskKind};
use restate_metadata_store::local::LocalMetadataStoreService;
use restate_metadata_store::MetadataStoreClient;
Expand Down Expand Up @@ -296,8 +298,12 @@ impl Node {
metadata_writer.update(logs).await?;
} else {
// otherwise, just sync the required metadata
metadata.sync(MetadataKind::PartitionTable).await?;
metadata.sync(MetadataKind::Logs).await?;
metadata
.sync(MetadataKind::PartitionTable, TargetVersion::Latest)
.await?;
metadata
.sync(MetadataKind::Logs, TargetVersion::Latest)
.await?;

// safety check until we can tolerate missing partition table and logs configuration
if metadata.partition_table_version() == Version::INVALID
Expand All @@ -312,7 +318,9 @@ impl Node {
}

// fetch the latest schema information
metadata.sync(MetadataKind::Schema).await?;
metadata
.sync(MetadataKind::Schema, TargetVersion::Latest)
.await?;

let nodes_config = metadata.nodes_config_ref();

Expand Down
1 change: 1 addition & 0 deletions crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum TargetName {
ATTACH_RESPONSE = 6;
GET_PROCESSORS_STATE_REQUEST = 7;
PROCESSORS_STATE_RESPONSE = 8;
CONTROL_PROCESSORS = 9;
}

enum NodeStatus {
Expand Down
27 changes: 26 additions & 1 deletion crates/types/src/net/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use serde_with::serde_as;

use crate::cluster::cluster_state::PartitionProcessorStatus;
use crate::identifiers::PartitionId;
use crate::net::{RequestId, TargetName};
use crate::net::{define_message, RequestId, TargetName};

use crate::net::define_rpc;
use crate::Version;

define_rpc! {
@request = GetProcessorsState,
Expand All @@ -38,3 +39,27 @@ pub struct ProcessorsStateResponse {
#[serde_as(as = "serde_with::Seq<(_, _)>")]
pub state: BTreeMap<PartitionId, PartitionProcessorStatus>,
}

// Declares `ControlProcessors` as a message associated with `TargetName::ControlProcessors`.
// NOTE(review): the items generated by `define_message!` are not visible here — confirm.
define_message! {
    @message = ControlProcessors,
    @target = TargetName::ControlProcessors,
}

/// Message carrying a list of processor commands, each addressed to a single partition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ControlProcessors {
    /// NOTE(review): presumably the minimum partition table version the receiver needs
    /// before acting on `commands` — confirm against the message handler.
    pub min_partition_table_version: Version,
    /// The individual per-partition commands.
    pub commands: Vec<ControlProcessor>,
}

/// A single command paired with the partition it is addressed to.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ControlProcessor {
    /// Partition this command is addressed to.
    pub partition_id: PartitionId,
    /// The command for that partition.
    pub command: ProcessorCommand,
}

/// Command carried by a [`ControlProcessor`] entry.
///
/// NOTE(review): variant meanings below are inferred from their names — confirm against
/// the code that handles `ControlProcessors`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ProcessorCommand {
    /// Presumably: stop the processor for the partition.
    Stop,
    /// Presumably: run the processor for the partition as a follower.
    Follower,
    /// Presumably: run the processor for the partition as the leader.
    Leader,
}
Loading
Loading