From c18075cbe09f5778e2e7fa94a5162625e50e67f9 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 28 Feb 2024 17:45:40 +0000 Subject: [PATCH] [3/n] Stub networking and network sender --- Cargo.lock | 3 + crates/core/src/metadata/manager.rs | 129 +++++++++++++++++++++++++-- crates/core/src/task_center.rs | 28 ++++-- crates/core/src/task_center_types.rs | 2 + crates/ingress-http/Cargo.toml | 2 +- crates/network/Cargo.toml | 3 + crates/network/src/lib.rs | 3 + crates/network/src/v2/error.rs | 81 +++++++++++++++++ crates/network/src/v2/mod.rs | 14 +++ crates/network/src/v2/networking.rs | 38 ++++++++ crates/node/src/lib.rs | 11 ++- crates/node/src/roles/admin.rs | 36 ++++---- crates/node/src/roles/worker.rs | 23 ++--- crates/worker/src/lib.rs | 24 +++-- crates/worker/src/partition/mod.rs | 4 +- 15 files changed, 351 insertions(+), 50 deletions(-) create mode 100644 crates/network/src/v2/error.rs create mode 100644 crates/network/src/v2/mod.rs create mode 100644 crates/network/src/v2/networking.rs diff --git a/Cargo.lock b/Cargo.lock index 5655bfc62..af8418a13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5245,6 +5245,7 @@ name = "restate-network" version = "0.8.0" dependencies = [ "anyhow", + "async-trait", "bytes", "drain", "futures", @@ -5252,9 +5253,11 @@ dependencies = [ "pin-project", "restate-core", "restate-errors", + "restate-node-protocol", "restate-test-util", "restate-types", "restate-wal-protocol", + "static_assertions", "test-log", "thiserror", "tokio", diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index b264d1fd3..91ff876bc 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -8,13 +8,22 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::ops::Deref; use std::sync::Arc; +use restate_node_protocol::MessageEnvelope; +use restate_node_protocol::MetadataUpdate; +use restate_node_protocol::NetworkMessage; +use restate_types::GenerationalNodeId; +use restate_types::Version; use tokio::sync::mpsc; use tokio::sync::oneshot; +use tracing::debug; use tracing::info; use crate::cancellation_watcher; +use crate::network_sender::NetworkSender; +use crate::task_center; use restate_types::nodes_config::NodesConfiguration; use super::Metadata; @@ -28,6 +37,7 @@ pub(super) type CommandReceiver = mpsc::UnboundedReceiver; pub(super) enum Command { UpdateMetadata(MetadataContainer, Option>), + SendMetadataToPeer(GenerationalNodeId, MetadataKind, Option), } /// Handle to access locally cached metadata, request metadata updates, and more. @@ -53,16 +63,25 @@ pub struct MetadataManager { self_sender: CommandSender, inner: Arc, inbound: CommandReceiver, + networking: Arc, + // Handle inbound network messages to update our metadata and to respond to + // external metadata fetch requests + network_inbound: mpsc::Receiver, + network_inbound_sender: mpsc::Sender, } impl MetadataManager { - pub fn build() -> Self { - let (self_sender, inbound) = tokio::sync::mpsc::unbounded_channel(); + pub fn build(networking: Arc) -> Self { + let (self_sender, inbound) = mpsc::unbounded_channel(); + let (network_inbound_sender, network_inbound) = mpsc::channel(1); Self { inner: Arc::new(MetadataInner::default()), inbound, + networking, self_sender, + network_inbound, + network_inbound_sender, } } @@ -74,6 +93,10 @@ impl MetadataManager { MetadataWriter::new(self.self_sender.clone(), self.inner.clone()) } + pub fn network_inbound_sender(&self) -> mpsc::Sender { + self.network_inbound_sender.clone() + } + /// Start and wait for shutdown signal. pub async fn run(mut self) -> anyhow::Result<()> { info!("Metadata manager started"); @@ -88,6 +111,9 @@ impl MetadataManager { Some(cmd) = self.inbound.recv() => { self.handle_command(cmd) } + Some(envelope) = self.network_inbound.recv() => { + self.handle_network_message(envelope).await + } } } Ok(()) @@ -96,9 +122,23 @@ impl MetadataManager { fn handle_command(&mut self, cmd: Command) { match cmd { Command::UpdateMetadata(value, callback) => self.update_metadata(value, callback), + Command::SendMetadataToPeer(peer, kind, min_version) => { + self.send_metadata(peer, kind, min_version) + } } } + async fn handle_network_message(&mut self, envelope: MessageEnvelope) { + let (peer, msg) = envelope.split(); + match msg { + NetworkMessage::MetadataUpdate(update) => self.update_metadata(update.container, None), + NetworkMessage::GetMetadataRequest(request) => { + debug!("Received GetMetadataRequest from peer {}", peer); + self.send_metadata(peer, request.metadata_kind, request.min_version); + } + }; + } + fn update_metadata(&mut self, value: MetadataContainer, callback: Option>) { match value { MetadataContainer::NodesConfiguration(config) => { @@ -107,6 +147,62 @@ impl MetadataManager { } } + fn send_metadata( + &mut self, + peer: GenerationalNodeId, + metadata_kind: MetadataKind, + min_version: Option, + ) { + match metadata_kind { + MetadataKind::NodesConfiguration => self.send_nodes_config(peer, min_version), + _ => { + todo!("Can't send metadata '{}' to peer", metadata_kind) + } + }; + } + + fn send_nodes_config(&self, to: GenerationalNodeId, version: Option) { + let config = self.inner.nodes_config.load_full(); + let Some(config) = config else { + return; + }; + if version.is_some_and(|min_version| min_version > config.version()) { + // We don't have the version that the peer is asking for. Just ignore. + debug!( + "Peer requested nodes config version {} but we have {}, ignoring their request", + version.unwrap(), + config.version() + ); + return; + } + info!( + "Sending nodes config {} to peer, requested version? {:?}", + config.version(), + version, + ); + let _ = task_center().spawn_child( + crate::TaskKind::Disposable, + "send-metadata-to-peer", + None, + { + let networking = self.networking.clone(); + async move { + networking + .send( + to.into(), + &NetworkMessage::MetadataUpdate(MetadataUpdate { + container: MetadataContainer::NodesConfiguration( + config.deref().clone(), + ), + }), + ) + .await?; + Ok(()) + } + }, + ); + } + fn update_nodes_configuration( &mut self, config: NodesConfiguration, @@ -124,6 +220,11 @@ impl MetadataManager { } Some(current) => { /* Do nothing, current is already newer */ + debug!( + "Ignoring nodes config update {} because we are at {}", + config.version(), + current.version(), + ); maybe_new_version = current.version(); } } @@ -153,18 +254,33 @@ mod tests { use super::*; + use async_trait::async_trait; use googletest::prelude::*; use restate_test_util::assert_eq; use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, Role}; - use restate_types::{GenerationalNodeId, Version}; + use restate_types::{GenerationalNodeId, NodeId, Version}; use crate::metadata::spawn_metadata_manager; - use crate::TaskCenterFactory; + use crate::{NetworkSendError, TaskCenterFactory}; + + struct MockNetworkSender; + + #[async_trait] + impl NetworkSender for MockNetworkSender { + async fn send( + &self, + _to: NodeId, + _message: &NetworkMessage, + ) -> std::result::Result<(), NetworkSendError> { + Ok(()) + } + } #[tokio::test] async fn test_nodes_config_updates() -> Result<()> { + let network_sender = Arc::new(MockNetworkSender); let tc = TaskCenterFactory::create(tokio::runtime::Handle::current()); - let metadata_manager = MetadataManager::build(); + let metadata_manager = MetadataManager::build(network_sender); let metadata_writer = metadata_manager.writer(); let metadata = metadata_manager.metadata(); @@ -219,8 +335,9 @@ mod tests { #[tokio::test] async fn test_watchers() -> Result<()> { + let network_sender = Arc::new(MockNetworkSender); let tc = TaskCenterFactory::create(tokio::runtime::Handle::current()); - let metadata_manager = MetadataManager::build(); + let metadata_manager = MetadataManager::build(network_sender); let metadata_writer = metadata_manager.writer(); let metadata = metadata_manager.metadata(); diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index 4cd024fb5..5922b8643 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -8,22 +8,25 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use futures::FutureExt; -use restate_types::identifiers::PartitionId; use std::collections::HashMap; use std::panic::AssertUnwindSafe; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, OnceLock}; use std::time::{Duration, Instant}; -use futures::Future; +use async_trait::async_trait; +use futures::{Future, FutureExt}; +use restate_types::NodeId; use tokio::task::JoinHandle; use tokio::task_local; use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; use tracing::{debug, error, info, instrument, trace, warn}; +use restate_node_protocol::NetworkMessage; +use restate_types::identifiers::PartitionId; + use crate::metadata::{spawn_metadata_manager, Metadata, MetadataManager}; -use crate::{TaskId, TaskKind}; +use crate::{NetworkSendError, NetworkSender, TaskId, TaskKind}; static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(0); const EXIT_CODE_FAILURE: i32 = 1; @@ -32,6 +35,20 @@ const EXIT_CODE_FAILURE: i32 = 1; #[error("system is shutting down")] pub struct ShutdownError; +// TEMPORARY. REMOVED IN NEXT PR(s) +struct MockNetworkSender; + +#[async_trait] +impl NetworkSender for MockNetworkSender { + async fn send( + &self, + _to: NodeId, + _message: &NetworkMessage, + ) -> std::result::Result<(), NetworkSendError> { + Ok(()) + } +} + /// Used to create a new task center. In practice, there should be a single task center for the /// entire process but we might need to create more than one in integration test scenarios. pub struct TaskCenterFactory {} @@ -56,7 +73,8 @@ pub fn create_test_task_center() -> TaskCenter { let tc = TaskCenterFactory::create(tokio::runtime::Handle::current()); - let metadata_manager = MetadataManager::build(); + let networking = Arc::new(MockNetworkSender); + let metadata_manager = MetadataManager::build(networking); let metadata = metadata_manager.metadata(); metadata_manager .writer() diff --git a/crates/core/src/task_center_types.rs b/crates/core/src/task_center_types.rs index 0f7a72337..36229cd47 100644 --- a/crates/core/src/task_center_types.rs +++ b/crates/core/src/task_center_types.rs @@ -70,6 +70,8 @@ pub enum TaskKind { /// A background task that the system needs for its operation. The task requires a system /// shutdown on errors and the system will wait for its graceful cancellation on shutdown. BifrostBackgroundHighPriority, + #[strum(props(OnCancel = "abort", OnError = "log"))] + Disposable, } impl TaskKind { diff --git a/crates/ingress-http/Cargo.toml b/crates/ingress-http/Cargo.toml index f4b930198..80e68d7ba 100644 --- a/crates/ingress-http/Cargo.toml +++ b/crates/ingress-http/Cargo.toml @@ -34,7 +34,7 @@ tokio = { workspace = true } http = "1.0" http-body = "1.0" http-body-util = "0.1" -hyper-util = { version = "0.1", features = ["http1", "http2", "server"] } +hyper-util = { version = "0.1", features = ["http1", "http2", "server", "tokio"] } # Tracing opentelemetry = { workspace = true } diff --git a/crates/network/Cargo.toml b/crates/network/Cargo.toml index f8fc1fe01..9f818a0f3 100644 --- a/crates/network/Cargo.toml +++ b/crates/network/Cargo.toml @@ -10,15 +10,18 @@ publish = false [dependencies] restate-core = { workspace = true } restate-errors = { workspace = true } +restate-node-protocol = { workspace = true } restate-types = { workspace = true } restate-wal-protocol = { workspace = true } anyhow = { workspace = true } +async-trait = { workspace = true } bytes = { workspace = true } drain = { workspace = true } futures = { workspace = true } http = { workspace = true } pin-project = { workspace = true } +static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tonic = { workspace = true } diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs index 619267b24..e4acac6b8 100644 --- a/crates/network/src/lib.rs +++ b/crates/network/src/lib.rs @@ -14,6 +14,9 @@ use std::fmt::Debug; use std::future::Future; use tokio::sync::mpsc; +mod v2; +pub use v2::*; + mod routing; mod unbounded_handle; pub mod utils; diff --git a/crates/network/src/v2/error.rs b/crates/network/src/v2/error.rs new file mode 100644 index 000000000..0134e566b --- /dev/null +++ b/crates/network/src/v2/error.rs @@ -0,0 +1,81 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_core::ShutdownError; +use restate_node_protocol::common::MIN_SUPPORTED_PROTOCOL_VERSION; +use restate_types::nodes_config::NodesConfigError; +use restate_types::NodeId; + +#[derive(Debug, thiserror::Error)] +pub enum NetworkError { + #[error("unknown node: {0}")] + UnknownNode(#[from] NodesConfigError), + #[error("operation aborted, node is shutting down")] + Shutdown(#[from] ShutdownError), + #[error("node {0} address is bad: {1}")] + BadNodeAddress(NodeId, http::Error), + #[error("timeout: {0}")] + Timeout(&'static str), + #[error("protocol error: {0}")] + ProtocolError(#[from] ProtocolError), + #[error("cannot connect: {} {}", tonic::Status::code(.0), tonic::Status::message(.0))] + ConnectError(#[from] tonic::Status), + #[error("new node generation exists: {0}")] + OldPeerGeneration(String), + #[error("peer is not connected")] + ConnectionClosed, +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtocolError { + #[error("handshake failed: {0}")] + HandshakeFailed(&'static str), + #[error("handshake timeout: {0}")] + HandshakeTimeout(&'static str), + #[error("peer dropped connection")] + PeerDropped, + #[error("error in codec: {0}")] + Codec(String), + #[error("grpc error: {0}")] + GrpcError(#[from] tonic::Status), + #[error( + "peer has unsupported protocol version {0}, minimum supported is '{}'", + MIN_SUPPORTED_PROTOCOL_VERSION as i32 + )] + UnsupportedVersion(i32), +} + +impl From for tonic::Status { + fn from(value: ProtocolError) -> Self { + match value { + ProtocolError::HandshakeFailed(e) => tonic::Status::invalid_argument(e), + ProtocolError::HandshakeTimeout(e) => tonic::Status::deadline_exceeded(e), + ProtocolError::PeerDropped => tonic::Status::cancelled("peer dropped"), + ProtocolError::Codec(e) => tonic::Status::internal(e), + ProtocolError::UnsupportedVersion(_) => { + tonic::Status::invalid_argument(value.to_string()) + } + ProtocolError::GrpcError(s) => s, + } + } +} + +impl From for tonic::Status { + fn from(value: NetworkError) -> Self { + match value { + NetworkError::Shutdown(_) => tonic::Status::unavailable(value.to_string()), + NetworkError::ProtocolError(e) => e.into(), + NetworkError::Timeout(e) => tonic::Status::deadline_exceeded(e), + NetworkError::OldPeerGeneration(e) => tonic::Status::already_exists(e), + NetworkError::ConnectError(s) => s, + e => tonic::Status::internal(e.to_string()), + } + } +} diff --git a/crates/network/src/v2/mod.rs b/crates/network/src/v2/mod.rs new file mode 100644 index 000000000..b0d5cc0f9 --- /dev/null +++ b/crates/network/src/v2/mod.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod error; +mod networking; + +pub use networking::Networking; diff --git a/crates/network/src/v2/networking.rs b/crates/network/src/v2/networking.rs new file mode 100644 index 000000000..4665db907 --- /dev/null +++ b/crates/network/src/v2/networking.rs @@ -0,0 +1,38 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use async_trait::async_trait; +use restate_core::{NetworkSendError, NetworkSender}; +use restate_node_protocol::{MessageEnvelope, NetworkMessage}; +use restate_types::NodeId; +use tokio::sync::mpsc; + +/// Access to node-to-node networking infrastructure; +#[derive(Default)] +pub struct Networking {} + +impl Networking { + #[track_caller] + /// be called once on startup + pub fn set_metadata_manager_subscriber(&self, _subscriber: mpsc::Sender) {} + + #[track_caller] + /// be called once on startup + pub fn set_ingress_subscriber(&self, _subscriber: mpsc::Sender) {} +} + +#[async_trait] +impl NetworkSender for Networking { + async fn send(&self, _to: NodeId, _message: &NetworkMessage) -> Result<(), NetworkSendError> { + Ok(()) + } +} + +static_assertions::assert_impl_all!(Networking: Send, Sync); diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 2a43989a5..249ce571f 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -16,9 +16,11 @@ pub use options::{Options, OptionsBuilder as NodeOptionsBuilder}; pub use restate_admin::OptionsBuilder as AdminOptionsBuilder; use restate_bifrost::BifrostService; pub use restate_meta::OptionsBuilder as MetaOptionsBuilder; +use restate_network::Networking; pub use restate_worker::{OptionsBuilder as WorkerOptionsBuilder, RocksdbOptionsBuilder}; use std::ops::Deref; +use std::sync::Arc; use anyhow::bail; use codederror::CodedError; @@ -83,16 +85,19 @@ impl Node { } } - let metadata_manager = MetadataManager::build(); + let networking = Arc::new(Networking::default()); + let metadata_manager = MetadataManager::build(networking.clone()); + // Metadata manager subscribes to sync and update metadata messages. + networking.set_metadata_manager_subscriber(metadata_manager.network_inbound_sender()); let admin_role = if options.roles.contains(Role::Admin) { - Some(AdminRole::try_from(options.clone())?) + Some(AdminRole::new(options.clone(), networking.clone())?) } else { None }; let worker_role = if options.roles.contains(Role::Worker) { - Some(WorkerRole::try_from(options.clone())?) + Some(WorkerRole::new(options.clone(), networking.clone())?) } else { None }; diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index 9e6131393..912dc8e51 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -8,8 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::sync::Arc; + use anyhow::Context; use codederror::CodedError; +use restate_network::Networking; use restate_node_services::node_svc::node_svc_client::NodeSvcClient; use tonic::transport::Channel; use tracing::info; @@ -44,6 +47,22 @@ pub struct AdminRole { } impl AdminRole { + pub fn new( + options: Options, + _networking: Arc, + ) -> Result { + let meta = options.meta.build(options.worker.kafka.clone())?; + let admin = options + .admin + .build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); + + Ok(AdminRole { + controller: restate_cluster_controller::Service::new(options.cluster_controller), + admin, + meta, + }) + } + pub fn cluster_controller_handle(&self) -> ClusterControllerHandle { self.controller.handle() } @@ -93,23 +112,6 @@ impl AdminRole { } } -impl TryFrom for AdminRole { - type Error = AdminRoleBuildError; - - fn try_from(options: Options) -> Result { - let meta = options.meta.build(options.worker.kafka.clone())?; - let admin = options - .admin - .build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); - - Ok(AdminRole { - controller: restate_cluster_controller::Service::new(options.cluster_controller), - admin, - meta, - }) - } -} - #[derive(Debug, Clone)] struct GrpcNodeHandle { grpc_client: NodeSvcClient, diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 7ab9d1f2e..f34eab35e 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -8,9 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::sync::Arc; use std::time::Duration; use codederror::CodedError; +use restate_network::Networking; use tonic::transport::Channel; use tracing::debug; use tracing::subscriber::NoSubscriber; @@ -96,6 +98,16 @@ pub struct WorkerRole { } impl WorkerRole { + pub fn new( + options: Options, + networking: Arc, + ) -> Result { + let schemas = Schemas::default(); + let worker = options.worker.build(networking, schemas.clone())?; + + Ok(WorkerRole { schemas, worker }) + } + pub fn rocksdb_storage(&self) -> &RocksDBStorage { self.worker.rocksdb_storage() } @@ -256,17 +268,6 @@ impl WorkerRole { } } -impl TryFrom for WorkerRole { - type Error = WorkerRoleBuildError; - - fn try_from(options: Options) -> Result { - let schemas = Schemas::default(); - let worker = options.worker.build(schemas.clone())?; - - Ok(WorkerRole { schemas, worker }) - } -} - pub async fn update_schemas( schemas: &Schemas, subscription_controller: Option<&SC>, diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 823875460..c2a5fbc66 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -26,7 +26,7 @@ use restate_ingress_kafka::Service as IngressKafkaService; use restate_invoker_impl::{ ChannelServiceHandle as InvokerChannelServiceHandle, Service as InvokerService, }; -use restate_network::{PartitionProcessorSender, UnboundedNetworkHandle}; +use restate_network::{Networking, PartitionProcessorSender, UnboundedNetworkHandle}; use restate_schema_impl::Schemas; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_storage_query_datafusion::context::QueryContext; @@ -35,6 +35,7 @@ use restate_storage_rocksdb::{RocksDBStorage, RocksDBWriter}; use restate_types::identifiers::{PartitionKey, PeerId}; use restate_types::message::PartitionTarget; use std::ops::RangeInclusive; +use std::sync::Arc; use tokio::sync::mpsc; use tracing::debug; use util::IdentitySender; @@ -163,9 +164,13 @@ impl Options { &self.storage_rocksdb.path } - pub fn build(self, schemas: Schemas) -> Result { + pub fn build( + self, + networking: Arc, + schemas: Schemas, + ) -> Result { metric_definitions::describe_metrics(); - Worker::new(self, schemas) + Worker::new(self, networking, schemas) } } @@ -194,6 +199,7 @@ impl Error { pub struct Worker { consensus: Consensus, processors: Vec, + networking: Arc, network: network_integration::Network, storage_query_context: QueryContext, storage_query_postgres: PostgresQueryService, @@ -214,7 +220,11 @@ pub struct Worker { } impl Worker { - pub fn new(opts: Options, schemas: Schemas) -> Result { + pub fn new( + opts: Options, + networking: Arc, + schemas: Schemas, + ) -> Result { let Options { channel_size, ingress_grpc, @@ -315,6 +325,7 @@ impl Worker { Ok(Self { consensus, processors, + networking, network, storage_query_context, storage_query_postgres, @@ -418,7 +429,7 @@ impl Worker { // Networking tc.spawn_child( TaskKind::SystemService, - "networking", + "networking-legacy", None, self.network.run(), )?; @@ -449,11 +460,12 @@ impl Worker { // Create partition processors for processor in self.processors { + let networking = self.networking.clone(); tc.spawn_child( TaskKind::PartitionProcessor, "partition-processor", Some(processor.partition_id), - processor.run(), + processor.run(networking), )?; } diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index feb9adb7c..87ecdc195 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -18,6 +18,7 @@ use crate::partition::storage::{PartitionStorage, Transaction}; use crate::util::IdentitySender; use futures::StreamExt; use metrics::counter; +use restate_core::NetworkSender; use restate_schema_impl::Schemas; use restate_storage_rocksdb::RocksDBStorage; use restate_types::identifiers::{PartitionId, PartitionKey, PeerId}; @@ -25,6 +26,7 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::ops::RangeInclusive; +use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tracing::{debug, info, instrument}; @@ -119,7 +121,7 @@ where } #[instrument(level = "trace", skip_all, fields(peer_id = %self.peer_id, partition_id = %self.partition_id))] - pub(super) async fn run(self) -> anyhow::Result<()> { + pub(super) async fn run(self, _networking: Arc) -> anyhow::Result<()> { let PartitionProcessor { peer_id, partition_id,