Skip to content

Commit

Permalink
[2/n] Introduce node-protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Feb 28, 2024
1 parent 731aed2 commit 1ca8e9f
Show file tree
Hide file tree
Showing 29 changed files with 581 additions and 117 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ restate-meta = { path = "crates/meta" }
restate-meta-rest-model = { path = "crates/meta-rest-model" }
restate-network = { path = "crates/network" }
restate-node = { path = "crates/node" }
restate-node-protocol = { path = "crates/node-protocol" }
restate-node-services = { path = "crates/node-services" }
restate-pb = { path = "crates/pb" }
restate-queue = { path = "crates/queue" }
Expand Down
4 changes: 2 additions & 2 deletions crates/admin/src/rest_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::rest_api::error::MetaApiError;
use okapi_operation::axum_integration::{delete, get, patch, post};
use okapi_operation::*;
use restate_meta::{FileMetaReader, MetaReader};
use restate_node_services::node::node_svc_client::NodeSvcClient;
use restate_node_services::node::UpdateSchemaRequest;
use restate_node_services::node_svc::node_svc_client::NodeSvcClient;
use restate_node_services::node_svc::UpdateSchemaRequest;
use tonic::transport::Channel;
use tracing::debug;

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tracing::info;

use restate_core::cancellation_watcher;
use restate_meta::{FileMetaReader, MetaHandle};
use restate_node_services::node::node_svc_client::NodeSvcClient;
use restate_node_services::node_svc::node_svc_client::NodeSvcClient;
use restate_schema_impl::Schemas;

use crate::{rest_api, state, storage_query};
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//

use restate_meta::{FileMetaReader, MetaHandle};
use restate_node_services::node::node_svc_client::NodeSvcClient;
use restate_node_services::node_svc::node_svc_client::NodeSvcClient;
use restate_schema_impl::Schemas;
use tonic::transport::Channel;

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/storage_query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::record_batch::RecordBatch;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use okapi_operation::*;
use restate_node_services::node::StorageQueryRequest;
use restate_node_services::node_svc::StorageQueryRequest;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_with::serde_as;
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ test-util = []

[dependencies]
restate-types = { workspace = true }
restate-node-protocol = { workspace = true }

anyhow = { workspace = true }
arc-swap = { workspace = true }
async-trait = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true }
futures = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
// by the Apache License, Version 2.0.

mod metadata;
mod network_sender;
mod task_center;
mod task_center_types;

pub use metadata::{spawn_metadata_manager, Metadata, MetadataManager, MetadataWriter};
pub use network_sender::{NetworkSendError, NetworkSender};
pub use task_center::*;
pub use task_center_types::*;
32 changes: 4 additions & 28 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,23 @@ pub use manager::MetadataManager;
use std::sync::{Arc, OnceLock};

use arc_swap::ArcSwapOption;
use enum_map::{Enum, EnumMap};
use strum_macros::EnumIter;
use enum_map::EnumMap;
use tokio::sync::{oneshot, watch};

use crate::{ShutdownError, TaskCenter, TaskId, TaskKind};
use restate_node_protocol::{MetadataContainer, MetadataKind};
use restate_types::nodes_config::NodesConfiguration;
use restate_types::{GenerationalNodeId, Version};

use crate::{ShutdownError, TaskCenter, TaskId, TaskKind};

/// The kind of versioned metadata that can be synchronized across nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Enum, EnumIter)]
pub enum MetadataKind {
NodesConfiguration,
Schema,
PartitionTable,
Logs,
}

#[derive(Clone)]
pub struct Metadata {
sender: manager::CommandSender,
inner: Arc<MetadataInner>,
}

pub enum MetadataContainer {
NodesConfiguration(NodesConfiguration),
}

impl MetadataContainer {
pub fn kind(&self) -> MetadataKind {
match self {
MetadataContainer::NodesConfiguration(_) => MetadataKind::NodesConfiguration,
}
}
}

impl From<NodesConfiguration> for MetadataContainer {
fn from(value: NodesConfiguration) -> Self {
MetadataContainer::NodesConfiguration(value)
}
}

