Skip to content

Commit

Permalink
[reorg] Stage 2
Browse files Browse the repository at this point in the history
Merge `restate-network` into `restate-core`
  • Loading branch information
AhmedSoliman committed Jun 19, 2024
1 parent 3870f62 commit 8547f77
Show file tree
Hide file tree
Showing 41 changed files with 238 additions and 362 deletions.
65 changes: 10 additions & 55 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ restate-core = { path = "crates/core" }
restate-errors = { path = "crates/errors" }
restate-fs-util = { path = "crates/fs-util" }
restate-futures-util = { path = "crates/futures-util" }
restate-grpc-util = { path = "crates/grpc-util" }
restate-ingress-dispatcher = { path = "crates/ingress-dispatcher" }
restate-ingress-http = { path = "crates/ingress-http" }
restate-ingress-kafka = { path = "crates/ingress-kafka" }
restate-invoker-api = { path = "crates/invoker-api" }
restate-invoker-impl = { path = "crates/invoker-impl" }
restate-admin-rest-model = { path = "crates/admin-rest-model" }
restate-metadata-store = { path = "crates/metadata-store" }
restate-network = { path = "crates/network" }
restate-node = { path = "crates/node" }
restate-partition-store = { path = "crates/partition-store" }
restate-queue = { path = "crates/queue" }
Expand Down
1 change: 0 additions & 1 deletion crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-fs-util = { workspace = true }
restate-futures-util = { workspace = true }
restate-network = { workspace = true }
restate-service-client = { workspace = true }
restate-service-protocol = { workspace = true, features = ["discovery"] }
restate-types = { workspace = true, features = ["schemars"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use restate_types::cluster::cluster_state::{AliveNode, ClusterState, DeadNode, N
use std::time::Instant;
use tokio::task::JoinHandle;

use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{MessageRouterBuilder, NetworkSender};
use restate_network::rpc_router::RpcRouter;
use restate_types::net::partition_processor_manager::GetProcessorsState;
use restate_types::nodes_config::Role;
use restate_types::time::MillisSinceEpoch;
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use tower::ServiceBuilder;
use tracing::info;

use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient;
use restate_core::{cancellation_watcher, task_center, MetadataWriter};
use restate_network::protobuf::node_svc::node_svc_client::NodeSvcClient;
use restate_service_protocol::discovery::ServiceDiscovery;
use restate_types::schema::subscriptions::SubscriptionValidator;

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

use crate::schema_registry::SchemaRegistry;
use restate_bifrost::Bifrost;
use restate_core::network::protobuf::node_svc::node_svc_client::NodeSvcClient;
use restate_core::TaskCenter;
use restate_network::protobuf::node_svc::node_svc_client::NodeSvcClient;
use tonic::transport::Channel;

#[derive(Clone, derive_builder::Builder)]
Expand Down
10 changes: 5 additions & 5 deletions crates/admin/src/storage_query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::FlightData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::FlightData;
use axum::body::StreamBody;
use axum::extract::State;
use axum::response::IntoResponse;
Expand All @@ -30,14 +30,14 @@ use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::record_batch::RecordBatch;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use okapi_operation::*;
use restate_network::protobuf::node_svc::StorageQueryRequest;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_with::serde_as;

use crate::state::QueryServiceState;
use restate_core::network::protobuf::node_svc::StorageQueryRequest;

use super::error::StorageQueryError;
use crate::state::QueryServiceState;

#[serde_as]
#[derive(Debug, Deserialize, JsonSchema)]
Expand Down
20 changes: 15 additions & 5 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ publish = false

[features]
default = []
test-util = ["dep:rand", "tokio/test-util"]
test-util = ["tokio/test-util", "restate-types/test-util"]
options_schema = ["dep:schemars"]

[dependencies]
Expand All @@ -20,34 +20,44 @@ arc-swap = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
dashmap = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true }
enumset = { workspace = true }
futures = { workspace = true }
hostname = { workspace = true }
http = { workspace = true }
humantime = { workspace = true }
hyper = { workspace = true }
metrics = { workspace = true }
rand = { workspace = true, optional = true }
once_cell = { workspace = true }
pin-project = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_with = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["tracing" ] }
tokio-stream = { workspace = true }
tokio = { workspace = true, features = ["tracing"] }
tokio-stream = { workspace = true, features = ["net"] }
tokio-util = { workspace = true }
tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] }
tower = { workspace = true }
tracing = { workspace = true }

