Skip to content

Commit

Permalink
[3/n] Stub networking and network sender
Browse files Browse the repository at this point in the history
This also introduces metadata manager's machinery for sending metadata updates and accepting metadata fetch requests across nodes
  • Loading branch information
AhmedSoliman committed Mar 1, 2024
1 parent b10cb50 commit d539a70
Show file tree
Hide file tree
Showing 14 changed files with 350 additions and 49 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

129 changes: 123 additions & 6 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,22 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::ops::Deref;
use std::sync::Arc;

use restate_node_protocol::MessageEnvelope;
use restate_node_protocol::MetadataUpdate;
use restate_node_protocol::NetworkMessage;
use restate_types::GenerationalNodeId;
use restate_types::Version;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::debug;
use tracing::info;

use crate::cancellation_watcher;
use crate::network_sender::NetworkSender;
use crate::task_center;
use restate_types::nodes_config::NodesConfiguration;

use super::Metadata;
Expand All @@ -28,6 +37,7 @@ pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;

pub(super) enum Command {
UpdateMetadata(MetadataContainer, Option<oneshot::Sender<()>>),
SendMetadataToPeer(GenerationalNodeId, MetadataKind, Option<Version>),
}

/// Handle to access locally cached metadata, request metadata updates, and more.
Expand All @@ -53,16 +63,25 @@ pub struct MetadataManager {
self_sender: CommandSender,
inner: Arc<MetadataInner>,
inbound: CommandReceiver,
networking: Arc<dyn NetworkSender>,
// Handle inbound network messages to update our metadata and to respond to
// external metadata fetch requests
network_inbound: mpsc::Receiver<MessageEnvelope>,
network_inbound_sender: mpsc::Sender<MessageEnvelope>,
}

