Skip to content

Commit

Permalink
[6/n] Ingress receives responses over network
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Feb 29, 2024
1 parent 4217f66 commit 4fa7145
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 28 deletions.
10 changes: 4 additions & 6 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ use restate_types::{GenerationalNodeId, Version};

use crate::{ShutdownError, TaskCenter, TaskId, TaskKind};

/// The kind of versioned metadata that can be synchronized across nodes.

#[derive(Clone)]
pub struct Metadata {
sender: manager::CommandSender,
Expand Down Expand Up @@ -59,7 +57,7 @@ impl Metadata {
}
}

// Returns when the metadata kind is at the provided version (or newer)
/// Returns when the metadata kind is at the provided version (or newer)
pub async fn wait_for_version(
&self,
metadata_kind: MetadataKind,
Expand All @@ -73,7 +71,7 @@ impl Metadata {
Ok(*v)
}

// Watch for version updates of this metadata kind.
/// Watch for version updates of this metadata kind.
pub fn watch(&self, metadata_kind: MetadataKind) -> watch::Receiver<Version> {
self.inner.write_watches[metadata_kind].receive.clone()
}
Expand Down Expand Up @@ -103,7 +101,7 @@ impl MetadataWriter {
Self { sender, inner }
}

// Returns when the nodes configuration update is performed.
/// Returns when the nodes configuration update is performed.
pub async fn update(&self, value: impl Into<MetadataContainer>) -> Result<(), ShutdownError> {
let (callback, recv) = oneshot::channel();
let o = self.sender.send(manager::Command::UpdateMetadata(
Expand All @@ -123,7 +121,7 @@ impl MetadataWriter {
let _ = self.inner.my_node_id.set(id);
}

// Fire and forget update
/// Fire and forget update
pub fn submit(&self, value: impl Into<MetadataContainer>) {
// Ignore the error, task-center takes care of safely shutting down the
// system if metadata manager failed
Expand Down
40 changes: 23 additions & 17 deletions crates/network/src/v2/message_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,26 @@ impl MessageRouter {

#[track_caller]
pub fn set_route(&self, kind: MessageKind, handler: mpsc::Sender<MessageEnvelope>) {
if self.handlers[kind].set(handler).is_err() {
panic!("Handler already set for message kind: {:?}", kind);
}
self.set_handlers(&[kind], handler);
}

#[track_caller]
pub fn set_metadata_manager_subscriber(&self, subscriber: mpsc::Sender<MessageEnvelope>) {
/// Allows metadata manager to receive metadata-related requests and responses
/// Must be called at most once on startup.
pub fn set_metadata_manager_handler(&self, handler: mpsc::Sender<MessageEnvelope>) {
// metadata manager can handle the sync metadata messages.
let interested_in = [MessageKind::GetMetadataRequest, MessageKind::MetadataUpdate];

for kind in interested_in {
if self.handlers[kind].set(subscriber.clone()).is_err() {
panic!("Handler already set for message kind: {}", kind);
}
}
self.set_handlers(&interested_in, handler);
}

#[track_caller]
pub fn set_ingress_subscriber(&self, _handler: mpsc::Sender<MessageEnvelope>) {
// worker can handle the following message types.
// if self.handlers[kind].set(handler).is_err() {
// panic!("Handler already set for message kind: {:?}", kind);
// }
/// Allows metadata manager to receive metadata-related requests and responses
/// Must be called at most once on startup.
pub fn set_ingress_handler(&self, handler: mpsc::Sender<MessageEnvelope>) {
// ingress can handle the following message types.
let interested_in = [
MessageKind::IngressInvocationResponse,
MessageKind::IngressMessageAck,
];
self.set_handlers(&interested_in, handler);
}

pub async fn route_message(&self, envelope: MessageEnvelope) {
Expand All @@ -69,6 +66,15 @@ impl MessageRouter {
error!("No handler set for message kind: {}", envelope.kind());
}
}

#[track_caller]
fn set_handlers(&self, kinds: &[MessageKind], handler: mpsc::Sender<MessageEnvelope>) {
for kind in kinds {
if self.handlers[*kind].set(handler.clone()).is_err() {
panic!("Handler is already set for message kind: {}", kind);
}
}
}
}

static_assertions::assert_impl_all!(MessageRouter: Send, Sync);
8 changes: 4 additions & 4 deletions crates/network/src/v2/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ pub struct Networking {
impl Networking {
#[track_caller]
/// must be called at most once on startup
pub fn set_metadata_manager_subscriber(&self, subscriber: mpsc::Sender<MessageEnvelope>) {
pub fn set_metadata_manager_handler(&self, handler: mpsc::Sender<MessageEnvelope>) {
self.connections
.router()
.set_metadata_manager_subscriber(subscriber)
.set_metadata_manager_handler(handler)
}

#[track_caller]
/// must be called at most once on startup
pub fn set_ingress_subscriber(&self, subscriber: mpsc::Sender<MessageEnvelope>) {
self.connections.router().set_ingress_subscriber(subscriber)
pub fn set_ingress_handler(&self, handler: mpsc::Sender<MessageEnvelope>) {
self.connections.router().set_ingress_handler(handler)
}

pub fn connection_manager(&self) -> ConnectionManager {
Expand Down
3 changes: 3 additions & 0 deletions crates/node-protocol/proto/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ enum MessageKind {
MessageKind_UNKNOWN = 0;
GET_METADATA_REQUEST = 1;
METADATA_UPDATE = 2;
// Ingress
INGRESS_INVOCATION_RESPONSE = 3;
INGRESS_MESSAGE_ACK = 4;
}

// Bidirectional Communication
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Node {
let networking = Arc::new(Networking::default());
let metadata_manager = MetadataManager::build(networking.clone());
// Metadata manager subscribes to sync and update metadata messages.
networking.set_metadata_manager_subscriber(metadata_manager.network_inbound_sender());
networking.set_metadata_manager_handler(metadata_manager.network_inbound_sender());

let admin_role = if options.roles.contains(Role::Admin) {
Some(AdminRole::new(options.clone(), networking.clone())?)
Expand Down

0 comments on commit 4fa7145

Please sign in to comment.