[build-dependencies]
tonic-build = { workspace = true }

[dev-dependencies]
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }

googletest = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use std::time::Instant;
use tokio::sync::mpsc;
use tracing::instrument;

use restate_core::metadata;
use restate_core::network::NetworkSendError;
use crate::metadata;
use restate_types::net::codec::Targeted;
use restate_types::net::codec::{serialize_message, WireEncode};
use restate_types::net::ProtocolVersion;
Expand All @@ -25,8 +24,9 @@ use restate_types::protobuf::node::Header;
use restate_types::protobuf::node::Message;
use restate_types::GenerationalNodeId;

use crate::metric_definitions::CONNECTION_SEND_DURATION;
use crate::metric_definitions::MESSAGE_SENT;
use super::error::NetworkSendError;
use super::metric_definitions::CONNECTION_SEND_DURATION;
use super::metric_definitions::MESSAGE_SENT;

/// A single streaming connection with a channel to the peer. A connection can be
/// opened by either ends of the connection and has no direction. Any connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,29 @@ use std::time::Instant;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use rand::seq::SliceRandom;
use restate_core::network::{Handler, MessageRouter};
use restate_types::net::codec::try_unwrap_binary_message;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tracing::{debug, info, trace, warn, Instrument, Span};

use restate_core::metadata;
use restate_core::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind};
use restate_grpc_util::create_grpc_channel_from_advertised_address;
use restate_types::net::AdvertisedAddress;
use restate_types::protobuf::node::message::{self, ConnectionControl};
use restate_types::protobuf::node::{Header, Hello, Message, Welcome};
use restate_types::{GenerationalNodeId, NodeId, PlainNodeId};

use super::connection::{Connection, ConnectionSender};
use super::error::{NetworkError, ProtocolError};
use super::grpc_util::create_grpc_channel_from_advertised_address;
use super::handshake::{negotiate_protocol_version, wait_for_hello, wait_for_welcome};
use crate::error::{NetworkError, ProtocolError};
use crate::metric_definitions::{
use super::metric_definitions::{
self, CONNECTION_DROPPED, INCOMING_CONNECTION, MESSAGE_PROCESSING_DURATION, MESSAGE_RECEIVED,
ONGOING_DRAIN, OUTGOING_CONNECTION,
};
use crate::protobuf::node_svc::node_svc_client::NodeSvcClient;
use super::protobuf::node_svc::node_svc_client::NodeSvcClient;
use super::{Handler, MessageRouter};
use crate::metadata;
use crate::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind};

// todo: make this configurable
const SEND_QUEUE_SIZE: usize = 1;
Expand Down Expand Up @@ -595,13 +595,13 @@ fn on_connection_terminated(inner_manager: &Mutex<ConnectionManagerInner>) {

#[cfg(test)]
mod tests {
use crate::handshake::HANDSHAKE_TIMEOUT;
use crate::network::handshake::HANDSHAKE_TIMEOUT;

use super::*;

use googletest::prelude::*;

use restate_core::TestCoreEnv;
use crate::TestCoreEnv;
use restate_test_util::assert_eq;
use restate_types::net::{
ProtocolVersion, CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_PROTOCOL_VERSION,
Expand All @@ -616,7 +616,7 @@ mod tests {
test_setup
.tc
.run_in_scope("test", None, async {
let metadata = restate_core::metadata();
let metadata = crate::metadata();
let (tx, rx) = mpsc::channel(1);
let connections = ConnectionManager::default();

Expand Down Expand Up @@ -680,7 +680,7 @@ mod tests {
test_setup
.tc
.run_in_scope("test", None, async {
let metadata = restate_core::metadata();
let metadata = crate::metadata();

let (tx, rx) = mpsc::channel(1);
let my_node_id = metadata.my_node_id();
Expand Down Expand Up @@ -744,7 +744,7 @@ mod tests {
test_setup
.tc
.run_in_scope("test", None, async {
let metadata = restate_core::metadata();
let metadata = crate::metadata();

let (tx, rx) = mpsc::channel(1);
let mut my_node_id = metadata.my_node_id();
Expand Down
Loading

0 comments on commit 8547f77

Please sign in to comment.