Skip to content

Commit

Permalink
[reorg] Stage 2
Browse files Browse the repository at this point in the history
- Merge `restate-network` into `restate-core`
- Merge NetworkError and NetworkSendError
  • Loading branch information
AhmedSoliman committed Jun 24, 2024
1 parent 4279c2b commit 51f29b3
Show file tree
Hide file tree
Showing 43 changed files with 250 additions and 385 deletions.
65 changes: 10 additions & 55 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ restate-core = { path = "crates/core" }
restate-errors = { path = "crates/errors" }
restate-fs-util = { path = "crates/fs-util" }
restate-futures-util = { path = "crates/futures-util" }
restate-grpc-util = { path = "crates/grpc-util" }
restate-ingress-dispatcher = { path = "crates/ingress-dispatcher" }
restate-ingress-http = { path = "crates/ingress-http" }
restate-ingress-kafka = { path = "crates/ingress-kafka" }
restate-invoker-api = { path = "crates/invoker-api" }
restate-invoker-impl = { path = "crates/invoker-impl" }
restate-admin-rest-model = { path = "crates/admin-rest-model" }
restate-metadata-store = { path = "crates/metadata-store" }
restate-network = { path = "crates/network" }
restate-node = { path = "crates/node" }
restate-partition-store = { path = "crates/partition-store" }
restate-queue = { path = "crates/queue" }
Expand Down
1 change: 0 additions & 1 deletion crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-fs-util = { workspace = true }
restate-futures-util = { workspace = true }
restate-network = { workspace = true }
restate-service-client = { workspace = true }
restate-service-protocol = { workspace = true, features = ["discovery"] }
restate-types = { workspace = true, features = ["schemars"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use restate_types::cluster::cluster_state::{AliveNode, ClusterState, DeadNode, N
use std::time::Instant;
use tokio::task::JoinHandle;

use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{MessageRouterBuilder, NetworkSender};
use restate_network::rpc_router::RpcRouter;
use restate_types::net::partition_processor_manager::GetProcessorsState;
use restate_types::nodes_config::Role;
use restate_types::time::MillisSinceEpoch;
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use tower::ServiceBuilder;
use tracing::info;

use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient;
use restate_core::{cancellation_watcher, task_center, MetadataWriter};
use restate_network::protobuf::node_svc::node_svc_client::NodeSvcClient;
use restate_service_protocol::discovery::ServiceDiscovery;
use restate_types::schema::subscriptions::SubscriptionValidator;

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

use crate::schema_registry::SchemaRegistry;
use restate_bifrost::Bifrost;
use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient;
use restate_core::TaskCenter;
use restate_network::protobuf::node_svc::node_svc_client::NodeSvcClient;
use tonic::transport::Channel;

#[derive(Clone, derive_builder::Builder)]
Expand Down
10 changes: 5 additions & 5 deletions crates/admin/src/storage_query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::FlightData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::FlightData;
use axum::body::StreamBody;
use axum::extract::State;
use axum::response::IntoResponse;
Expand All @@ -30,14 +30,14 @@ use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::record_batch::RecordBatch;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use okapi_operation::*;
use restate_network::protobuf::node_svc::StorageQueryRequest;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_with::serde_as;

use crate::state::QueryServiceState;
use restate_core::network::protobuf::node_svc::StorageQueryRequest;

use super::error::StorageQueryError;
use crate::state::QueryServiceState;

#[serde_as]
#[derive(Debug, Deserialize, JsonSchema)]
Expand Down
20 changes: 15 additions & 5 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ publish = false

[features]
default = []
test-util = ["dep:rand", "tokio/test-util"]
test-util = ["tokio/test-util"]
options_schema = ["dep:schemars"]

[dependencies]
Expand All @@ -20,34 +20,44 @@ arc-swap = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
dashmap = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true }
enumset = { workspace = true }
futures = { workspace = true }
hostname = { workspace = true }
http = { workspace = true }
humantime = { workspace = true }
hyper = { workspace = true }
metrics = { workspace = true }
rand = { workspace = true, optional = true }
once_cell = { workspace = true }
pin-project = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_with = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["tracing" ] }
tokio-stream = { workspace = true }
tokio = { workspace = true, features = ["tracing"] }
tokio-stream = { workspace = true, features = ["net"] }
tokio-util = { workspace = true }
tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] }
tower = { workspace = true }
tracing = { workspace = true }

[build-dependencies]
tonic-build = { workspace = true }

[dev-dependencies]
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }

googletest = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use std::time::Instant;
use tokio::sync::mpsc;
use tracing::instrument;

use restate_core::metadata;
use restate_core::network::NetworkSendError;
use crate::metadata;
use restate_types::net::codec::Targeted;
use restate_types::net::codec::{serialize_message, WireEncode};
use restate_types::net::ProtocolVersion;
Expand All @@ -25,8 +24,10 @@ use restate_types::protobuf::node::Header;
use restate_types::protobuf::node::Message;
use restate_types::GenerationalNodeId;

use crate::metric_definitions::CONNECTION_SEND_DURATION;
use crate::metric_definitions::MESSAGE_SENT;
use super::metric_definitions::CONNECTION_SEND_DURATION;
use super::metric_definitions::MESSAGE_SENT;
use super::NetworkError;
use super::ProtocolError;

/// A single streaming connection with a channel to the peer. A connection can be
/// opened by either ends of the connection and has no direction. Any connection
Expand Down Expand Up @@ -115,21 +116,22 @@ impl ConnectionSender {
/// This doesn't auto-retry connection resets or send errors, this is up to the user
/// for retrying externally.
#[instrument(skip_all, fields(peer_node_id = %self.peer, target_service = ?message.target(), msg = ?message.kind()))]
pub async fn send<M>(&self, message: M) -> Result<(), NetworkSendError>
pub async fn send<M>(&self, message: M) -> Result<(), NetworkError>
where
M: WireEncode + Targeted,
{
let send_start = Instant::now();
let header = Header::new(metadata().nodes_config_version());
let body = serialize_message(message, self.protocol_version)?;
let body =
serialize_message(message, self.protocol_version).map_err(ProtocolError::Codec)?;
let res = self
.connection
.upgrade()
.ok_or(NetworkSendError::ConnectionClosed)?
.ok_or(NetworkError::ConnectionClosed)?
.sender
.send(Message::new(header, body))
.await
.map_err(|_| NetworkSendError::ConnectionClosed);
.map_err(|_| NetworkError::ConnectionClosed);
MESSAGE_SENT.increment(1);
CONNECTION_SEND_DURATION.record(send_start.elapsed());
res
Expand Down
Loading

0 comments on commit 51f29b3

Please sign in to comment.