impl Metadata {
fn new(inner: Arc<MetadataInner>, sender: manager::CommandSender) -> Self {
Self { inner, sender }
Expand Down
34 changes: 34 additions & 0 deletions crates/core/src/network_sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use async_trait::async_trait;
use restate_node_protocol::NetworkMessage;
use restate_types::nodes_config::NodesConfigError;
use restate_types::NodeId;

use crate::ShutdownError;

#[derive(Debug, thiserror::Error)]
pub enum NetworkSendError {
#[error("Unknown node: {0}")]
UnknownNode(#[from] NodesConfigError),
#[error("Operation aborted, node is shutting down")]
Shutdown(#[from] ShutdownError),
#[error("OldPeerGeneration: {0}")]
OldPeerGeneration(String),
#[error("Cannot send messages to this node: {0}")]
Unavailable(String),
}

/// Access to node-to-node networking infrastructure
#[async_trait]
pub trait NetworkSender: Send + Sync {
async fn send(&self, to: NodeId, message: &NetworkMessage) -> Result<(), NetworkSendError>;
}
28 changes: 28 additions & 0 deletions crates/node-protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "restate-node-protocol"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
publish = false

[features]
default = []

[dependencies]
restate-types = { workspace = true, features = ["serde"] }

anyhow = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
enum-map = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
serde = { workspace = true}
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true, optional = true }

[build-dependencies]
prost-build = { workspace = true }

37 changes: 37 additions & 0 deletions crates/node-protocol/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::env;
use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());

prost_build::Config::new()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("common_descriptor.bin"))
// allow older protobuf compiler to be used
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["./proto/common.proto"], &["proto"])?;

prost_build::Config::new()
.enum_attribute(
"MessageKind",
"#[derive(::enum_map::Enum, ::strum_macros::EnumIs)]",
)
.enum_attribute("Message.body", "#[derive(::strum_macros::EnumIs)]")
.bytes(["."])
.file_descriptor_set_path(out_dir.join("node_descriptor.bin"))
// allow older protobuf compiler to be used
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["./proto/node.proto"], &["proto"])?;

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ package dev.restate.common;

enum ProtocolVersion {
ProtocolVersion_UNKNOWN = 0;
BASIC_HANDSHAKE = 1;
MAX_PROTOCOL_VERSION = 2;
BINCODED = 1;
}

message NodeId {
Expand Down
72 changes: 72 additions & 0 deletions crates/node-protocol/proto/node.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/proto/blob/main/LICENSE

syntax = "proto3";

import "common.proto";

package dev.restate.node;

//
// # Wire Protocol Of Streaming Connections
// -------------------------------------
//
message Header { dev.restate.common.Version my_nodes_config_version = 1; }

// First message sent to an ingress after starting the connection. The message
// must be sent before any other message.
message Hello {
dev.restate.common.ProtocolVersion min_protocol_version = 1;
dev.restate.common.ProtocolVersion max_protocol_version = 2;
// generational node id of sender (who am I)
dev.restate.common.NodeId my_node_id = 3;
string cluster_name = 4;
}

message Welcome {
dev.restate.common.ProtocolVersion protocol_version = 2;
// generational node id of sender
dev.restate.common.NodeId my_node_id = 3;
}

enum MessageKind {
MessageKind_UNKNOWN = 0;
GET_METADATA_REQUEST = 1;
METADATA_UPDATE = 2;
}

// Bidirectional Communication
message Message {
enum Signal {
Signal_UNKNOWN = 0;
SHUTDOWN = 1;
// Connection will be dropped
DRAIN_CONNECTION = 2;
CODEC_ERROR = 3;
}
message ConnectionControl {
Signal signal = 1;
string message = 2;
}

message BinaryMessage {
MessageKind kind = 1;
bytes payload = 2;
}

Header header = 1;
oneof body {
ConnectionControl connection_control = 2;
// Sent as first message
Hello hello = 3;
// Sent as first response
Welcome welcome = 4;
BinaryMessage bincoded = 5;
}
}
Loading

0 comments on commit 1ca8e9f

Please sign in to comment.