Skip to content

Commit

Permalink
[3/n] Stub networking and network sender
Browse files Browse the repository at this point in the history
This also introduces metadata manager's machinery for sending metadata updates and accepting metadata fetch requests across nodes
  • Loading branch information
AhmedSoliman committed Mar 4, 2024
1 parent 408487a commit 582c601
Show file tree
Hide file tree
Showing 18 changed files with 582 additions and 61 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }

Expand Down
175 changes: 160 additions & 15 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::ops::Deref;
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::info;
use tracing::{debug, info, warn};

use crate::cancellation_watcher;
use restate_node_protocol::metadata::{MetadataMessage, MetadataUpdate};
use restate_node_protocol::MessageEnvelope;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::GenerationalNodeId;
use restate_types::Version;

use crate::cancellation_watcher;
use crate::is_cancellation_requested;
use crate::metadata;
use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender};
use crate::task_center;

use super::Metadata;
use super::MetadataContainer;
use super::MetadataInner;
use super::MetadataKind;
use super::MetadataWriter;
use super::{Metadata, MetadataContainer, MetadataInner, MetadataKind, MetadataWriter};

pub(super) type CommandSender = mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;
Expand All @@ -30,6 +36,106 @@ pub(super) enum Command {
UpdateMetadata(MetadataContainer, Option<oneshot::Sender<()>>),
}

/// A handler for processing network messages targeting metadata manager
/// (dev.restate.common.TargetName = METADATA_MANAGER)
struct MetadataMessageHandler<N>
where
N: NetworkSender + 'static + Clone,
{
sender: CommandSender,
networking: N,
}

impl<N> MetadataMessageHandler<N>
where
N: NetworkSender + 'static + Clone,
{
fn send_metadata(
&self,
peer: GenerationalNodeId,
metadata_kind: MetadataKind,
min_version: Option<Version>,
) {
match metadata_kind {
MetadataKind::NodesConfiguration => self.send_nodes_config(peer, min_version),
_ => {
todo!("Can't send metadata '{}' to peer", metadata_kind)
}
};
}

fn send_nodes_config(&self, to: GenerationalNodeId, version: Option<Version>) {
let config = metadata().nodes_config();
if version.is_some_and(|min_version| min_version > config.version()) {
// We don't have the version that the peer is asking for. Just ignore.
info!(
"Peer requested nodes config version {} but we have {}, ignoring their request",
version.unwrap(),
config.version()
);
return;
}
info!(
"Sending nodes config {} to peer, requested version? {:?}",
config.version(),
version,
);
let _ = task_center().spawn_child(
crate::TaskKind::Disposable,
"send-metadata-to-peer",
None,
{
let networking = self.networking.clone();
async move {
networking
.send(
to.into(),
&MetadataMessage::MetadataUpdate(MetadataUpdate {
container: MetadataContainer::NodesConfiguration(
config.deref().clone(),
),
}),
)
.await?;
Ok(())
}
},
);
}
}

impl<N> MessageHandler for MetadataMessageHandler<N>
where
N: NetworkSender + 'static + Clone,
{
type MessageType = MetadataMessage;

async fn on_message(&self, envelope: MessageEnvelope<MetadataMessage>) {
let (peer, msg) = envelope.split();
match msg {
MetadataMessage::MetadataUpdate(update) => {
info!(
"Received '{}' metadata update from peer {}",
update.container.kind(),
peer
);
if let Err(e) = self
.sender
.send(Command::UpdateMetadata(update.container, None))
{
if !is_cancellation_requested() {
warn!("Failed to send metadata message to metadata manager: {}", e);
}
}
}
MetadataMessage::GetMetadataRequest(request) => {
debug!("Received GetMetadataRequest from peer {}", peer);
self.send_metadata(peer, request.metadata_kind, request.min_version);
}
};
}
}

