From 8547f776cc103838e778097a6ba2e1bf4f82876a Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 19 Jun 2024 16:28:37 +0100 Subject: [PATCH] [reorg] Stage 2 Merge `restate-network` into `restate-core` --- Cargo.lock | 65 +++------------ Cargo.toml | 2 - crates/admin/Cargo.toml | 1 - .../src/cluster_controller/cluster_state.rs | 2 +- crates/admin/src/service.rs | 2 +- crates/admin/src/state.rs | 2 +- crates/admin/src/storage_query/query.rs | 10 +-- crates/core/Cargo.toml | 20 +++-- crates/{network => core}/build.rs | 0 .../{network => core}/protobuf/node_svc.proto | 0 .../src => core/src/network}/connection.rs | 8 +- .../src/network}/connection_manager.rs | 24 +++--- crates/core/src/network/error.rs | 70 +++++++++++++++- .../lib.rs => core/src/network/grpc_util.rs} | 0 .../src => core/src/network}/handshake.rs | 2 +- .../src/network}/metric_definitions.rs | 0 crates/core/src/network/mod.rs | 11 +++ .../src => core/src/network}/networking.rs | 8 +- .../src => core/src/network}/protobuf.rs | 0 .../src => core/src/network}/rpc_router.rs | 13 ++- crates/grpc-util/Cargo.toml | 20 ----- crates/metadata-store/Cargo.toml | 1 - .../metadata-store/src/local/grpc/client.rs | 14 ++-- crates/metadata-store/src/local/service.rs | 13 +-- crates/metadata-store/src/local/tests.rs | 23 +++--- crates/network/Cargo.toml | 46 ----------- crates/network/src/error.rs | 81 ------------------- crates/network/src/lib.rs | 22 ----- crates/node/Cargo.toml | 2 - crates/node/src/lib.rs | 10 +-- .../node/src/network_server/handler/node.rs | 10 +-- crates/node/src/network_server/service.rs | 8 +- crates/node/src/roles/admin.rs | 9 ++- crates/node/src/roles/worker.rs | 2 +- crates/worker/Cargo.toml | 1 - crates/worker/src/lib.rs | 5 +- crates/worker/src/partition/leadership/mod.rs | 39 ++++----- crates/worker/src/partition/mod.rs | 47 +++++------ .../worker/src/partition_processor_manager.rs | 4 +- tools/xtask/Cargo.toml | 1 - tools/xtask/src/main.rs | 2 +- 41 files changed, 238 insertions(+), 362 deletions(-) rename crates/{network => core}/build.rs (100%) rename crates/{network => core}/protobuf/node_svc.proto (100%) rename crates/{network/src => core/src/network}/connection.rs (96%) rename crates/{network/src => core/src/network}/connection_manager.rs (97%) rename crates/{grpc-util/src/lib.rs => core/src/network/grpc_util.rs} (100%) rename crates/{network/src => core/src/network}/handshake.rs (99%) rename crates/{network/src => core/src/network}/metric_definitions.rs (100%) rename crates/{network/src => core/src/network}/networking.rs (96%) rename crates/{network/src => core/src/network}/protobuf.rs (100%) rename crates/{network/src => core/src/network}/rpc_router.rs (98%) delete mode 100644 crates/grpc-util/Cargo.toml delete mode 100644 crates/network/Cargo.toml delete mode 100644 crates/network/src/error.rs delete mode 100644 crates/network/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 41a4e27bb..cf20c9390 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5308,7 +5308,6 @@ dependencies = [ "restate-errors", "restate-fs-util", "restate-futures-util", - "restate-network", "restate-serde-util", "restate-service-client", "restate-service-protocol", @@ -5514,6 +5513,7 @@ dependencies = [ "async-trait", "bytes", "bytestring", + "dashmap", "derive_builder", "derive_more", "enum-map", @@ -5521,8 +5521,14 @@ dependencies = [ "futures", "googletest", "hostname", + "http 0.2.12", "humantime", + "hyper 0.14.28", "metrics", + "once_cell", + "pin-project", + "prost", + "prost-types", "rand", "restate-test-util", "restate-types", @@ -5536,6 +5542,9 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "tonic 0.10.2", + "tonic-build", + "tower", "tracing", "tracing-subscriber", "tracing-test", @@ -5578,21 +5587,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "restate-grpc-util" -version = "1.1.0" -dependencies = [ - "http 0.2.12", - "hyper 0.14.28", - "restate-types", - "thiserror", - "tokio", - "tokio-stream", - "tonic 0.10.2", - "tower", - "tracing", -] - [[package]] name = "restate-ingress-dispatcher" version = "1.1.0" @@ -5763,7 +5757,6 @@ dependencies = [ "prost", "prost-types", "restate-core", - "restate-grpc-util", "restate-rocksdb", "restate-types", "rocksdb", @@ -5784,40 +5777,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "restate-network" -version = "1.1.0" -dependencies = [ - "anyhow", - "async-trait", - "bytes", - "dashmap", - "enum-map", - "enumset", - "futures", - "googletest", - "http 0.2.12", - "metrics", - "once_cell", - "pin-project", - "prost", - "prost-types", - "rand", - "restate-core", - "restate-grpc-util", - "restate-test-util", - "restate-types", - "static_assertions", - "thiserror", - "tokio", - "tokio-stream", - "tonic 0.10.2", - "tonic-build", - "tower", - "tracing", - "tracing-subscriber", -] - [[package]] name = "restate-node" version = "1.1.0" @@ -5849,9 +5808,7 @@ dependencies = [ "restate-bifrost", "restate-core", "restate-errors", - "restate-grpc-util", "restate-metadata-store", - "restate-network", "restate-rocksdb", "restate-service-client", "restate-service-protocol", @@ -6353,7 +6310,6 @@ dependencies = [ "restate-invoker-api", "restate-invoker-impl", "restate-metadata-store", - "restate-network", "restate-partition-store", "restate-rocksdb", "restate-serde-util", @@ -8436,7 +8392,6 @@ dependencies = [ "restate-bifrost", "restate-core", "restate-metadata-store", - "restate-network", "restate-service-client", "restate-service-protocol", "restate-types", diff --git a/Cargo.toml b/Cargo.toml index 7ff803c2f..26726d1ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,6 @@ restate-core = { path = "crates/core" } restate-errors = { path = "crates/errors" } restate-fs-util = { path = "crates/fs-util" } restate-futures-util = { path = "crates/futures-util" } -restate-grpc-util = { path = "crates/grpc-util" } restate-ingress-dispatcher = { path = "crates/ingress-dispatcher" } restate-ingress-http = { path = "crates/ingress-http" } restate-ingress-kafka = { path = "crates/ingress-kafka" } @@ -49,7 +48,6 @@ restate-invoker-api = { path = "crates/invoker-api" } restate-invoker-impl = { path = "crates/invoker-impl" } restate-admin-rest-model = { path = "crates/admin-rest-model" } restate-metadata-store = { path = "crates/metadata-store" } -restate-network = { path = "crates/network" } restate-node = { path = "crates/node" } restate-partition-store = { path = "crates/partition-store" } restate-queue = { path = "crates/queue" } diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index b4cef2cec..71f7defaf 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -20,7 +20,6 @@ restate-core = { workspace = true } restate-errors = { workspace = true } restate-fs-util = { workspace = true } restate-futures-util = { workspace = true } -restate-network = { workspace = true } restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["discovery"] } restate-types = { workspace = true, features = ["schemars"] } diff --git a/crates/admin/src/cluster_controller/cluster_state.rs b/crates/admin/src/cluster_controller/cluster_state.rs index 969f45adc..8b96221cb 100644 --- a/crates/admin/src/cluster_controller/cluster_state.rs +++ b/crates/admin/src/cluster_controller/cluster_state.rs @@ -16,8 +16,8 @@ use restate_types::cluster::cluster_state::{AliveNode, ClusterState, DeadNode, N use std::time::Instant; use tokio::task::JoinHandle; +use restate_core::network::rpc_router::RpcRouter; use restate_core::network::{MessageRouterBuilder, NetworkSender}; -use restate_network::rpc_router::RpcRouter; use restate_types::net::partition_processor_manager::GetProcessorsState; use restate_types::nodes_config::Role; use restate_types::time::MillisSinceEpoch; diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index ca34d3f6c..df431819d 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -20,8 +20,8 @@ use tower::ServiceBuilder; use tracing::info; use restate_core::metadata_store::MetadataStoreClient; +use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; use restate_core::{cancellation_watcher, task_center, MetadataWriter}; -use restate_network::protobuf::node_svc::node_svc_client::NodeSvcClient; use restate_service_protocol::discovery::ServiceDiscovery; use restate_types::schema::subscriptions::SubscriptionValidator; diff --git a/crates/admin/src/state.rs b/crates/admin/src/state.rs index 9bdb2fc16..46df7ab18 100644 --- a/crates/admin/src/state.rs +++ b/crates/admin/src/state.rs @@ -11,8 +11,8 @@ use crate::schema_registry::SchemaRegistry; use restate_bifrost::Bifrost; +use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; use restate_core::TaskCenter; -use restate_network::protobuf::node_svc::node_svc_client::NodeSvcClient; use tonic::transport::Channel; #[derive(Clone, derive_builder::Builder)] diff --git a/crates/admin/src/storage_query/query.rs b/crates/admin/src/storage_query/query.rs index 3a9e7a84f..b844e582c 100644 --- a/crates/admin/src/storage_query/query.rs +++ b/crates/admin/src/storage_query/query.rs @@ -8,13 +8,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use arrow_flight::decode::FlightRecordBatchStream; -use arrow_flight::error::FlightError; -use arrow_flight::FlightData; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use arrow_flight::decode::FlightRecordBatchStream; +use arrow_flight::error::FlightError; +use arrow_flight::FlightData; use axum::body::StreamBody; use axum::extract::State; use axum::response::IntoResponse; @@ -30,14 +30,14 @@ use datafusion::arrow::ipc::writer::StreamWriter; use datafusion::arrow::record_batch::RecordBatch; use futures::{ready, Stream, StreamExt, TryStreamExt}; use okapi_operation::*; -use restate_network::protobuf::node_svc::StorageQueryRequest; use schemars::JsonSchema; use serde::Deserialize; use serde_with::serde_as; -use crate::state::QueryServiceState; +use restate_core::network::protobuf::node_svc::StorageQueryRequest; use super::error::StorageQueryError; +use crate::state::QueryServiceState; #[serde_as] #[derive(Debug, Deserialize, JsonSchema)] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index c58ef21e6..a0a72129c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -9,7 +9,7 @@ publish = false [features] default = [] -test-util = ["dep:rand", "tokio/test-util"] +test-util = ["tokio/test-util", "restate-types/test-util"] options_schema = ["dep:schemars"] [dependencies] @@ -20,15 +20,22 @@ arc-swap = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } +dashmap = { workspace = true } derive_builder = { workspace = true } derive_more = { workspace = true } enum-map = { workspace = true } enumset = { workspace = true } futures = { workspace = true } hostname = { workspace = true } +http = { workspace = true } humantime = { workspace = true } +hyper = { workspace = true } metrics = { workspace = true } -rand = { workspace = true, optional = true } +once_cell = { workspace = true } +pin-project = { workspace = true } +prost = { workspace = true } +prost-types = { workspace = true } +rand = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } serde_with = { workspace = true } @@ -36,18 +43,21 @@ static_assertions = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true, features = ["tracing" ] } -tokio-stream = { workspace = true } +tokio = { workspace = true, features = ["tracing"] } +tokio-stream = { workspace = true, features = ["net"] } tokio-util = { workspace = true } +tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] } +tower = { workspace = true } tracing = { workspace = true } +[build-dependencies] +tonic-build = { workspace = true } [dev-dependencies] restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } googletest = { workspace = true } -rand = { workspace = true } tokio = { workspace = true, features = ["test-util"] } tracing-subscriber = { workspace = true } tracing-test = { workspace = true } diff --git a/crates/network/build.rs b/crates/core/build.rs similarity index 100% rename from crates/network/build.rs rename to crates/core/build.rs diff --git a/crates/network/protobuf/node_svc.proto b/crates/core/protobuf/node_svc.proto similarity index 100% rename from crates/network/protobuf/node_svc.proto rename to crates/core/protobuf/node_svc.proto diff --git a/crates/network/src/connection.rs b/crates/core/src/network/connection.rs similarity index 96% rename from crates/network/src/connection.rs rename to crates/core/src/network/connection.rs index 20c425c0f..4860b23d9 100644 --- a/crates/network/src/connection.rs +++ b/crates/core/src/network/connection.rs @@ -15,8 +15,7 @@ use std::time::Instant; use tokio::sync::mpsc; use tracing::instrument; -use restate_core::metadata; -use restate_core::network::NetworkSendError; +use crate::metadata; use restate_types::net::codec::Targeted; use restate_types::net::codec::{serialize_message, WireEncode}; use restate_types::net::ProtocolVersion; @@ -25,8 +24,9 @@ use restate_types::protobuf::node::Header; use restate_types::protobuf::node::Message; use restate_types::GenerationalNodeId; -use crate::metric_definitions::CONNECTION_SEND_DURATION; -use crate::metric_definitions::MESSAGE_SENT; +use super::error::NetworkSendError; +use super::metric_definitions::CONNECTION_SEND_DURATION; +use super::metric_definitions::MESSAGE_SENT; /// A single streaming connection with a channel to the peer. A connection can be /// opened by either ends of the connection and has no direction. Any connection diff --git a/crates/network/src/connection_manager.rs b/crates/core/src/network/connection_manager.rs similarity index 97% rename from crates/network/src/connection_manager.rs rename to crates/core/src/network/connection_manager.rs index 0d52a8d84..187d1912a 100644 --- a/crates/network/src/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -15,29 +15,29 @@ use std::time::Instant; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; use rand::seq::SliceRandom; -use restate_core::network::{Handler, MessageRouter}; use restate_types::net::codec::try_unwrap_binary_message; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tracing::{debug, info, trace, warn, Instrument, Span}; -use restate_core::metadata; -use restate_core::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind}; -use restate_grpc_util::create_grpc_channel_from_advertised_address; use restate_types::net::AdvertisedAddress; use restate_types::protobuf::node::message::{self, ConnectionControl}; use restate_types::protobuf::node::{Header, Hello, Message, Welcome}; use restate_types::{GenerationalNodeId, NodeId, PlainNodeId}; use super::connection::{Connection, ConnectionSender}; +use super::error::{NetworkError, ProtocolError}; +use super::grpc_util::create_grpc_channel_from_advertised_address; use super::handshake::{negotiate_protocol_version, wait_for_hello, wait_for_welcome}; -use crate::error::{NetworkError, ProtocolError}; -use crate::metric_definitions::{ +use super::metric_definitions::{ self, CONNECTION_DROPPED, INCOMING_CONNECTION, MESSAGE_PROCESSING_DURATION, MESSAGE_RECEIVED, ONGOING_DRAIN, OUTGOING_CONNECTION, }; -use crate::protobuf::node_svc::node_svc_client::NodeSvcClient; +use super::protobuf::node_svc::node_svc_client::NodeSvcClient; +use super::{Handler, MessageRouter}; +use crate::metadata; +use crate::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind}; // todo: make this configurable const SEND_QUEUE_SIZE: usize = 1; @@ -595,13 +595,13 @@ fn on_connection_terminated(inner_manager: &Mutex) { #[cfg(test)] mod tests { - use crate::handshake::HANDSHAKE_TIMEOUT; + use crate::network::handshake::HANDSHAKE_TIMEOUT; use super::*; use googletest::prelude::*; - use restate_core::TestCoreEnv; + use crate::TestCoreEnv; use restate_test_util::assert_eq; use restate_types::net::{ ProtocolVersion, CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_PROTOCOL_VERSION, @@ -616,7 +616,7 @@ mod tests { test_setup .tc .run_in_scope("test", None, async { - let metadata = restate_core::metadata(); + let metadata = crate::metadata(); let (tx, rx) = mpsc::channel(1); let connections = ConnectionManager::default(); @@ -680,7 +680,7 @@ mod tests { test_setup .tc .run_in_scope("test", None, async { - let metadata = restate_core::metadata(); + let metadata = crate::metadata(); let (tx, rx) = mpsc::channel(1); let my_node_id = metadata.my_node_id(); @@ -744,7 +744,7 @@ mod tests { test_setup .tc .run_in_scope("test", None, async { - let metadata = restate_core::metadata(); + let metadata = crate::metadata(); let (tx, rx) = mpsc::channel(1); let mut my_node_id = metadata.my_node_id(); diff --git a/crates/core/src/network/error.rs b/crates/core/src/network/error.rs index 4ac90a851..1fd21694b 100644 --- a/crates/core/src/network/error.rs +++ b/crates/core/src/network/error.rs @@ -8,8 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_types::net::CodecError; +use restate_types::net::{CodecError, MIN_SUPPORTED_PROTOCOL_VERSION}; use restate_types::nodes_config::NodesConfigError; +use restate_types::NodeId; use crate::ShutdownError; @@ -36,3 +37,70 @@ pub enum NetworkSendError { #[error("cannot send messages to this node: {0}")] Unavailable(String), } + +#[derive(Debug, thiserror::Error)] +pub enum NetworkError { + #[error("unknown node: {0}")] + UnknownNode(#[from] NodesConfigError), + #[error("operation aborted, node is shutting down")] + Shutdown(#[from] ShutdownError), + #[error("node {0} address is bad: {1}")] + BadNodeAddress(NodeId, http::Error), + #[error("timeout: {0}")] + Timeout(&'static str), + #[error("protocol error: {0}")] + ProtocolError(#[from] ProtocolError), + #[error("cannot connect: {} {}", tonic::Status::code(.0), tonic::Status::message(.0))] + ConnectError(#[from] tonic::Status), + #[error("new node generation exists: {0}")] + OldPeerGeneration(String), + #[error("peer is not connected")] + ConnectionClosed, +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtocolError { + #[error("handshake failed: {0}")] + HandshakeFailed(&'static str), + #[error("handshake timeout: {0}")] + HandshakeTimeout(&'static str), + #[error("peer dropped connection")] + PeerDropped, + #[error("error in codec: {0}")] + Codec(String), + #[error("grpc error: {0}")] + GrpcError(#[from] tonic::Status), + #[error( + "peer has unsupported protocol version {0}, minimum supported is '{}'", + MIN_SUPPORTED_PROTOCOL_VERSION as i32 + )] + UnsupportedVersion(i32), +} + +impl From for tonic::Status { + fn from(value: ProtocolError) -> Self { + match value { + ProtocolError::HandshakeFailed(e) => tonic::Status::invalid_argument(e), + ProtocolError::HandshakeTimeout(e) => tonic::Status::deadline_exceeded(e), + ProtocolError::PeerDropped => tonic::Status::cancelled("peer dropped"), + ProtocolError::Codec(e) => tonic::Status::internal(e), + ProtocolError::UnsupportedVersion(_) => { + tonic::Status::invalid_argument(value.to_string()) + } + ProtocolError::GrpcError(s) => s, + } + } +} + +impl From for tonic::Status { + fn from(value: NetworkError) -> Self { + match value { + NetworkError::Shutdown(_) => tonic::Status::unavailable(value.to_string()), + NetworkError::ProtocolError(e) => e.into(), + NetworkError::Timeout(e) => tonic::Status::deadline_exceeded(e), + NetworkError::OldPeerGeneration(e) => tonic::Status::already_exists(e), + NetworkError::ConnectError(s) => s, + e => tonic::Status::internal(e.to_string()), + } + } +} diff --git a/crates/grpc-util/src/lib.rs b/crates/core/src/network/grpc_util.rs similarity index 100% rename from crates/grpc-util/src/lib.rs rename to crates/core/src/network/grpc_util.rs diff --git a/crates/network/src/handshake.rs b/crates/core/src/network/handshake.rs similarity index 99% rename from crates/network/src/handshake.rs rename to crates/core/src/network/handshake.rs index 78893930d..52b68a47f 100644 --- a/crates/network/src/handshake.rs +++ b/crates/core/src/network/handshake.rs @@ -15,7 +15,7 @@ use restate_types::net::{ProtocolVersion, CURRENT_PROTOCOL_VERSION}; use restate_types::protobuf::node::{message, Header, Hello, Message, Welcome}; use tokio_stream::StreamExt; -use crate::error::ProtocolError; +use super::error::ProtocolError; //todo: make this configurable pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(3); diff --git a/crates/network/src/metric_definitions.rs b/crates/core/src/network/metric_definitions.rs similarity index 100% rename from crates/network/src/metric_definitions.rs rename to crates/core/src/network/metric_definitions.rs diff --git a/crates/core/src/network/mod.rs b/crates/core/src/network/mod.rs index d993f8a85..fdcfa438d 100644 --- a/crates/core/src/network/mod.rs +++ b/crates/core/src/network/mod.rs @@ -8,10 +8,21 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod connection; +mod connection_manager; mod error; +pub mod grpc_util; +mod handshake; mod message_router; +pub(crate) mod metric_definitions; mod network_sender; +mod networking; +pub mod protobuf; +pub mod rpc_router; +pub use connection::ConnectionSender; +pub use connection_manager::ConnectionManager; pub use error::*; pub use message_router::*; pub use network_sender::*; +pub use networking::Networking; diff --git a/crates/network/src/networking.rs b/crates/core/src/network/networking.rs similarity index 96% rename from crates/network/src/networking.rs rename to crates/core/src/network/networking.rs index d7f97d585..9c3c8d3e1 100644 --- a/crates/network/src/networking.rs +++ b/crates/core/src/network/networking.rs @@ -13,13 +13,13 @@ use std::time::Duration; use restate_types::retries::with_jitter; use tracing::{info, instrument, trace}; -use restate_core::metadata; -use restate_core::network::{NetworkSendError, NetworkSender}; use restate_types::net::codec::{Targeted, WireEncode}; use restate_types::NodeId; -use crate::error::NetworkError; -use crate::{ConnectionManager, ConnectionSender}; +use super::error::NetworkError; +use super::{ConnectionManager, ConnectionSender}; +use super::{NetworkSendError, NetworkSender}; +use crate::metadata; const DEFAULT_MAX_CONNECT_ATTEMPTS: u32 = 10; // todo: make this configurable diff --git a/crates/network/src/protobuf.rs b/crates/core/src/network/protobuf.rs similarity index 100% rename from crates/network/src/protobuf.rs rename to crates/core/src/network/protobuf.rs diff --git a/crates/network/src/rpc_router.rs b/crates/core/src/network/rpc_router.rs similarity index 98% rename from crates/network/src/rpc_router.rs rename to crates/core/src/network/rpc_router.rs index 8245efcfe..4cb840719 100644 --- a/crates/network/src/rpc_router.rs +++ b/crates/core/src/network/rpc_router.rs @@ -14,16 +14,15 @@ use dashmap::mapref::entry::Entry; use dashmap::DashMap; use futures::stream::BoxStream; use futures::StreamExt; -use restate_core::{cancellation_watcher, ShutdownError}; -use restate_types::net::codec::{Targeted, WireDecode, WireEncode}; -use restate_types::NodeId; use tokio::sync::oneshot; +use tracing::warn; -use restate_core::network::{ - MessageHandler, MessageRouterBuilder, NetworkSendError, NetworkSender, -}; +use restate_types::net::codec::{Targeted, WireDecode, WireEncode}; use restate_types::net::{MessageEnvelope, RpcMessage, RpcRequest}; -use tracing::warn; +use restate_types::NodeId; + +use super::{MessageHandler, MessageRouterBuilder, NetworkSendError, NetworkSender}; +use crate::{cancellation_watcher, ShutdownError}; /// A router for sending and receiving RPC messages through Networking /// diff --git a/crates/grpc-util/Cargo.toml b/crates/grpc-util/Cargo.toml deleted file mode 100644 index 74974fd17..000000000 --- a/crates/grpc-util/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "restate-grpc-util" -version.workspace = true -authors.workspace = true -edition.workspace = true -rust-version.workspace = true -license.workspace = true -publish = false - -[dependencies] -restate-types = { workspace = true } - -http = { workspace = true } -hyper = { workspace = true } -thiserror = { workspace = true } -tokio = { workspace = true } -tokio-stream = { workspace = true, features = ["net"] } -tonic = { workspace = true } -tower = { workspace = true } -tracing = { workspace = true } diff --git a/crates/metadata-store/Cargo.toml b/crates/metadata-store/Cargo.toml index a2bdaaf08..509d23497 100644 --- a/crates/metadata-store/Cargo.toml +++ b/crates/metadata-store/Cargo.toml @@ -14,7 +14,6 @@ test-util = [] [dependencies] codederror = { workspace = true } restate-core = { workspace = true } -restate-grpc-util = { workspace = true } restate-rocksdb = { workspace = true } restate-types = { workspace = true } diff --git a/crates/metadata-store/src/local/grpc/client.rs b/crates/metadata-store/src/local/grpc/client.rs index f0a36903d..baf7146af 100644 --- a/crates/metadata-store/src/local/grpc/client.rs +++ b/crates/metadata-store/src/local/grpc/client.rs @@ -8,19 +8,21 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::grpc_svc::metadata_store_svc_client::MetadataStoreSvcClient; -use crate::grpc_svc::{DeleteRequest, GetRequest, PutRequest}; -use crate::local::grpc::pb_conversions::ConversionError; use async_trait::async_trait; use bytestring::ByteString; +use tonic::transport::Channel; +use tonic::{Code, Status}; + use restate_core::metadata_store::{ MetadataStore, Precondition, ReadError, VersionedValue, WriteError, }; -use restate_grpc_util::create_grpc_channel_from_advertised_address; +use restate_core::network::grpc_util::create_grpc_channel_from_advertised_address; use restate_types::net::AdvertisedAddress; use restate_types::Version; -use tonic::transport::Channel; -use tonic::{Code, Status}; + +use crate::grpc_svc::metadata_store_svc_client::MetadataStoreSvcClient; +use crate::grpc_svc::{DeleteRequest, GetRequest, PutRequest}; +use crate::local::grpc::pb_conversions::ConversionError; /// Client end to interact with the [`LocalMetadataStore`]. #[derive(Debug, Clone)] diff --git a/crates/metadata-store/src/local/service.rs b/crates/metadata-store/src/local/service.rs index 1cf217f76..3a90abf81 100644 --- a/crates/metadata-store/src/local/service.rs +++ b/crates/metadata-store/src/local/service.rs @@ -8,10 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::grpc_svc; -use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; -use crate::local::grpc::handler::LocalMetadataStoreHandler; -use crate::local::store::LocalMetadataStore; +use restate_core::network::grpc_util; use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind}; use restate_types::arc_util::Updateable; use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; @@ -19,6 +16,10 @@ use restate_types::net::BindAddress; use tonic::server::NamedService; use super::BuildError; +use crate::grpc_svc; +use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; +use crate::local::grpc::handler::LocalMetadataStoreHandler; +use crate::local::store::LocalMetadataStore; pub struct LocalMetadataStoreService { metadata_store: LocalMetadataStore, @@ -28,7 +29,7 @@ pub struct LocalMetadataStoreService { #[derive(Debug, thiserror::Error)] pub enum Error { #[error("failed running grpc server: {0}")] - GrpcServer(#[from] restate_grpc_util::Error), + GrpcServer(#[from] grpc_util::Error), #[error("error while running server server grpc reflection service: {0}")] GrpcReflection(#[from] tonic_reflection::server::Error), #[error("system is shutting down")] @@ -86,7 +87,7 @@ impl LocalMetadataStoreService { "metadata-store-grpc", None, async move { - restate_grpc_util::run_hyper_server( + grpc_util::run_hyper_server( &self.bind_address, service, cancellation_watcher(), diff --git a/crates/metadata-store/src/local/tests.rs b/crates/metadata-store/src/local/tests.rs index 701f9e94a..27b9e3b41 100644 --- a/crates/metadata-store/src/local/tests.rs +++ b/crates/metadata-store/src/local/tests.rs @@ -8,15 +8,18 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::local::grpc::client::LocalMetadataStoreClient; -use crate::local::service::LocalMetadataStoreService; -use crate::local::store::LocalMetadataStore; -use crate::{MetadataStoreClient, Precondition, WriteError}; +use std::time::Duration; + use bytestring::ByteString; use futures::stream::FuturesUnordered; use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use test_log::test; +use tonic_health::pb::health_client::HealthClient; +use tonic_health::pb::HealthCheckRequest; + +use restate_core::network::grpc_util::create_grpc_channel_from_advertised_address; use restate_core::{MockNetworkSender, TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder}; -use restate_grpc_util::create_grpc_channel_from_advertised_address; use restate_rocksdb::RocksDbManager; use restate_types::arc_util::{Constant, Updateable}; use restate_types::config::{ @@ -25,11 +28,11 @@ use restate_types::config::{ use restate_types::net::{AdvertisedAddress, BindAddress}; use restate_types::retries::RetryPolicy; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; -use serde::{Deserialize, Serialize}; -use std::time::Duration; -use test_log::test; -use tonic_health::pb::health_client::HealthClient; -use tonic_health::pb::HealthCheckRequest; + +use crate::local::grpc::client::LocalMetadataStoreClient; +use crate::local::service::LocalMetadataStoreService; +use crate::local::store::LocalMetadataStore; +use crate::{MetadataStoreClient, Precondition, WriteError}; #[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize, Default)] struct Value { diff --git a/crates/network/Cargo.toml b/crates/network/Cargo.toml deleted file mode 100644 index 76494d237..000000000 --- a/crates/network/Cargo.toml +++ /dev/null @@ -1,46 +0,0 @@ -[package] -name = "restate-network" -version.workspace = true -authors.workspace = true -edition.workspace = true -rust-version.workspace = true -license.workspace = true -publish = false - -[dependencies] -restate-core = { workspace = true } -restate-grpc-util = { workspace = true } -restate-types = { workspace = true } - -anyhow = { workspace = true } -async-trait = { workspace = true } -bytes = { workspace = true } -dashmap = { workspace = true } -enum-map = { workspace = true } -enumset = { workspace = true } -futures = { workspace = true } -http = { workspace = true } -metrics = { workspace = true } -once_cell = { workspace = true } -pin-project = { workspace = true } -prost = { workspace = true } -prost-types = { workspace = true } -rand = { workspace = true } -static_assertions = { workspace = true } -thiserror = { workspace = true } -tokio = { workspace = true } -tokio-stream = { workspace = true } -tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] } -tower = { workspace = true } -tracing = { workspace = true } - -[build-dependencies] -tonic-build = { workspace = true } - -[dev-dependencies] -restate-core = { workspace = true, features = ["test-util"] } -restate-test-util = { workspace = true } - -googletest = { workspace = true } -tokio = { workspace = true, features = ["test-util"] } -tracing-subscriber = { workspace = true } diff --git a/crates/network/src/error.rs b/crates/network/src/error.rs deleted file mode 100644 index 6c6a7c8fd..000000000 --- a/crates/network/src/error.rs +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use restate_core::ShutdownError; -use restate_types::net::MIN_SUPPORTED_PROTOCOL_VERSION; -use restate_types::nodes_config::NodesConfigError; -use restate_types::NodeId; - -#[derive(Debug, thiserror::Error)] -pub enum NetworkError { - #[error("unknown node: {0}")] - UnknownNode(#[from] NodesConfigError), - #[error("operation aborted, node is shutting down")] - Shutdown(#[from] ShutdownError), - #[error("node {0} address is bad: {1}")] - BadNodeAddress(NodeId, http::Error), - #[error("timeout: {0}")] - Timeout(&'static str), - #[error("protocol error: {0}")] - ProtocolError(#[from] ProtocolError), - #[error("cannot connect: {} {}", tonic::Status::code(.0), tonic::Status::message(.0))] - ConnectError(#[from] tonic::Status), - #[error("new node generation exists: {0}")] - OldPeerGeneration(String), - #[error("peer is not connected")] - ConnectionClosed, -} - -#[derive(Debug, thiserror::Error)] -pub enum ProtocolError { - #[error("handshake failed: {0}")] - HandshakeFailed(&'static str), - #[error("handshake timeout: {0}")] - HandshakeTimeout(&'static str), - #[error("peer dropped connection")] - PeerDropped, - #[error("error in codec: {0}")] - Codec(String), - #[error("grpc error: {0}")] - GrpcError(#[from] tonic::Status), - #[error( - "peer has unsupported protocol version {0}, minimum supported is '{}'", - MIN_SUPPORTED_PROTOCOL_VERSION as i32 - )] - UnsupportedVersion(i32), -} - -impl From for tonic::Status { - fn from(value: ProtocolError) -> Self { - match value { - ProtocolError::HandshakeFailed(e) => tonic::Status::invalid_argument(e), - ProtocolError::HandshakeTimeout(e) => tonic::Status::deadline_exceeded(e), - ProtocolError::PeerDropped => tonic::Status::cancelled("peer dropped"), - ProtocolError::Codec(e) => tonic::Status::internal(e), - ProtocolError::UnsupportedVersion(_) => { - tonic::Status::invalid_argument(value.to_string()) - } - ProtocolError::GrpcError(s) => s, - } - } -} - -impl From for tonic::Status { - fn from(value: NetworkError) -> Self { - match value { - NetworkError::Shutdown(_) => tonic::Status::unavailable(value.to_string()), - NetworkError::ProtocolError(e) => e.into(), - NetworkError::Timeout(e) => tonic::Status::deadline_exceeded(e), - NetworkError::OldPeerGeneration(e) => tonic::Status::already_exists(e), - NetworkError::ConnectError(s) => s, - e => tonic::Status::internal(e.to_string()), - } - } -} diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs deleted file mode 100644 index db56592c0..000000000 --- a/crates/network/src/lib.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -mod connection; -mod connection_manager; -pub mod error; -mod handshake; -pub(crate) mod metric_definitions; -mod networking; -pub mod protobuf; -pub mod rpc_router; - -pub use connection::ConnectionSender; -pub use connection_manager::ConnectionManager; -pub use networking::Networking; diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 7e270a2ef..2a0eb611b 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -20,9 +20,7 @@ restate-admin = { workspace = true, features = ["servers"] } restate-bifrost = { workspace = true } restate-core = { workspace = true } restate-errors = { workspace = true } -restate-grpc-util = { workspace = true } restate-metadata-store = { workspace = true } -restate-network = { workspace = true } restate-rocksdb = { workspace = true } restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["discovery"] } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 0a7880e87..aec5866e7 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -12,11 +12,6 @@ mod cluster_marker; mod network_server; mod roles; -use restate_bifrost::BifrostService; -use restate_core::network::MessageRouterBuilder; -use restate_network::Networking; -use restate_types::arc_util::ArcSwapExt; -use restate_types::config::{CommonOptions, Configuration, UpdateableConfiguration}; use std::future::Future; use std::time::Duration; @@ -24,11 +19,16 @@ use codederror::CodedError; use tokio::time::Instant; use tracing::{debug, error, info, trace}; +use restate_bifrost::BifrostService; use restate_core::metadata_store::{MetadataStoreClientError, ReadWriteError}; +use restate_core::network::MessageRouterBuilder; +use restate_core::network::Networking; use restate_core::{spawn_metadata_manager, MetadataKind, MetadataManager}; use restate_core::{task_center, TaskKind}; use restate_metadata_store::local::LocalMetadataStoreService; use restate_metadata_store::MetadataStoreClient; +use restate_types::arc_util::ArcSwapExt; +use restate_types::config::{CommonOptions, Configuration, UpdateableConfiguration}; use restate_types::logs::metadata::{create_static_metadata, Logs}; use restate_types::metadata_store::keys::{ BIFROST_CONFIG_KEY, NODES_CONFIG_KEY, PARTITION_TABLE_KEY, diff --git a/crates/node/src/network_server/handler/node.rs b/crates/node/src/network_server/handler/node.rs index 224a14ca0..60f693203 100644 --- a/crates/node/src/network_server/handler/node.rs +++ b/crates/node/src/network_server/handler/node.rs @@ -15,12 +15,12 @@ use futures::TryStreamExt; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; +use restate_core::network::protobuf::node_svc::node_svc_server::NodeSvc; +use restate_core::network::protobuf::node_svc::IdentResponse; +use restate_core::network::protobuf::node_svc::{StorageQueryRequest, StorageQueryResponse}; +use restate_core::network::ConnectionManager; +use restate_core::network::ProtocolError; use restate_core::{metadata, TaskCenter}; -use restate_network::error::ProtocolError; -use restate_network::protobuf::node_svc::node_svc_server::NodeSvc; -use restate_network::protobuf::node_svc::IdentResponse; -use restate_network::protobuf::node_svc::{StorageQueryRequest, StorageQueryResponse}; -use restate_network::ConnectionManager; use restate_types::protobuf::common::NodeStatus; use restate_types::protobuf::node::Message; diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index cd3b8800b..d319aad96 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -14,11 +14,11 @@ use tower_http::trace::TraceLayer; use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer; use restate_admin::cluster_controller::ClusterControllerHandle; +use restate_core::network::grpc_util::run_hyper_server; +use restate_core::network::protobuf::node_svc::node_svc_server::NodeSvcServer; +use restate_core::network::ConnectionManager; use restate_core::{cancellation_watcher, task_center}; -use restate_grpc_util::run_hyper_server; use restate_metadata_store::MetadataStoreClient; -use restate_network::protobuf::node_svc::node_svc_server::NodeSvcServer; -use restate_network::ConnectionManager; use restate_storage_query_datafusion::context::QueryContext; use restate_types::config::CommonOptions; use restate_worker::SubscriptionControllerHandle; @@ -76,7 +76,7 @@ impl NetworkServer { // -- GRPC Service Setup let mut reflection_service_builder = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set( - restate_network::protobuf::node_svc::FILE_DESCRIPTOR_SET, + restate_core::network::protobuf::node_svc::FILE_DESCRIPTOR_SET, ) .register_encoded_file_descriptor_set(restate_types::protobuf::FILE_DESCRIPTOR_SET); diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index 33958c229..8bbefc3e9 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -8,19 +8,20 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::time::Duration; + use anyhow::Context; use codederror::CodedError; -use restate_core::network::MessageRouterBuilder; -use restate_network::Networking; -use std::time::Duration; use tonic::transport::Channel; use restate_admin::cluster_controller::ClusterControllerHandle; use restate_admin::service::AdminService; use restate_bifrost::Bifrost; use restate_core::metadata_store::MetadataStoreClient; +use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; +use restate_core::network::MessageRouterBuilder; +use restate_core::network::Networking; use restate_core::{task_center, Metadata, MetadataWriter, TaskCenter, TaskKind}; -use restate_network::protobuf::node_svc::node_svc_client::NodeSvcClient; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_service_protocol::discovery::ServiceDiscovery; use restate_types::arc_util::ArcSwapExt; diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 91af8fcea..88108db3c 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -12,10 +12,10 @@ use codederror::CodedError; use restate_bifrost::Bifrost; use restate_core::network::MessageRouterBuilder; +use restate_core::network::Networking; use restate_core::{cancellation_watcher, metadata, task_center, Metadata, MetadataKind}; use restate_core::{ShutdownError, TaskKind}; use restate_metadata_store::MetadataStoreClient; -use restate_network::Networking; use restate_storage_query_datafusion::context::QueryContext; use restate_types::config::UpdateableConfiguration; use restate_types::schema::subscriptions::SubscriptionResolver; diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 68d7a781f..b19a43b97 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -30,7 +30,6 @@ restate-ingress-kafka = { workspace = true } restate-invoker-api = { workspace = true } restate-invoker-impl = { workspace = true } restate-metadata-store = { workspace = true } -restate-network = { workspace = true } restate-partition-store = { workspace = true } restate-rocksdb = { workspace = true } restate-serde-util = { workspace = true, features = ["proto"] } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 101e606b0..0fd93e87e 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -20,14 +20,16 @@ mod partition_processor_manager; mod subscription_controller; mod subscription_integration; +use codederror::CodedError; + use restate_types::arc_util::ArcSwapExt; use restate_types::config::UpdateableConfiguration; pub use subscription_controller::SubscriptionController; pub use subscription_integration::SubscriptionControllerHandle; -use codederror::CodedError; use restate_bifrost::Bifrost; use restate_core::network::MessageRouterBuilder; +use restate_core::network::Networking; use restate_core::{task_center, Metadata, TaskKind}; use restate_ingress_dispatcher::IngressDispatcher; use restate_ingress_http::HyperServerIngress; @@ -36,7 +38,6 @@ use restate_invoker_impl::{ InvokerHandle as InvokerChannelServiceHandle, Service as InvokerService, }; use restate_metadata_store::MetadataStoreClient; -use restate_network::Networking; use restate_partition_store::{PartitionStore, PartitionStoreManager}; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_storage_query_datafusion::context::QueryContext; diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 48f9c7cd8..d294d4519 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -8,41 +8,42 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metric_definitions::PARTITION_HANDLE_LEADER_ACTIONS; -use crate::partition::shuffle::{HintSender, Shuffle, ShuffleMetadata}; -use crate::partition::{shuffle, storage}; -use futures::future::OptionFuture; -use futures::{future, StreamExt}; -use metrics::counter; -use restate_core::network::NetworkSender; -use restate_core::{ - current_task_partition_id, metadata, task_center, ShutdownError, TaskId, TaskKind, -}; -use restate_invoker_api::InvokeInputJournal; -use restate_network::Networking; -use restate_timer::TokioClock; -use restate_types::net::ingress; +mod action_collector; + use std::fmt::Debug; use std::ops::RangeInclusive; use std::pin::Pin; + +use futures::future::OptionFuture; +use futures::{future, StreamExt}; +use metrics::counter; use tokio::sync::mpsc; use tracing::{debug, trace, warn}; -mod action_collector; - -use crate::partition::action_effect_handler::ActionEffectHandler; -use crate::partition::state_machine::Action; -pub(crate) use action_collector::{ActionEffect, ActionEffectStream}; use restate_bifrost::Bifrost; +use restate_core::network::NetworkSender; +use restate_core::network::Networking; +use restate_core::{ + current_task_partition_id, metadata, task_center, ShutdownError, TaskId, TaskKind, +}; use restate_errors::NotRunningError; +use restate_invoker_api::InvokeInputJournal; use restate_partition_store::PartitionStore; use restate_storage_api::deduplication_table::EpochSequenceNumber; +use restate_timer::TokioClock; use restate_types::identifiers::{InvocationId, PartitionKey}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch}; +use restate_types::net::ingress; use restate_types::GenerationalNodeId; use restate_wal_protocol::timer::TimerKeyValue; use super::storage::invoker::InvokerStorageReader; +use crate::metric_definitions::PARTITION_HANDLE_LEADER_ACTIONS; +use crate::partition::action_effect_handler::ActionEffectHandler; +use crate::partition::shuffle::{HintSender, Shuffle, ShuffleMetadata}; +use crate::partition::state_machine::Action; +use crate::partition::{shuffle, storage}; +pub(crate) use action_collector::{ActionEffect, ActionEffectStream}; type PartitionStorage = storage::PartitionStorage; type TimerService = restate_timer::TimerService; diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index d62300e6c..36aa4381e 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -8,49 +8,50 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::metric_definitions::{ - PARTITION_ACTUATOR_HANDLED, PARTITION_LABEL, PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION, - PARTITION_TIMER_DUE_HANDLED, PP_APPLY_RECORD_DURATION, -}; -use crate::partition::leadership::{ActionEffect, LeadershipState}; -use crate::partition::state_machine::{ActionCollector, Effects, StateMachine}; -use crate::partition::storage::{DedupSequenceNumberResolver, PartitionStorage, Transaction}; -use assert2::let_assert; -use futures::TryStreamExt as _; -use metrics::{counter, histogram}; -use restate_core::metadata; -use restate_network::Networking; -use restate_partition_store::{PartitionStore, RocksDBTransaction}; -use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStatus, RunMode}; -use restate_types::identifiers::{PartitionId, PartitionKey}; -use restate_types::time::MillisSinceEpoch; use std::fmt::Debug; use std::marker::PhantomData; use std::ops::RangeInclusive; use std::time::{Duration, Instant}; + +use assert2::let_assert; +use futures::TryStreamExt as _; +use metrics::{counter, histogram}; use tokio::sync::{mpsc, watch}; use tokio::time::MissedTickBehavior; use tokio_stream::StreamExt; use tracing::{debug, instrument, trace, Span}; -mod action_effect_handler; -mod leadership; -pub mod shuffle; -mod state_machine; -pub mod storage; -pub mod types; - use restate_bifrost::{Bifrost, FindTailAttributes, LogRecord, Record}; use restate_core::cancellation_watcher; +use restate_core::metadata; +use restate_core::network::Networking; +use restate_partition_store::{PartitionStore, RocksDBTransaction}; use restate_storage_api::deduplication_table::{ DedupInformation, DedupSequenceNumber, EpochSequenceNumber, ProducerId, }; use restate_storage_api::StorageError; +use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStatus, RunMode}; +use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::logs::{LogId, Lsn, SequenceNumber}; +use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::control::AnnounceLeader; use restate_wal_protocol::{Command, Destination, Envelope, Header}; use self::storage::invoker::InvokerStorageReader; +use crate::metric_definitions::{ + PARTITION_ACTUATOR_HANDLED, PARTITION_LABEL, PARTITION_LEADER_HANDLE_ACTION_BATCH_DURATION, + PARTITION_TIMER_DUE_HANDLED, PP_APPLY_RECORD_DURATION, +}; +use crate::partition::leadership::{ActionEffect, LeadershipState}; +use crate::partition::state_machine::{ActionCollector, Effects, StateMachine}; +use crate::partition::storage::{DedupSequenceNumberResolver, PartitionStorage, Transaction}; + +mod action_effect_handler; +mod leadership; +pub mod shuffle; +mod state_machine; +pub mod storage; +pub mod types; /// Control messages from Manager to individual partition processor instances. pub enum PartitionProcessorControlCommand {} diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 51cab822f..d7a2ef930 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -23,15 +23,15 @@ use tokio::time::MissedTickBehavior; use tracing::{debug, info, trace, warn}; use restate_bifrost::Bifrost; +use restate_core::network::rpc_router::{RpcError, RpcRouter}; use restate_core::network::MessageRouterBuilder; use restate_core::network::NetworkSender; +use restate_core::network::Networking; use restate_core::worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle}; use restate_core::TaskCenter; use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskId, TaskKind}; use restate_invoker_impl::InvokerHandle; use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError}; -use restate_network::rpc_router::{RpcError, RpcRouter}; -use restate_network::Networking; use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_storage_api::StorageError; use restate_types::arc_util::{ArcSwapExt, Updateable}; diff --git a/tools/xtask/Cargo.toml b/tools/xtask/Cargo.toml index 0932be091..6b6851101 100644 --- a/tools/xtask/Cargo.toml +++ b/tools/xtask/Cargo.toml @@ -12,7 +12,6 @@ restate-admin = { workspace = true, features = ["options_schema"] } restate-bifrost = { workspace = true, features = ["options_schema", "test-util"] } restate-core = { workspace = true, features = ["test-util"] } restate-metadata-store = { workspace = true, features = ["test-util"] } -restate-network = { workspace = true } restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["discovery"]} restate-types = { workspace = true, features = ["schemars"] } diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index fd03b39ea..dfeadb2aa 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -18,9 +18,9 @@ use tonic::transport::{Channel, Uri}; use restate_admin::service::AdminService; use restate_bifrost::Bifrost; +use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient; use restate_core::TaskKind; use restate_core::TestCoreEnv; -use restate_network::protobuf::node_svc::node_svc_client::NodeSvcClient; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_service_protocol::discovery::ServiceDiscovery; use restate_types::arc_util::Constant;