impl MetadataManager {
pub fn build() -> Self {
let (self_sender, inbound) = tokio::sync::mpsc::unbounded_channel();
pub fn build(networking: Arc<dyn NetworkSender>) -> Self {
let (self_sender, inbound) = mpsc::unbounded_channel();
let (network_inbound_sender, network_inbound) = mpsc::channel(1);

Self {
inner: Arc::new(MetadataInner::default()),
inbound,
networking,
self_sender,
network_inbound,
network_inbound_sender,
}
}

Expand All @@ -74,6 +93,10 @@ impl MetadataManager {
MetadataWriter::new(self.self_sender.clone(), self.inner.clone())
}

pub fn network_inbound_sender(&self) -> mpsc::Sender<MessageEnvelope> {
self.network_inbound_sender.clone()
}

/// Start and wait for shutdown signal.
pub async fn run(mut self) -> anyhow::Result<()> {
info!("Metadata manager started");
Expand All @@ -88,6 +111,9 @@ impl MetadataManager {
Some(cmd) = self.inbound.recv() => {
self.handle_command(cmd)
}
Some(envelope) = self.network_inbound.recv() => {
self.handle_network_message(envelope).await
}
}
}
Ok(())
Expand All @@ -96,9 +122,23 @@ impl MetadataManager {
fn handle_command(&mut self, cmd: Command) {
match cmd {
Command::UpdateMetadata(value, callback) => self.update_metadata(value, callback),
Command::SendMetadataToPeer(peer, kind, min_version) => {
self.send_metadata(peer, kind, min_version)
}
}
}

async fn handle_network_message(&mut self, envelope: MessageEnvelope) {
let (peer, msg) = envelope.split();
match msg {
NetworkMessage::MetadataUpdate(update) => self.update_metadata(update.container, None),
NetworkMessage::GetMetadataRequest(request) => {
debug!("Received GetMetadataRequest from peer {}", peer);
self.send_metadata(peer, request.metadata_kind, request.min_version);
}
};
}

fn update_metadata(&mut self, value: MetadataContainer, callback: Option<oneshot::Sender<()>>) {
match value {
MetadataContainer::NodesConfiguration(config) => {
Expand All @@ -107,6 +147,62 @@ impl MetadataManager {
}
}

fn send_metadata(
&mut self,
peer: GenerationalNodeId,
metadata_kind: MetadataKind,
min_version: Option<Version>,
) {
match metadata_kind {
MetadataKind::NodesConfiguration => self.send_nodes_config(peer, min_version),
_ => {
todo!("Can't send metadata '{}' to peer", metadata_kind)
}
};
}

fn send_nodes_config(&self, to: GenerationalNodeId, version: Option<Version>) {
let config = self.inner.nodes_config.load_full();
let Some(config) = config else {
return;
};
if version.is_some_and(|min_version| min_version > config.version()) {
// We don't have the version that the peer is asking for. Just ignore.
debug!(
"Peer requested nodes config version {} but we have {}, ignoring their request",
version.unwrap(),
config.version()
);
return;
}
info!(
"Sending nodes config {} to peer, requested version? {:?}",
config.version(),
version,
);
let _ = task_center().spawn_child(
crate::TaskKind::Disposable,
"send-metadata-to-peer",
None,
{
let networking = self.networking.clone();
async move {
networking
.send(
to.into(),
&NetworkMessage::MetadataUpdate(MetadataUpdate {
container: MetadataContainer::NodesConfiguration(
config.deref().clone(),
),
}),
)
.await?;
Ok(())
}
},
);
}

fn update_nodes_configuration(
&mut self,
config: NodesConfiguration,
Expand All @@ -124,6 +220,11 @@ impl MetadataManager {
}
Some(current) => {
/* Do nothing, current is already newer */
debug!(
"Ignoring nodes config update {} because we are at {}",
config.version(),
current.version(),
);
maybe_new_version = current.version();
}
}
Expand Down Expand Up @@ -153,18 +254,33 @@ mod tests {

use super::*;

use async_trait::async_trait;
use googletest::prelude::*;
use restate_test_util::assert_eq;
use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, Role};
use restate_types::{GenerationalNodeId, Version};
use restate_types::{GenerationalNodeId, NodeId, Version};

use crate::metadata::spawn_metadata_manager;
use crate::TaskCenterFactory;
use crate::{NetworkSendError, TaskCenterFactory};

struct MockNetworkSender;

#[async_trait]
impl NetworkSender for MockNetworkSender {
async fn send(
&self,
_to: NodeId,
_message: &NetworkMessage,
) -> std::result::Result<(), NetworkSendError> {
Ok(())
}
}

#[tokio::test]
async fn test_nodes_config_updates() -> Result<()> {
let network_sender = Arc::new(MockNetworkSender);
let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());
let metadata_manager = MetadataManager::build();
let metadata_manager = MetadataManager::build(network_sender);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

Expand Down Expand Up @@ -219,8 +335,9 @@ mod tests {

#[tokio::test]
async fn test_watchers() -> Result<()> {
let network_sender = Arc::new(MockNetworkSender);
let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());
let metadata_manager = MetadataManager::build();
let metadata_manager = MetadataManager::build(network_sender);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

Expand Down
28 changes: 23 additions & 5 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,25 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use futures::FutureExt;
use restate_types::identifiers::PartitionId;
use std::collections::HashMap;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant};

use futures::Future;
use async_trait::async_trait;
use futures::{Future, FutureExt};
use restate_types::NodeId;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
use tracing::{debug, error, info, instrument, trace, warn};

use restate_node_protocol::NetworkMessage;
use restate_types::identifiers::PartitionId;

use crate::metadata::{spawn_metadata_manager, Metadata, MetadataManager};
use crate::{TaskId, TaskKind};
use crate::{NetworkSendError, NetworkSender, TaskId, TaskKind};

static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(0);
const EXIT_CODE_FAILURE: i32 = 1;
Expand All @@ -32,6 +35,20 @@ const EXIT_CODE_FAILURE: i32 = 1;
#[error("system is shutting down")]
pub struct ShutdownError;

// TEMPORARY. REMOVED IN NEXT PR(s)
struct MockNetworkSender;

#[async_trait]
impl NetworkSender for MockNetworkSender {
async fn send(
&self,
_to: NodeId,
_message: &NetworkMessage,
) -> std::result::Result<(), NetworkSendError> {
Ok(())
}
}

/// Used to create a new task center. In practice, there should be a single task center for the
/// entire process but we might need to create more than one in integration test scenarios.
pub struct TaskCenterFactory {}
Expand All @@ -56,7 +73,8 @@ pub fn create_test_task_center() -> TaskCenter {

let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());

let metadata_manager = MetadataManager::build();
let networking = Arc::new(MockNetworkSender);
let metadata_manager = MetadataManager::build(networking);
let metadata = metadata_manager.metadata();
metadata_manager
.writer()
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/task_center_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub enum TaskKind {
/// A background task that the system needs for its operation. The task requires a system
/// shutdown on errors and the system will wait for its graceful cancellation on shutdown.
BifrostBackgroundHighPriority,
#[strum(props(OnCancel = "abort", OnError = "log"))]
Disposable,
}

impl TaskKind {
Expand Down
3 changes: 3 additions & 0 deletions crates/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ publish = false
[dependencies]
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-node-protocol = { workspace = true }
restate-types = { workspace = true }
restate-wal-protocol = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
drain = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
pin-project = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use std::fmt::Debug;
use std::future::Future;
use tokio::sync::mpsc;

mod v2;
pub use v2::*;

mod routing;
mod unbounded_handle;
pub mod utils;
Expand Down
Loading

0 comments on commit d539a70

Please sign in to comment.