From 582c601642e572e622b19fb0e3ad4242ac7784e8 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Mon, 4 Mar 2024 10:20:02 +0000 Subject: [PATCH] [3/n] Stub networking and network sender This also introduces metadata manager's machinery for sending metadata updates and accepting metadata fetch requests across nodes --- Cargo.lock | 4 + crates/core/Cargo.toml | 1 + crates/core/src/metadata/manager.rs | 175 ++++++++++++++++-- crates/core/src/metadata/mod.rs | 10 +- crates/core/src/network/message_router.rs | 206 ++++++++++++++++++++++ crates/core/src/network/mod.rs | 2 + crates/core/src/task_center.rs | 27 ++- crates/core/src/task_center_types.rs | 2 + crates/network/Cargo.toml | 3 + crates/network/src/lib.rs | 3 + crates/network/src/v2/error.rs | 81 +++++++++ crates/network/src/v2/mod.rs | 14 ++ crates/network/src/v2/networking.rs | 30 ++++ crates/node/src/lib.rs | 13 +- crates/node/src/roles/admin.rs | 31 ++-- crates/node/src/roles/worker.rs | 19 +- crates/worker/src/lib.rs | 19 +- crates/worker/src/partition/mod.rs | 3 +- 18 files changed, 582 insertions(+), 61 deletions(-) create mode 100644 crates/core/src/network/message_router.rs create mode 100644 crates/network/src/v2/error.rs create mode 100644 crates/network/src/v2/mod.rs create mode 100644 crates/network/src/v2/networking.rs diff --git a/Cargo.lock b/Cargo.lock index 77d0d8850..eb94229a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4973,6 +4973,7 @@ dependencies = [ "test-log", "thiserror", "tokio", + "tokio-stream", "tokio-util", "tracing", "tracing-subscriber", @@ -5273,6 +5274,7 @@ name = "restate-network" version = "0.9.0" dependencies = [ "anyhow", + "async-trait", "bytes", "drain", "futures", @@ -5280,9 +5282,11 @@ dependencies = [ "pin-project", "restate-core", "restate-errors", + "restate-node-protocol", "restate-test-util", "restate-types", "restate-wal-protocol", + "static_assertions", "test-log", "thiserror", "tokio", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 1494a868f..d65af80e8 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -25,6 +25,7 @@ strum = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index b264d1fd3..f5202d96b 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -8,20 +8,26 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::ops::Deref; use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::oneshot; -use tracing::info; +use tracing::{debug, info, warn}; -use crate::cancellation_watcher; +use restate_node_protocol::metadata::{MetadataMessage, MetadataUpdate}; +use restate_node_protocol::MessageEnvelope; use restate_types::nodes_config::NodesConfiguration; +use restate_types::GenerationalNodeId; +use restate_types::Version; + +use crate::cancellation_watcher; +use crate::is_cancellation_requested; +use crate::metadata; +use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender}; +use crate::task_center; -use super::Metadata; -use super::MetadataContainer; -use super::MetadataInner; -use super::MetadataKind; -use super::MetadataWriter; +use super::{Metadata, MetadataContainer, MetadataInner, MetadataKind, MetadataWriter}; pub(super) type CommandSender = mpsc::UnboundedSender; pub(super) type CommandReceiver = mpsc::UnboundedReceiver; @@ -30,6 +36,106 @@ pub(super) enum Command { UpdateMetadata(MetadataContainer, Option>), } +/// A handler for processing network messages targeting metadata manager +/// (dev.restate.common.TargetName = METADATA_MANAGER) +struct MetadataMessageHandler +where + N: NetworkSender + 'static + Clone, +{ + sender: CommandSender, + networking: N, +} + +impl MetadataMessageHandler +where + N: NetworkSender + 'static + Clone, +{ + fn send_metadata( + &self, + peer: GenerationalNodeId, + metadata_kind: MetadataKind, + min_version: Option, + ) { + match metadata_kind { + MetadataKind::NodesConfiguration => self.send_nodes_config(peer, min_version), + _ => { + todo!("Can't send metadata '{}' to peer", metadata_kind) + } + }; + } + + fn send_nodes_config(&self, to: GenerationalNodeId, version: Option) { + let config = metadata().nodes_config(); + if version.is_some_and(|min_version| min_version > config.version()) { + // We don't have the version that the peer is asking for. Just ignore. + info!( + "Peer requested nodes config version {} but we have {}, ignoring their request", + version.unwrap(), + config.version() + ); + return; + } + info!( + "Sending nodes config {} to peer, requested version? {:?}", + config.version(), + version, + ); + let _ = task_center().spawn_child( + crate::TaskKind::Disposable, + "send-metadata-to-peer", + None, + { + let networking = self.networking.clone(); + async move { + networking + .send( + to.into(), + &MetadataMessage::MetadataUpdate(MetadataUpdate { + container: MetadataContainer::NodesConfiguration( + config.deref().clone(), + ), + }), + ) + .await?; + Ok(()) + } + }, + ); + } +} + +impl MessageHandler for MetadataMessageHandler +where + N: NetworkSender + 'static + Clone, +{ + type MessageType = MetadataMessage; + + async fn on_message(&self, envelope: MessageEnvelope) { + let (peer, msg) = envelope.split(); + match msg { + MetadataMessage::MetadataUpdate(update) => { + info!( + "Received '{}' metadata update from peer {}", + update.container.kind(), + peer + ); + if let Err(e) = self + .sender + .send(Command::UpdateMetadata(update.container, None)) + { + if !is_cancellation_requested() { + warn!("Failed to send metadata message to metadata manager: {}", e); + } + } + } + MetadataMessage::GetMetadataRequest(request) => { + debug!("Received GetMetadataRequest from peer {}", peer); + self.send_metadata(peer, request.metadata_kind, request.min_version); + } + }; + } +} + /// Handle to access locally cached metadata, request metadata updates, and more. /// What is metadata manager? /// @@ -49,23 +155,34 @@ pub(super) enum Command { /// - Schema metadata /// - NodesConfiguration /// - Partition table -pub struct MetadataManager { +pub struct MetadataManager { self_sender: CommandSender, inner: Arc, inbound: CommandReceiver, + networking: N, } -impl MetadataManager { - pub fn build() -> Self { - let (self_sender, inbound) = tokio::sync::mpsc::unbounded_channel(); - +impl MetadataManager +where + N: NetworkSender + 'static + Clone, +{ + pub fn build(networking: N) -> Self { + let (self_sender, inbound) = mpsc::unbounded_channel(); Self { inner: Arc::new(MetadataInner::default()), inbound, self_sender, + networking, } } + pub fn register_in_message_router(&self, sr_builder: &mut MessageRouterBuilder) { + sr_builder.add_message_handler(MetadataMessageHandler { + sender: self.self_sender.clone(), + networking: self.networking.clone(), + }); + } + pub fn metadata(&self) -> Metadata { Metadata::new(self.inner.clone(), self.self_sender.clone()) } @@ -124,6 +241,11 @@ impl MetadataManager { } Some(current) => { /* Do nothing, current is already newer */ + debug!( + "Ignoring nodes config update {} because we are at {}", + config.version(), + current.version(), + ); maybe_new_version = current.version(); } } @@ -156,15 +278,37 @@ mod tests { use googletest::prelude::*; use restate_test_util::assert_eq; use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, Role}; - use restate_types::{GenerationalNodeId, Version}; + use restate_types::{GenerationalNodeId, NodeId, Version}; use crate::metadata::spawn_metadata_manager; + use crate::network::NetworkSendError; use crate::TaskCenterFactory; + // TEMPORARY. REMOVED IN NEXT PR(s) + #[derive(Clone)] + struct MockNetworkSender; + + impl NetworkSender for MockNetworkSender { + async fn send( + &self, + _to: NodeId, + _message: &M, + ) -> std::result::Result<(), NetworkSendError> + where + M: restate_node_protocol::codec::WireSerde + + restate_node_protocol::codec::Targeted + + Send + + Sync, + { + Ok(()) + } + } + #[tokio::test] async fn test_nodes_config_updates() -> Result<()> { + let network_sender = MockNetworkSender; let tc = TaskCenterFactory::create(tokio::runtime::Handle::current()); - let metadata_manager = MetadataManager::build(); + let metadata_manager = MetadataManager::build(network_sender); let metadata_writer = metadata_manager.writer(); let metadata = metadata_manager.metadata(); @@ -219,8 +363,9 @@ mod tests { #[tokio::test] async fn test_watchers() -> Result<()> { + let network_sender = MockNetworkSender; let tc = TaskCenterFactory::create(tokio::runtime::Handle::current()); - let metadata_manager = MetadataManager::build(); + let metadata_manager = MetadataManager::build(network_sender); let metadata_writer = metadata_manager.writer(); let metadata = metadata_manager.metadata(); diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs index ac9e768c2..692c7c274 100644 --- a/crates/core/src/metadata/mod.rs +++ b/crates/core/src/metadata/mod.rs @@ -24,6 +24,7 @@ use restate_node_protocol::metadata::{MetadataContainer, MetadataKind}; use restate_types::nodes_config::NodesConfiguration; use restate_types::{GenerationalNodeId, Version}; +use crate::network::NetworkSender; use crate::{ShutdownError, TaskCenter, TaskId, TaskKind}; /// The kind of versioned metadata that can be synchronized across nodes. @@ -148,10 +149,13 @@ impl Default for VersionWatch { } } -pub fn spawn_metadata_manager( +pub fn spawn_metadata_manager( tc: &TaskCenter, - metadata_manager: MetadataManager, -) -> Result { + metadata_manager: MetadataManager, +) -> Result +where + N: NetworkSender + 'static, +{ tc.spawn( TaskKind::MetadataBackgroundSync, "metadata-manager", diff --git a/crates/core/src/network/message_router.rs b/crates/core/src/network/message_router.rs new file mode 100644 index 000000000..038433c8d --- /dev/null +++ b/crates/core/src/network/message_router.rs @@ -0,0 +1,206 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::stream::BoxStream; +use restate_node_protocol::codec::Targeted; +use restate_node_protocol::codec::WireSerde; +use restate_node_protocol::common::ProtocolVersion; +use restate_node_protocol::common::TargetName; +use restate_node_protocol::node::message::BinaryMessage; +use restate_node_protocol::CodecError; +use restate_node_protocol::MessageEnvelope; +use restate_types::GenerationalNodeId; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tracing::warn; + +use crate::is_cancellation_requested; + +use super::RouterError; + +/// Implement this trait to process network messages for a specific target +/// (e.g. TargetName = METADATA_MANAGER). +pub trait MessageHandler { + type MessageType: WireSerde + Targeted; + /// Process the request and return the response asynchronously. + fn on_message( + &self, + msg: MessageEnvelope, + ) -> impl std::future::Future + Send; +} + +/// A low-level handler trait. +#[async_trait] +pub trait Handler: Send + Sync { + type Error; + /// Deserialize and process the message asynchronously. + async fn call( + &self, + from: GenerationalNodeId, + // A local identifier that in conjunction with the from value + // uniquely identify the stream. + connection_id: u64, + protocol_version: ProtocolVersion, + message: BinaryMessage, + ) -> Result<(), Self::Error>; +} + +#[derive(Clone, Default)] +pub struct MessageRouter(Arc); + +#[derive(Default)] +struct MessageRouterInner { + handlers: HashMap + Send + Sync>>, +} + +#[async_trait] +impl Handler for MessageRouter { + type Error = RouterError; + /// Process the request and return the response asynchronously. + async fn call( + &self, + from: GenerationalNodeId, + connection_id: u64, + protocol_version: ProtocolVersion, + message: BinaryMessage, + ) -> Result<(), Self::Error> { + let target = message.target(); + let Some(handler) = self.0.handlers.get(&target) else { + return Err(RouterError::NotRegisteredTarget(target.to_string())); + }; + handler + .call(from, connection_id, protocol_version, message) + .await?; + Ok(()) + } +} + +#[derive(Default)] +pub struct MessageRouterBuilder { + handlers: HashMap + Send + Sync>>, +} + +impl MessageRouterBuilder { + /// Attach a handler that implements [`MessageHandler`] to receive messages + /// for the associated target. + #[track_caller] + pub fn add_message_handler(&mut self, handler: H) + where + H: MessageHandler + Send + Sync + 'static, + { + let wrapped = MessageHandlerWrapper { inner: handler }; + let target = H::MessageType::TARGET; + if self.handlers.insert(target, Box::new(wrapped)).is_some() { + panic!("Handler for target {} has been registered already!", target); + } + } + + /// Subscribe to a stream of messages for a specific target. This enables consumers of messages + /// to use async stream API to process messages of a given target as an alternative to the + /// message callback-style API as in `add_message_handler`. + #[track_caller] + pub fn subscribe_to_stream( + &mut self, + buffer_size: usize, + ) -> BoxStream<'static, MessageEnvelope> + where + M: WireSerde + Targeted + Send + Sync + 'static, + { + let (tx, rx) = mpsc::channel(buffer_size); + + let wrapped = StreamHandlerWrapper { sender: tx }; + let target = M::TARGET; + if self.handlers.insert(target, Box::new(wrapped)).is_some() { + panic!("Handler for target {} has been registered already!", target); + } + Box::pin(ReceiverStream::new(rx)) + } + + /// Finalize this builder and return the message router that can be attached to + /// [`crate::ConnectionManager`] + pub fn build(self) -> MessageRouter { + MessageRouter(Arc::new(MessageRouterInner { + handlers: self.handlers, + })) + } +} + +struct MessageHandlerWrapper { + inner: H, +} + +#[async_trait] +impl Handler for MessageHandlerWrapper +where + H: MessageHandler + Send + Sync + 'static, +{ + type Error = CodecError; + /// Process the request and return the response asynchronously. + async fn call( + &self, + from: GenerationalNodeId, + connection_id: u64, + protocol_version: ProtocolVersion, + message: BinaryMessage, + ) -> Result<(), Self::Error> { + let msg = ::decode(message.payload, protocol_version)?; + self.inner + .on_message(MessageEnvelope::new(from, connection_id, msg)) + .await; + Ok(()) + } +} + +struct StreamHandlerWrapper +where + M: WireSerde + Targeted + Send + Sync + 'static, +{ + sender: mpsc::Sender>, +} + +#[async_trait] +impl Handler for StreamHandlerWrapper +where + M: WireSerde + Targeted + Send + Sync + 'static, +{ + type Error = CodecError; + /// Process the request and return the response asynchronously. + async fn call( + &self, + from: GenerationalNodeId, + connection_id: u64, + protocol_version: ProtocolVersion, + message: BinaryMessage, + ) -> Result<(), Self::Error> { + let msg = ::decode(message.payload, protocol_version)?; + if let Err(e) = self + .sender + .send(MessageEnvelope::new(from, connection_id, msg)) + .await + { + // Can be benign if we are shutting down + if !is_cancellation_requested() { + warn!( + "Failed to send message for target {} to stream: {}", + M::TARGET, + e + ); + } + } + Ok(()) + } +} + +static_assertions::assert_impl_all!(MessageRouter: Send, Sync); diff --git a/crates/core/src/network/mod.rs b/crates/core/src/network/mod.rs index 036ff3aea..d993f8a85 100644 --- a/crates/core/src/network/mod.rs +++ b/crates/core/src/network/mod.rs @@ -9,7 +9,9 @@ // by the Apache License, Version 2.0. mod error; +mod message_router; mod network_sender; pub use error::*; +pub use message_router::*; pub use network_sender::*; diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index 4cd024fb5..74978c1c3 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -8,21 +8,23 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use futures::FutureExt; -use restate_types::identifiers::PartitionId; use std::collections::HashMap; use std::panic::AssertUnwindSafe; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, OnceLock}; use std::time::{Duration, Instant}; -use futures::Future; +use futures::{Future, FutureExt}; +use restate_types::NodeId; use tokio::task::JoinHandle; use tokio::task_local; use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; use tracing::{debug, error, info, instrument, trace, warn}; +use restate_types::identifiers::PartitionId; + use crate::metadata::{spawn_metadata_manager, Metadata, MetadataManager}; +use crate::network::{NetworkSendError, NetworkSender}; use crate::{TaskId, TaskKind}; static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(0); @@ -32,6 +34,22 @@ const EXIT_CODE_FAILURE: i32 = 1; #[error("system is shutting down")] pub struct ShutdownError; +// TEMPORARY. REMOVED IN NEXT PR(s) +#[derive(Clone)] +struct MockNetworkSender; + +impl NetworkSender for MockNetworkSender { + async fn send(&self, _to: NodeId, _message: &M) -> Result<(), NetworkSendError> + where + M: restate_node_protocol::codec::WireSerde + + restate_node_protocol::codec::Targeted + + Send + + Sync, + { + Ok(()) + } +} + /// Used to create a new task center. In practice, there should be a single task center for the /// entire process but we might need to create more than one in integration test scenarios. pub struct TaskCenterFactory {} @@ -56,7 +74,8 @@ pub fn create_test_task_center() -> TaskCenter { let tc = TaskCenterFactory::create(tokio::runtime::Handle::current()); - let metadata_manager = MetadataManager::build(); + let networking = MockNetworkSender; + let metadata_manager = MetadataManager::build(networking); let metadata = metadata_manager.metadata(); metadata_manager .writer() diff --git a/crates/core/src/task_center_types.rs b/crates/core/src/task_center_types.rs index 0f7a72337..36229cd47 100644 --- a/crates/core/src/task_center_types.rs +++ b/crates/core/src/task_center_types.rs @@ -70,6 +70,8 @@ pub enum TaskKind { /// A background task that the system needs for its operation. The task requires a system /// shutdown on errors and the system will wait for its graceful cancellation on shutdown. BifrostBackgroundHighPriority, + #[strum(props(OnCancel = "abort", OnError = "log"))] + Disposable, } impl TaskKind { diff --git a/crates/network/Cargo.toml b/crates/network/Cargo.toml index f8fc1fe01..9f818a0f3 100644 --- a/crates/network/Cargo.toml +++ b/crates/network/Cargo.toml @@ -10,15 +10,18 @@ publish = false [dependencies] restate-core = { workspace = true } restate-errors = { workspace = true } +restate-node-protocol = { workspace = true } restate-types = { workspace = true } restate-wal-protocol = { workspace = true } anyhow = { workspace = true } +async-trait = { workspace = true } bytes = { workspace = true } drain = { workspace = true } futures = { workspace = true } http = { workspace = true } pin-project = { workspace = true } +static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tonic = { workspace = true } diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs index 619267b24..e4acac6b8 100644 --- a/crates/network/src/lib.rs +++ b/crates/network/src/lib.rs @@ -14,6 +14,9 @@ use std::fmt::Debug; use std::future::Future; use tokio::sync::mpsc; +mod v2; +pub use v2::*; + mod routing; mod unbounded_handle; pub mod utils; diff --git a/crates/network/src/v2/error.rs b/crates/network/src/v2/error.rs new file mode 100644 index 000000000..0134e566b --- /dev/null +++ b/crates/network/src/v2/error.rs @@ -0,0 +1,81 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_core::ShutdownError; +use restate_node_protocol::common::MIN_SUPPORTED_PROTOCOL_VERSION; +use restate_types::nodes_config::NodesConfigError; +use restate_types::NodeId; + +#[derive(Debug, thiserror::Error)] +pub enum NetworkError { + #[error("unknown node: {0}")] + UnknownNode(#[from] NodesConfigError), + #[error("operation aborted, node is shutting down")] + Shutdown(#[from] ShutdownError), + #[error("node {0} address is bad: {1}")] + BadNodeAddress(NodeId, http::Error), + #[error("timeout: {0}")] + Timeout(&'static str), + #[error("protocol error: {0}")] + ProtocolError(#[from] ProtocolError), + #[error("cannot connect: {} {}", tonic::Status::code(.0), tonic::Status::message(.0))] + ConnectError(#[from] tonic::Status), + #[error("new node generation exists: {0}")] + OldPeerGeneration(String), + #[error("peer is not connected")] + ConnectionClosed, +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtocolError { + #[error("handshake failed: {0}")] + HandshakeFailed(&'static str), + #[error("handshake timeout: {0}")] + HandshakeTimeout(&'static str), + #[error("peer dropped connection")] + PeerDropped, + #[error("error in codec: {0}")] + Codec(String), + #[error("grpc error: {0}")] + GrpcError(#[from] tonic::Status), + #[error( + "peer has unsupported protocol version {0}, minimum supported is '{}'", + MIN_SUPPORTED_PROTOCOL_VERSION as i32 + )] + UnsupportedVersion(i32), +} + +impl From for tonic::Status { + fn from(value: ProtocolError) -> Self { + match value { + ProtocolError::HandshakeFailed(e) => tonic::Status::invalid_argument(e), + ProtocolError::HandshakeTimeout(e) => tonic::Status::deadline_exceeded(e), + ProtocolError::PeerDropped => tonic::Status::cancelled("peer dropped"), + ProtocolError::Codec(e) => tonic::Status::internal(e), + ProtocolError::UnsupportedVersion(_) => { + tonic::Status::invalid_argument(value.to_string()) + } + ProtocolError::GrpcError(s) => s, + } + } +} + +impl From for tonic::Status { + fn from(value: NetworkError) -> Self { + match value { + NetworkError::Shutdown(_) => tonic::Status::unavailable(value.to_string()), + NetworkError::ProtocolError(e) => e.into(), + NetworkError::Timeout(e) => tonic::Status::deadline_exceeded(e), + NetworkError::OldPeerGeneration(e) => tonic::Status::already_exists(e), + NetworkError::ConnectError(s) => s, + e => tonic::Status::internal(e.to_string()), + } + } +} diff --git a/crates/network/src/v2/mod.rs b/crates/network/src/v2/mod.rs new file mode 100644 index 000000000..b0d5cc0f9 --- /dev/null +++ b/crates/network/src/v2/mod.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod error; +mod networking; + +pub use networking::Networking; diff --git a/crates/network/src/v2/networking.rs b/crates/network/src/v2/networking.rs new file mode 100644 index 000000000..eb1f7c538 --- /dev/null +++ b/crates/network/src/v2/networking.rs @@ -0,0 +1,30 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_core::network::{NetworkSendError, NetworkSender}; +use restate_node_protocol::codec::{Targeted, WireSerde}; +use restate_types::NodeId; + +/// Access to node-to-node networking infrastructure; +#[derive(Default, Clone)] +pub struct Networking {} + +impl Networking {} + +impl NetworkSender for Networking { + async fn send(&self, _to: NodeId, _message: &M) -> Result<(), NetworkSendError> + where + M: WireSerde + Targeted + Send + Sync, + { + Ok(()) + } +} + +static_assertions::assert_impl_all!(Networking: Send, Sync); diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 2a43989a5..6a6560faa 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -15,7 +15,9 @@ mod roles; pub use options::{Options, OptionsBuilder as NodeOptionsBuilder}; pub use restate_admin::OptionsBuilder as AdminOptionsBuilder; use restate_bifrost::BifrostService; +use restate_core::network::MessageRouterBuilder; pub use restate_meta::OptionsBuilder as MetaOptionsBuilder; +use restate_network::Networking; pub use restate_worker::{OptionsBuilder as WorkerOptionsBuilder, RocksdbOptionsBuilder}; use std::ops::Deref; @@ -63,7 +65,7 @@ pub enum BuildError { pub struct Node { options: Options, - metadata_manager: MetadataManager, + metadata_manager: MetadataManager, bifrost: BifrostService, admin_role: Option, worker_role: Option, @@ -83,16 +85,19 @@ impl Node { } } - let metadata_manager = MetadataManager::build(); + let mut sr_builder = MessageRouterBuilder::default(); + let networking = Networking::default(); + let metadata_manager = MetadataManager::build(networking.clone()); + metadata_manager.register_in_message_router(&mut sr_builder); let admin_role = if options.roles.contains(Role::Admin) { - Some(AdminRole::try_from(options.clone())?) + Some(AdminRole::new(options.clone(), networking.clone())?) } else { None }; let worker_role = if options.roles.contains(Role::Worker) { - Some(WorkerRole::try_from(options.clone())?) + Some(WorkerRole::new(options.clone(), networking.clone())?) } else { None }; diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index 9e6131393..381633716 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -10,6 +10,7 @@ use anyhow::Context; use codederror::CodedError; +use restate_network::Networking; use restate_node_services::node_svc::node_svc_client::NodeSvcClient; use tonic::transport::Channel; use tracing::info; @@ -44,6 +45,19 @@ pub struct AdminRole { } impl AdminRole { + pub fn new(options: Options, _networking: Networking) -> Result { + let meta = options.meta.build(options.worker.kafka.clone())?; + let admin = options + .admin + .build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); + + Ok(AdminRole { + controller: restate_cluster_controller::Service::new(options.cluster_controller), + admin, + meta, + }) + } + pub fn cluster_controller_handle(&self) -> ClusterControllerHandle { self.controller.handle() } @@ -93,23 +107,6 @@ impl AdminRole { } } -impl TryFrom for AdminRole { - type Error = AdminRoleBuildError; - - fn try_from(options: Options) -> Result { - let meta = options.meta.build(options.worker.kafka.clone())?; - let admin = options - .admin - .build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); - - Ok(AdminRole { - controller: restate_cluster_controller::Service::new(options.cluster_controller), - admin, - meta, - }) - } -} - #[derive(Debug, Clone)] struct GrpcNodeHandle { grpc_client: NodeSvcClient, diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index dc5c4c67a..e379000f5 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -11,6 +11,7 @@ use std::time::Duration; use codederror::CodedError; +use restate_network::Networking; use tonic::transport::Channel; use tracing::subscriber::NoSubscriber; use tracing::trace; @@ -96,6 +97,13 @@ pub struct WorkerRole { } impl WorkerRole { + pub fn new(options: Options, networking: Networking) -> Result { + let schemas = Schemas::default(); + let worker = options.worker.build(networking, schemas.clone())?; + + Ok(WorkerRole { schemas, worker }) + } + pub fn rocksdb_storage(&self) -> &RocksDBStorage { self.worker.rocksdb_storage() } @@ -256,17 +264,6 @@ impl WorkerRole { } } -impl TryFrom for WorkerRole { - type Error = WorkerRoleBuildError; - - fn try_from(options: Options) -> Result { - let schemas = Schemas::default(); - let worker = options.worker.build(schemas.clone())?; - - Ok(WorkerRole { schemas, worker }) - } -} - pub async fn update_schemas( schemas: &Schemas, subscription_controller: Option<&SC>, diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 823875460..b41413593 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -26,7 +26,7 @@ use restate_ingress_kafka::Service as IngressKafkaService; use restate_invoker_impl::{ ChannelServiceHandle as InvokerChannelServiceHandle, Service as InvokerService, }; -use restate_network::{PartitionProcessorSender, UnboundedNetworkHandle}; +use restate_network::{Networking, PartitionProcessorSender, UnboundedNetworkHandle}; use restate_schema_impl::Schemas; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_storage_query_datafusion::context::QueryContext; @@ -163,9 +163,9 @@ impl Options { &self.storage_rocksdb.path } - pub fn build(self, schemas: Schemas) -> Result { + pub fn build(self, networking: Networking, schemas: Schemas) -> Result { metric_definitions::describe_metrics(); - Worker::new(self, schemas) + Worker::new(self, networking, schemas) } } @@ -194,6 +194,7 @@ impl Error { pub struct Worker { consensus: Consensus, processors: Vec, + networking: Networking, network: network_integration::Network, storage_query_context: QueryContext, storage_query_postgres: PostgresQueryService, @@ -214,7 +215,11 @@ pub struct Worker { } impl Worker { - pub fn new(opts: Options, schemas: Schemas) -> Result { + pub fn new( + opts: Options, + networking: Networking, + schemas: Schemas, + ) -> Result { let Options { channel_size, ingress_grpc, @@ -315,6 +320,7 @@ impl Worker { Ok(Self { consensus, processors, + networking, network, storage_query_context, storage_query_postgres, @@ -418,7 +424,7 @@ impl Worker { // Networking tc.spawn_child( TaskKind::SystemService, - "networking", + "networking-legacy", None, self.network.run(), )?; @@ -449,11 +455,12 @@ impl Worker { // Create partition processors for processor in self.processors { + let networking = self.networking.clone(); tc.spawn_child( TaskKind::PartitionProcessor, "partition-processor", Some(processor.partition_id), - processor.run(), + processor.run(networking), )?; } diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index feb9adb7c..816fea62a 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -18,6 +18,7 @@ use crate::partition::storage::{PartitionStorage, Transaction}; use crate::util::IdentitySender; use futures::StreamExt; use metrics::counter; +use restate_network::Networking; use restate_schema_impl::Schemas; use restate_storage_rocksdb::RocksDBStorage; use restate_types::identifiers::{PartitionId, PartitionKey, PeerId}; @@ -119,7 +120,7 @@ where } #[instrument(level = "trace", skip_all, fields(peer_id = %self.peer_id, partition_id = %self.partition_id))] - pub(super) async fn run(self) -> anyhow::Result<()> { + pub(super) async fn run(self, _networking: Networking) -> anyhow::Result<()> { let PartitionProcessor { peer_id, partition_id,