From 1ca8e9f3f40d49345bd6d8a2a194d810ce627a87 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 28 Feb 2024 17:24:59 +0000 Subject: [PATCH] [2/n] Introduce node-protocol --- Cargo.lock | 21 ++++ Cargo.toml | 1 + crates/admin/src/rest_api/mod.rs | 4 +- crates/admin/src/service.rs | 2 +- crates/admin/src/state.rs | 2 +- crates/admin/src/storage_query/query.rs | 2 +- crates/core/Cargo.toml | 2 + crates/core/src/lib.rs | 2 + crates/core/src/metadata/mod.rs | 32 +---- crates/core/src/network_sender.rs | 34 ++++++ crates/node-protocol/Cargo.toml | 28 +++++ crates/node-protocol/build.rs | 37 ++++++ .../proto/common.proto | 3 +- crates/node-protocol/proto/node.proto | 72 +++++++++++ crates/node-protocol/src/common.rs | 71 +++++++++++ crates/node-protocol/src/lib.rs | 27 +++++ crates/node-protocol/src/metadata.rs | 65 ++++++++++ crates/node-protocol/src/node.rs | 114 ++++++++++++++++++ crates/node-protocol/src/protocol.rs | 50 ++++++++ crates/node-protocol/src/status.rs | 16 +++ crates/node-services/Cargo.toml | 3 +- crates/node-services/build.rs | 20 +-- crates/node-services/proto/node_svc.proto | 11 +- crates/node-services/src/lib.rs | 58 +-------- crates/node/Cargo.toml | 1 + .../node/src/network_server/handler/node.rs | 9 +- crates/node/src/network_server/service.rs | 5 +- crates/node/src/roles/admin.rs | 4 +- tools/xtask/src/main.rs | 2 +- 29 files changed, 581 insertions(+), 117 deletions(-) create mode 100644 crates/core/src/network_sender.rs create mode 100644 crates/node-protocol/Cargo.toml create mode 100644 crates/node-protocol/build.rs rename crates/{node-services => node-protocol}/proto/common.proto (92%) create mode 100644 crates/node-protocol/proto/node.proto create mode 100644 crates/node-protocol/src/common.rs create mode 100644 crates/node-protocol/src/lib.rs create mode 100644 crates/node-protocol/src/metadata.rs create mode 100644 crates/node-protocol/src/node.rs create mode 100644 crates/node-protocol/src/protocol.rs create mode 100644 crates/node-protocol/src/status.rs diff --git a/Cargo.lock b/Cargo.lock index 5a0ca87c7..5655bfc62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4930,10 +4930,12 @@ version = "0.8.0" dependencies = [ "anyhow", "arc-swap", + "async-trait", "derive_more", "enum-map", "futures", "googletest", + "restate-node-protocol", "restate-test-util", "restate-types", "static_assertions", @@ -5295,6 +5297,7 @@ dependencies = [ "restate-errors", "restate-meta", "restate-network", + "restate-node-protocol", "restate-node-services", "restate-schema-api", "restate-schema-impl", @@ -5324,6 +5327,23 @@ dependencies = [ "tracing-test", ] +[[package]] +name = "restate-node-protocol" +version = "0.8.0" +dependencies = [ + "anyhow", + "bytes", + "enum-map", + "prost 0.12.3", + "prost-build", + "prost-types", + "restate-types", + "serde", + "strum 0.26.1", + "strum_macros 0.26.1", + "thiserror", +] + [[package]] name = "restate-node-services" version = "0.8.0" @@ -5332,6 +5352,7 @@ dependencies = [ "bytes", "prost 0.12.3", "prost-types", + "restate-node-protocol", "restate-types", "thiserror", "tonic 0.10.2", diff --git a/Cargo.toml b/Cargo.toml index 95dde7e1e..0a2f55866 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ restate-meta = { path = "crates/meta" } restate-meta-rest-model = { path = "crates/meta-rest-model" } restate-network = { path = "crates/network" } restate-node = { path = "crates/node" } +restate-node-protocol = { path = "crates/node-protocol" } restate-node-services = { path = "crates/node-services" } restate-pb = { path = "crates/pb" } restate-queue = { path = "crates/queue" } diff --git a/crates/admin/src/rest_api/mod.rs b/crates/admin/src/rest_api/mod.rs index c86af46df..db2b9dc4e 100644 --- a/crates/admin/src/rest_api/mod.rs +++ b/crates/admin/src/rest_api/mod.rs @@ -22,8 +22,8 @@ use crate::rest_api::error::MetaApiError; use okapi_operation::axum_integration::{delete, get, patch, post}; use okapi_operation::*; use restate_meta::{FileMetaReader, MetaReader}; -use restate_node_services::node::node_svc_client::NodeSvcClient; -use restate_node_services::node::UpdateSchemaRequest; +use restate_node_services::node_svc::node_svc_client::NodeSvcClient; +use restate_node_services::node_svc::UpdateSchemaRequest; use tonic::transport::Channel; use tracing::debug; diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index 7bea44876..a729b102d 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -18,7 +18,7 @@ use tracing::info; use restate_core::cancellation_watcher; use restate_meta::{FileMetaReader, MetaHandle}; -use restate_node_services::node::node_svc_client::NodeSvcClient; +use restate_node_services::node_svc::node_svc_client::NodeSvcClient; use restate_schema_impl::Schemas; use crate::{rest_api, state, storage_query}; diff --git a/crates/admin/src/state.rs b/crates/admin/src/state.rs index 6d12fb6d2..ad2ccbdb7 100644 --- a/crates/admin/src/state.rs +++ b/crates/admin/src/state.rs @@ -10,7 +10,7 @@ // use restate_meta::{FileMetaReader, MetaHandle}; -use restate_node_services::node::node_svc_client::NodeSvcClient; +use restate_node_services::node_svc::node_svc_client::NodeSvcClient; use restate_schema_impl::Schemas; use tonic::transport::Channel; diff --git a/crates/admin/src/storage_query/query.rs b/crates/admin/src/storage_query/query.rs index 5aea849cf..0bbaafda2 100644 --- a/crates/admin/src/storage_query/query.rs +++ b/crates/admin/src/storage_query/query.rs @@ -30,7 +30,7 @@ use datafusion::arrow::ipc::writer::StreamWriter; use datafusion::arrow::record_batch::RecordBatch; use futures::{ready, Stream, StreamExt, TryStreamExt}; use okapi_operation::*; -use restate_node_services::node::StorageQueryRequest; +use restate_node_services::node_svc::StorageQueryRequest; use schemars::JsonSchema; use serde::Deserialize; use serde_with::serde_as; diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 2774f51c1..1494a868f 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -12,9 +12,11 @@ test-util = [] [dependencies] restate-types = { workspace = true } +restate-node-protocol = { workspace = true } anyhow = { workspace = true } arc-swap = { workspace = true } +async-trait = { workspace = true } derive_more = { workspace = true } enum-map = { workspace = true } futures = { workspace = true } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 40d89af8b..66bb2f9b7 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -9,9 +9,11 @@ // by the Apache License, Version 2.0. mod metadata; +mod network_sender; mod task_center; mod task_center_types; pub use metadata::{spawn_metadata_manager, Metadata, MetadataManager, MetadataWriter}; +pub use network_sender::{NetworkSendError, NetworkSender}; pub use task_center::*; pub use task_center_types::*; diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs index e67dc20b2..0f07ffc19 100644 --- a/crates/core/src/metadata/mod.rs +++ b/crates/core/src/metadata/mod.rs @@ -17,22 +17,16 @@ pub use manager::MetadataManager; use std::sync::{Arc, OnceLock}; use arc_swap::ArcSwapOption; -use enum_map::{Enum, EnumMap}; -use strum_macros::EnumIter; +use enum_map::EnumMap; use tokio::sync::{oneshot, watch}; -use crate::{ShutdownError, TaskCenter, TaskId, TaskKind}; +use restate_node_protocol::{MetadataContainer, MetadataKind}; use restate_types::nodes_config::NodesConfiguration; use restate_types::{GenerationalNodeId, Version}; +use crate::{ShutdownError, TaskCenter, TaskId, TaskKind}; + /// The kind of versioned metadata that can be synchronized across nodes. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Enum, EnumIter)] -pub enum MetadataKind { - NodesConfiguration, - Schema, - PartitionTable, - Logs, -} #[derive(Clone)] pub struct Metadata { @@ -40,24 +34,6 @@ pub struct Metadata { inner: Arc, } -pub enum MetadataContainer { - NodesConfiguration(NodesConfiguration), -} - -impl MetadataContainer { - pub fn kind(&self) -> MetadataKind { - match self { - MetadataContainer::NodesConfiguration(_) => MetadataKind::NodesConfiguration, - } - } -} - -impl From for MetadataContainer { - fn from(value: NodesConfiguration) -> Self { - MetadataContainer::NodesConfiguration(value) - } -} - impl Metadata { fn new(inner: Arc, sender: manager::CommandSender) -> Self { Self { inner, sender } diff --git a/crates/core/src/network_sender.rs b/crates/core/src/network_sender.rs new file mode 100644 index 000000000..2b50b1b0f --- /dev/null +++ b/crates/core/src/network_sender.rs @@ -0,0 +1,34 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use async_trait::async_trait; +use restate_node_protocol::NetworkMessage; +use restate_types::nodes_config::NodesConfigError; +use restate_types::NodeId; + +use crate::ShutdownError; + +#[derive(Debug, thiserror::Error)] +pub enum NetworkSendError { + #[error("Unknown node: {0}")] + UnknownNode(#[from] NodesConfigError), + #[error("Operation aborted, node is shutting down")] + Shutdown(#[from] ShutdownError), + #[error("OldPeerGeneration: {0}")] + OldPeerGeneration(String), + #[error("Cannot send messages to this node: {0}")] + Unavailable(String), +} + +/// Access to node-to-node networking infrastructure +#[async_trait] +pub trait NetworkSender: Send + Sync { + async fn send(&self, to: NodeId, message: &NetworkMessage) -> Result<(), NetworkSendError>; +} diff --git a/crates/node-protocol/Cargo.toml b/crates/node-protocol/Cargo.toml new file mode 100644 index 000000000..ad9bc7ca8 --- /dev/null +++ b/crates/node-protocol/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "restate-node-protocol" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +publish = false + +[features] +default = [] + +[dependencies] +restate-types = { workspace = true, features = ["serde"] } + +anyhow = { workspace = true, optional = true } +bytes = { workspace = true, optional = true } +enum-map = { workspace = true } +prost = { workspace = true } +prost-types = { workspace = true } +serde = { workspace = true} +strum = { workspace = true } +strum_macros = { workspace = true } +thiserror = { workspace = true, optional = true } + +[build-dependencies] +prost-build = { workspace = true } + diff --git a/crates/node-protocol/build.rs b/crates/node-protocol/build.rs new file mode 100644 index 000000000..e90609cbe --- /dev/null +++ b/crates/node-protocol/build.rs @@ -0,0 +1,37 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::env; +use std::path::PathBuf; + +fn main() -> Result<(), Box> { + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + + prost_build::Config::new() + .bytes(["."]) + .file_descriptor_set_path(out_dir.join("common_descriptor.bin")) + // allow older protobuf compiler to be used + .protoc_arg("--experimental_allow_proto3_optional") + .compile_protos(&["./proto/common.proto"], &["proto"])?; + + prost_build::Config::new() + .enum_attribute( + "MessageKind", + "#[derive(::enum_map::Enum, ::strum_macros::EnumIs)]", + ) + .enum_attribute("Message.body", "#[derive(::strum_macros::EnumIs)]") + .bytes(["."]) + .file_descriptor_set_path(out_dir.join("node_descriptor.bin")) + // allow older protobuf compiler to be used + .protoc_arg("--experimental_allow_proto3_optional") + .compile_protos(&["./proto/node.proto"], &["proto"])?; + + Ok(()) +} diff --git a/crates/node-services/proto/common.proto b/crates/node-protocol/proto/common.proto similarity index 92% rename from crates/node-services/proto/common.proto rename to crates/node-protocol/proto/common.proto index 8fd8bd8ea..f53278883 100644 --- a/crates/node-services/proto/common.proto +++ b/crates/node-protocol/proto/common.proto @@ -13,8 +13,7 @@ package dev.restate.common; enum ProtocolVersion { ProtocolVersion_UNKNOWN = 0; - BASIC_HANDSHAKE = 1; - MAX_PROTOCOL_VERSION = 2; + BINCODED = 1; } message NodeId { diff --git a/crates/node-protocol/proto/node.proto b/crates/node-protocol/proto/node.proto new file mode 100644 index 000000000..9c506f3c7 --- /dev/null +++ b/crates/node-protocol/proto/node.proto @@ -0,0 +1,72 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +import "common.proto"; + +package dev.restate.node; + +// +// # Wire Protocol Of Streaming Connections +// ------------------------------------- +// +message Header { dev.restate.common.Version my_nodes_config_version = 1; } + +// First message sent to an ingress after starting the connection. The message +// must be sent before any other message. +message Hello { + dev.restate.common.ProtocolVersion min_protocol_version = 1; + dev.restate.common.ProtocolVersion max_protocol_version = 2; + // generational node id of sender (who am I) + dev.restate.common.NodeId my_node_id = 3; + string cluster_name = 4; +} + +message Welcome { + dev.restate.common.ProtocolVersion protocol_version = 2; + // generational node id of sender + dev.restate.common.NodeId my_node_id = 3; +} + +enum MessageKind { + MessageKind_UNKNOWN = 0; + GET_METADATA_REQUEST = 1; + METADATA_UPDATE = 2; +} + +// Bidirectional Communication +message Message { + enum Signal { + Signal_UNKNOWN = 0; + SHUTDOWN = 1; + // Connection will be dropped + DRAIN_CONNECTION = 2; + CODEC_ERROR = 3; + } + message ConnectionControl { + Signal signal = 1; + string message = 2; + } + + message BinaryMessage { + MessageKind kind = 1; + bytes payload = 2; + } + + Header header = 1; + oneof body { + ConnectionControl connection_control = 2; + // Sent as first message + Hello hello = 3; + // Sent as first response + Welcome welcome = 4; + BinaryMessage bincoded = 5; + } +} diff --git a/crates/node-protocol/src/common.rs b/crates/node-protocol/src/common.rs new file mode 100644 index 000000000..811f068f3 --- /dev/null +++ b/crates/node-protocol/src/common.rs @@ -0,0 +1,71 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +include!(concat!(env!("OUT_DIR"), "/dev.restate.common.rs")); + +pub static MIN_SUPPORTED_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Bincoded; +pub static CURRENT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Bincoded; + +pub const FILE_DESCRIPTOR_SET: &[u8] = + include_bytes!(concat!(env!("OUT_DIR"), "/common_descriptor.bin")); + +impl ProtocolVersion { + pub fn is_supported(&self) -> bool { + *self >= MIN_SUPPORTED_PROTOCOL_VERSION && *self <= CURRENT_PROTOCOL_VERSION + } +} + +impl From for restate_types::Version { + fn from(version: Version) -> Self { + restate_types::Version::from(version.value) + } +} + +impl From for Version { + fn from(version: restate_types::Version) -> Self { + Version { + value: version.into(), + } + } +} + +impl From for restate_types::NodeId { + fn from(node_id: NodeId) -> Self { + restate_types::NodeId::new(node_id.id, node_id.generation) + } +} + +impl From for NodeId { + fn from(node_id: restate_types::NodeId) -> Self { + NodeId { + id: node_id.id().into(), + generation: node_id.as_generational().map(|g| g.generation()), + } + } +} + +impl From for NodeId { + fn from(node_id: restate_types::PlainNodeId) -> Self { + let id: u32 = node_id.into(); + NodeId { + id, + generation: None, + } + } +} + +impl From for NodeId { + fn from(node_id: restate_types::GenerationalNodeId) -> Self { + NodeId { + id: node_id.raw_id(), + generation: Some(node_id.generation()), + } + } +} diff --git a/crates/node-protocol/src/lib.rs b/crates/node-protocol/src/lib.rs new file mode 100644 index 000000000..27bc432db --- /dev/null +++ b/crates/node-protocol/src/lib.rs @@ -0,0 +1,27 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod common; +mod metadata; +pub mod node; +mod protocol; + +pub use common::CURRENT_PROTOCOL_VERSION; +pub use common::MIN_SUPPORTED_PROTOCOL_VERSION; + +pub use metadata::*; +pub use node::MessageKind; +pub use protocol::{MessageEnvelope, NetworkMessage}; + +// re-exports of network message types +pub mod messages { + pub use crate::metadata::GetMetadataRequest; + pub use crate::metadata::MetadataUpdate; +} diff --git a/crates/node-protocol/src/metadata.rs b/crates/node-protocol/src/metadata.rs new file mode 100644 index 000000000..63f859d7b --- /dev/null +++ b/crates/node-protocol/src/metadata.rs @@ -0,0 +1,65 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use enum_map::Enum; +use restate_types::nodes_config::NodesConfiguration; +use serde::{Deserialize, Serialize}; +use strum_macros::EnumIter; + +/// The kind of versioned metadata that can be synchronized across nodes. +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + Hash, + Enum, + EnumIter, + Serialize, + Deserialize, + strum_macros::Display, +)] +pub enum MetadataKind { + NodesConfiguration, + Schema, + PartitionTable, + Logs, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MetadataContainer { + NodesConfiguration(NodesConfiguration), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetMetadataRequest { + pub metadata_kind: MetadataKind, + pub min_version: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MetadataUpdate { + pub container: MetadataContainer, +} + +impl MetadataContainer { + pub fn kind(&self) -> MetadataKind { + match self { + MetadataContainer::NodesConfiguration(_) => MetadataKind::NodesConfiguration, + } + } +} + +impl From for MetadataContainer { + fn from(value: NodesConfiguration) -> Self { + MetadataContainer::NodesConfiguration(value) + } +} diff --git a/crates/node-protocol/src/node.rs b/crates/node-protocol/src/node.rs new file mode 100644 index 000000000..e46a5657e --- /dev/null +++ b/crates/node-protocol/src/node.rs @@ -0,0 +1,114 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_types::GenerationalNodeId; + +use crate::common::{ProtocolVersion, CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_PROTOCOL_VERSION}; + +use self::message::{BinaryMessage, ConnectionControl, Signal}; + +include!(concat!(env!("OUT_DIR"), "/dev.restate.node.rs")); + +pub const FILE_DESCRIPTOR_SET: &[u8] = + include_bytes!(concat!(env!("OUT_DIR"), "/node_descriptor.bin")); + +impl Hello { + pub fn new(my_node_id: GenerationalNodeId, cluster_name: String) -> Self { + Self { + min_protocol_version: MIN_SUPPORTED_PROTOCOL_VERSION.into(), + max_protocol_version: CURRENT_PROTOCOL_VERSION.into(), + my_node_id: Some(my_node_id.into()), + cluster_name, + } + } +} + +impl Header { + pub fn new(nodes_config_version: restate_types::Version) -> Self { + Self { + my_nodes_config_version: Some(nodes_config_version.into()), + } + } +} + +impl Welcome { + pub fn new(my_node_id: GenerationalNodeId, protocol_version: ProtocolVersion) -> Self { + Self { + my_node_id: Some(my_node_id.into()), + protocol_version: protocol_version.into(), + } + } +} + +impl Message { + pub fn new(header: Header, body: impl Into) -> Self { + Self { + header: Some(header), + body: Some(body.into()), + } + } +} + +impl From for message::Body { + fn from(value: Hello) -> Self { + message::Body::Hello(value) + } +} + +impl From for message::Body { + fn from(value: Welcome) -> Self { + message::Body::Welcome(value) + } +} + +impl From for message::Body { + fn from(value: ConnectionControl) -> Self { + message::Body::ConnectionControl(value) + } +} + +impl From for message::Body { + fn from(value: BinaryMessage) -> Self { + message::Body::Bincoded(value) + } +} + +impl ConnectionControl { + pub fn connection_reset() -> Self { + Self { + signal: message::Signal::DrainConnection.into(), + message: "Connection is draining and will be dropped".to_owned(), + } + } + pub fn shutdown() -> Self { + Self { + signal: message::Signal::Shutdown.into(), + message: "Node is shutting down".to_owned(), + } + } + pub fn codec_error(message: impl Into) -> Self { + Self { + signal: message::Signal::CodecError.into(), + message: message.into(), + } + } +} + +impl std::fmt::Display for MessageKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str_name()) + } +} + +impl std::fmt::Display for Signal { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str_name()) + } +} diff --git a/crates/node-protocol/src/protocol.rs b/crates/node-protocol/src/protocol.rs new file mode 100644 index 000000000..28188387a --- /dev/null +++ b/crates/node-protocol/src/protocol.rs @@ -0,0 +1,50 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_types::GenerationalNodeId; +use serde::{Deserialize, Serialize}; + +use crate::messages::{GetMetadataRequest, MetadataUpdate}; +use crate::MessageKind; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum NetworkMessage { + GetMetadataRequest(GetMetadataRequest), + MetadataUpdate(MetadataUpdate), +} + +impl NetworkMessage { + pub fn kind(&self) -> MessageKind { + match self { + Self::GetMetadataRequest(_) => MessageKind::GetMetadataRequest, + Self::MetadataUpdate(_) => MessageKind::MetadataUpdate, + } + } +} + +// routing table for incoming message types to network components +pub struct MessageEnvelope { + pub from: GenerationalNodeId, + pub msg: NetworkMessage, +} + +impl MessageEnvelope { + pub fn new(from: GenerationalNodeId, msg: NetworkMessage) -> Self { + Self { from, msg } + } + + pub fn kind(&self) -> MessageKind { + self.msg.kind() + } + + pub fn split(self) -> (GenerationalNodeId, NetworkMessage) { + (self.from, self.msg) + } +} diff --git a/crates/node-protocol/src/status.rs b/crates/node-protocol/src/status.rs new file mode 100644 index 000000000..a78c5ed9c --- /dev/null +++ b/crates/node-protocol/src/status.rs @@ -0,0 +1,16 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum Status { + VersionNotFound, +} diff --git a/crates/node-services/Cargo.toml b/crates/node-services/Cargo.toml index 2530b7ea9..64b7e4b02 100644 --- a/crates/node-services/Cargo.toml +++ b/crates/node-services/Cargo.toml @@ -11,6 +11,7 @@ publish = false default = [] [dependencies] +restate-node-protocol = { workspace = true } restate-types = { workspace = true } anyhow = { workspace = true, optional = true } @@ -18,7 +19,7 @@ bytes = { workspace = true, optional = true } prost = { workspace = true } prost-types = { workspace = true } thiserror = { workspace = true, optional = true } -tonic = { workspace = true } +tonic = { workspace = true, features = ["transport", "codegen", "prost"] } [build-dependencies] tonic-build = { workspace = true } diff --git a/crates/node-services/build.rs b/crates/node-services/build.rs index 0f75bb318..d4823fb87 100644 --- a/crates/node-services/build.rs +++ b/crates/node-services/build.rs @@ -14,26 +14,28 @@ use std::path::PathBuf; fn main() -> Result<(), Box> { let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); - tonic_build::configure() - .bytes(["."]) - .file_descriptor_set_path(out_dir.join("common_descriptor.bin")) - // allow older protobuf compiler to be used - .protoc_arg("--experimental_allow_proto3_optional") - .compile(&["./proto/common.proto"], &["proto"])?; - tonic_build::configure() .bytes(["."]) .file_descriptor_set_path(out_dir.join("cluster_ctrl_svc_descriptor.bin")) // allow older protobuf compiler to be used .protoc_arg("--experimental_allow_proto3_optional") - .compile(&["./proto/cluster_ctrl_svc.proto"], &["proto"])?; + .extern_path(".dev.restate.common", "::restate_node_protocol::common") + .compile( + &["./proto/cluster_ctrl_svc.proto"], + &["proto", "../node-protocol/proto"], + )?; tonic_build::configure() .bytes(["."]) .file_descriptor_set_path(out_dir.join("node_svc_descriptor.bin")) // allow older protobuf compiler to be used .protoc_arg("--experimental_allow_proto3_optional") - .compile(&["./proto/node_svc.proto"], &["proto"])?; + .extern_path(".dev.restate.node", "::restate_node_protocol::node") + .extern_path(".dev.restate.common", "::restate_node_protocol::common") + .compile( + &["./proto/node_svc.proto"], + &["proto", "../node-protocol/proto"], + )?; Ok(()) } diff --git a/crates/node-services/proto/node_svc.proto b/crates/node-services/proto/node_svc.proto index 8ebee5737..0bb1c59be 100644 --- a/crates/node-services/proto/node_svc.proto +++ b/crates/node-services/proto/node_svc.proto @@ -11,8 +11,9 @@ syntax = "proto3"; import "google/protobuf/empty.proto"; import "common.proto"; +import "node.proto"; -package dev.restate.node; +package dev.restate.node_svc; service NodeSvc { // Get identity information from this node. @@ -56,15 +57,11 @@ message StateMutationRequest { bytes state_mutation = 1; } -message StorageQueryRequest { - string query = 1; -} +message StorageQueryRequest { string query = 1; } message StorageQueryResponse { bytes header = 1; bytes data = 2; } -message UpdateSchemaRequest { - bytes schema_bin = 1; -} +message UpdateSchemaRequest { bytes schema_bin = 1; } diff --git a/crates/node-services/src/lib.rs b/crates/node-services/src/lib.rs index dd1484900..6a36377ae 100644 --- a/crates/node-services/src/lib.rs +++ b/crates/node-services/src/lib.rs @@ -15,63 +15,9 @@ pub mod cluster_ctrl { tonic::include_file_descriptor_set!("cluster_ctrl_svc_descriptor"); } -pub mod node { - tonic::include_proto!("dev.restate.node"); +pub mod node_svc { + tonic::include_proto!("dev.restate.node_svc"); pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("node_svc_descriptor"); } - -pub mod common { - tonic::include_proto!("dev.restate.common"); - - pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("common_descriptor"); - - impl From for restate_types::Version { - fn from(version: Version) -> Self { - restate_types::Version::from(version.value) - } - } - - impl From for Version { - fn from(version: restate_types::Version) -> Self { - Version { - value: version.into(), - } - } - } - - impl From for restate_types::NodeId { - fn from(node_id: NodeId) -> Self { - restate_types::NodeId::new(node_id.id, node_id.generation) - } - } - - impl From for NodeId { - fn from(node_id: restate_types::NodeId) -> Self { - NodeId { - id: node_id.id().into(), - generation: node_id.as_generational().map(|g| g.generation()), - } - } - } - - impl From for NodeId { - fn from(node_id: restate_types::PlainNodeId) -> Self { - let id: u32 = node_id.into(); - NodeId { - id, - generation: None, - } - } - } - - impl From for NodeId { - fn from(node_id: restate_types::GenerationalNodeId) -> Self { - NodeId { - id: node_id.raw_id(), - generation: Some(node_id.generation()), - } - } - } -} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 1a3701f89..98e06570e 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -24,6 +24,7 @@ restate-core = { workspace = true } restate-errors = { workspace = true } restate-meta = { workspace = true } restate-network = { workspace = true } +restate-node-protocol = { workspace = true } restate-node-services = { workspace = true } restate-schema-api = { workspace = true } restate-schema-impl = { workspace = true } diff --git a/crates/node/src/network_server/handler/node.rs b/crates/node/src/network_server/handler/node.rs index 9bbba1148..579b6003f 100644 --- a/crates/node/src/network_server/handler/node.rs +++ b/crates/node/src/network_server/handler/node.rs @@ -13,10 +13,11 @@ use arrow_flight::error::FlightError; use futures::stream::BoxStream; use futures::TryStreamExt; use restate_core::{metadata, TaskCenter}; -use restate_node_services::node::node_svc_server::NodeSvc; -use restate_node_services::node::{ - IdentResponse, NodeStatus, StateMutationRequest, StorageQueryRequest, StorageQueryResponse, - TerminationRequest, UpdateSchemaRequest, +use restate_node_services::node_svc::node_svc_server::NodeSvc; +use restate_node_services::node_svc::{IdentResponse, NodeStatus}; +use restate_node_services::node_svc::{ + StateMutationRequest, StorageQueryRequest, StorageQueryResponse, TerminationRequest, + UpdateSchemaRequest, }; use restate_schema_impl::SchemasUpdateCommand; use restate_worker_api::Handle; diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index e34d2135a..9c2c4fa41 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -18,9 +18,10 @@ use tracing::info; use restate_cluster_controller::ClusterControllerHandle; use restate_core::{cancellation_watcher, task_center}; use restate_meta::FileMetaReader; +use restate_node_protocol::{common, node}; +use restate_node_services::cluster_ctrl; use restate_node_services::cluster_ctrl::cluster_ctrl_svc_server::ClusterCtrlSvcServer; -use restate_node_services::node::node_svc_server::NodeSvcServer; -use restate_node_services::{cluster_ctrl, common, node}; +use restate_node_services::node_svc::node_svc_server::NodeSvcServer; use restate_schema_impl::Schemas; use restate_storage_query_datafusion::context::QueryContext; use restate_storage_rocksdb::RocksDBStorage; diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index bf687540d..9e6131393 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -10,7 +10,7 @@ use anyhow::Context; use codederror::CodedError; -use restate_node_services::node::node_svc_client::NodeSvcClient; +use restate_node_services::node_svc::node_svc_client::NodeSvcClient; use tonic::transport::Channel; use tracing::info; @@ -18,7 +18,7 @@ use restate_admin::service::AdminService; use restate_cluster_controller::ClusterControllerHandle; use restate_core::{task_center, TaskKind}; use restate_meta::{FileMetaReader, FileMetaStorage, MetaService}; -use restate_node_services::node::{StateMutationRequest, TerminationRequest}; +use restate_node_services::node_svc::{StateMutationRequest, TerminationRequest}; use restate_types::invocation::InvocationTermination; use restate_types::state_mut::ExternalStateMutation; use restate_worker::KafkaIngressOptions; diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index 685ba09a0..76b4b7f27 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -11,7 +11,7 @@ use anyhow::bail; use reqwest::header::ACCEPT; use restate_core::{create_test_task_center, TaskKind}; -use restate_node_services::node::node_svc_client::NodeSvcClient; +use restate_node_services::node_svc::node_svc_client::NodeSvcClient; use restate_schema_api::subscription::Subscription; use restate_types::identifiers::SubscriptionId; use restate_types::invocation::InvocationTermination;