/// Handle to access locally cached metadata, request metadata updates, and more.
/// What is metadata manager?
///
Expand All @@ -49,23 +155,34 @@ pub(super) enum Command {
/// - Schema metadata
/// - NodesConfiguration
/// - Partition table
pub struct MetadataManager {
pub struct MetadataManager<N> {
self_sender: CommandSender,
inner: Arc<MetadataInner>,
inbound: CommandReceiver,
networking: N,
}

impl MetadataManager {
pub fn build() -> Self {
let (self_sender, inbound) = tokio::sync::mpsc::unbounded_channel();

impl<N> MetadataManager<N>
where
N: NetworkSender + 'static + Clone,
{
pub fn build(networking: N) -> Self {
let (self_sender, inbound) = mpsc::unbounded_channel();
Self {
inner: Arc::new(MetadataInner::default()),
inbound,
self_sender,
networking,
}
}

pub fn register_in_message_router(&self, sr_builder: &mut MessageRouterBuilder) {
sr_builder.add_message_handler(MetadataMessageHandler {
sender: self.self_sender.clone(),
networking: self.networking.clone(),
});
}

pub fn metadata(&self) -> Metadata {
Metadata::new(self.inner.clone(), self.self_sender.clone())
}
Expand Down Expand Up @@ -124,6 +241,11 @@ impl MetadataManager {
}
Some(current) => {
/* Do nothing, current is already newer */
debug!(
"Ignoring nodes config update {} because we are at {}",
config.version(),
current.version(),
);
maybe_new_version = current.version();
}
}
Expand Down Expand Up @@ -156,15 +278,37 @@ mod tests {
use googletest::prelude::*;
use restate_test_util::assert_eq;
use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, Role};
use restate_types::{GenerationalNodeId, Version};
use restate_types::{GenerationalNodeId, NodeId, Version};

use crate::metadata::spawn_metadata_manager;
use crate::network::NetworkSendError;
use crate::TaskCenterFactory;

// TEMPORARY. REMOVED IN NEXT PR(s)
#[derive(Clone)]
struct MockNetworkSender;

impl NetworkSender for MockNetworkSender {
async fn send<M>(
&self,
_to: NodeId,
_message: &M,
) -> std::result::Result<(), NetworkSendError>
where
M: restate_node_protocol::codec::WireSerde
+ restate_node_protocol::codec::Targeted
+ Send
+ Sync,
{
Ok(())
}
}

#[tokio::test]
async fn test_nodes_config_updates() -> Result<()> {
let network_sender = MockNetworkSender;
let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());
let metadata_manager = MetadataManager::build();
let metadata_manager = MetadataManager::build(network_sender);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

Expand Down Expand Up @@ -219,8 +363,9 @@ mod tests {

#[tokio::test]
async fn test_watchers() -> Result<()> {
let network_sender = MockNetworkSender;
let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());
let metadata_manager = MetadataManager::build();
let metadata_manager = MetadataManager::build(network_sender);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

Expand Down
10 changes: 7 additions & 3 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use restate_node_protocol::metadata::{MetadataContainer, MetadataKind};
use restate_types::nodes_config::NodesConfiguration;
use restate_types::{GenerationalNodeId, Version};

use crate::network::NetworkSender;
use crate::{ShutdownError, TaskCenter, TaskId, TaskKind};

/// The kind of versioned metadata that can be synchronized across nodes.
Expand Down Expand Up @@ -148,10 +149,13 @@ impl Default for VersionWatch {
}
}

pub fn spawn_metadata_manager(
pub fn spawn_metadata_manager<N>(
tc: &TaskCenter,
metadata_manager: MetadataManager,
) -> Result<TaskId, ShutdownError> {
metadata_manager: MetadataManager<N>,
) -> Result<TaskId, ShutdownError>
where
N: NetworkSender + 'static,
{
tc.spawn(
TaskKind::MetadataBackgroundSync,
"metadata-manager",
Expand Down
Loading

0 comments on commit 582c601

Please sign in to comment.