Skip to content

Commit

Permalink
[3/n] Stub networking and network sender
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Feb 28, 2024
1 parent 1ca8e9f commit c18075c
Show file tree
Hide file tree
Showing 15 changed files with 351 additions and 50 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

129 changes: 123 additions & 6 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,22 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::ops::Deref;
use std::sync::Arc;

use restate_node_protocol::MessageEnvelope;
use restate_node_protocol::MetadataUpdate;
use restate_node_protocol::NetworkMessage;
use restate_types::GenerationalNodeId;
use restate_types::Version;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::debug;
use tracing::info;

use crate::cancellation_watcher;
use crate::network_sender::NetworkSender;
use crate::task_center;
use restate_types::nodes_config::NodesConfiguration;

use super::Metadata;
Expand All @@ -28,6 +37,7 @@ pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;

pub(super) enum Command {
UpdateMetadata(MetadataContainer, Option<oneshot::Sender<()>>),
SendMetadataToPeer(GenerationalNodeId, MetadataKind, Option<Version>),
}

/// Handle to access locally cached metadata, request metadata updates, and more.
Expand All @@ -53,16 +63,25 @@ pub struct MetadataManager {
self_sender: CommandSender,
inner: Arc<MetadataInner>,
inbound: CommandReceiver,
networking: Arc<dyn NetworkSender>,
// Handle inbound network messages to update our metadata and to respond to
// external metadata fetch requests
network_inbound: mpsc::Receiver<MessageEnvelope>,
network_inbound_sender: mpsc::Sender<MessageEnvelope>,
}

impl MetadataManager {
pub fn build() -> Self {
let (self_sender, inbound) = tokio::sync::mpsc::unbounded_channel();
pub fn build(networking: Arc<dyn NetworkSender>) -> Self {
let (self_sender, inbound) = mpsc::unbounded_channel();
let (network_inbound_sender, network_inbound) = mpsc::channel(1);

Self {
inner: Arc::new(MetadataInner::default()),
inbound,
networking,
self_sender,
network_inbound,
network_inbound_sender,
}
}

Expand All @@ -74,6 +93,10 @@ impl MetadataManager {
MetadataWriter::new(self.self_sender.clone(), self.inner.clone())
}

/// Returns a new handle to the sending half of the inbound network channel.
///
/// Envelopes pushed through the returned sender arrive on `network_inbound`,
/// which `run` drains and hands to `handle_network_message`.
pub fn network_inbound_sender(&self) -> mpsc::Sender<MessageEnvelope> {
    mpsc::Sender::clone(&self.network_inbound_sender)
}

/// Start and wait for shutdown signal.
pub async fn run(mut self) -> anyhow::Result<()> {
info!("Metadata manager started");
Expand All @@ -88,6 +111,9 @@ impl MetadataManager {
Some(cmd) = self.inbound.recv() => {
self.handle_command(cmd)
}
Some(envelope) = self.network_inbound.recv() => {
self.handle_network_message(envelope).await
}
}
}
Ok(())
Expand All @@ -96,9 +122,23 @@ impl MetadataManager {
/// Dispatches a single command taken from the internal command channel to
/// the method that implements it.
fn handle_command(&mut self, cmd: Command) {
    match cmd {
        Command::UpdateMetadata(container, done) => {
            self.update_metadata(container, done);
        }
        Command::SendMetadataToPeer(target, metadata_kind, min_version) => {
            self.send_metadata(target, metadata_kind, min_version);
        }
    }
}

/// Handles one message received from the network.
///
/// A `MetadataUpdate` is applied to the local metadata without a completion
/// callback; a `GetMetadataRequest` is answered by sending the requested
/// metadata kind back to the peer the envelope came from.
async fn handle_network_message(&mut self, envelope: MessageEnvelope) {
    let (sender, message) = envelope.split();
    match message {
        NetworkMessage::GetMetadataRequest(req) => {
            debug!("Received GetMetadataRequest from peer {}", sender);
            self.send_metadata(sender, req.metadata_kind, req.min_version);
        }
        NetworkMessage::MetadataUpdate(update) => {
            self.update_metadata(update.container, None);
        }
    }
}

fn update_metadata(&mut self, value: MetadataContainer, callback: Option<oneshot::Sender<()>>) {
match value {
MetadataContainer::NodesConfiguration(config) => {
Expand All @@ -107,6 +147,62 @@ impl MetadataManager {
}
}

fn send_metadata(
&mut self,
peer: GenerationalNodeId,
metadata_kind: MetadataKind,
min_version: Option<Version>,
) {
match metadata_kind {
MetadataKind::NodesConfiguration => self.send_nodes_config(peer, min_version),
_ => {
todo!("Can't send metadata '{}' to peer", metadata_kind)
}
};
}

/// Sends the currently cached nodes configuration to the peer `to`.
///
/// Nothing is sent when no nodes configuration is cached yet, or when the
/// peer asked for a minimum `version` that is newer than the cached one.
/// Otherwise the actual send runs in a spawned `Disposable` task; the result
/// of spawning that task is deliberately discarded.
fn send_nodes_config(&self, to: GenerationalNodeId, version: Option<Version>) {
    let Some(config) = self.inner.nodes_config.load_full() else {
        return;
    };

    // Keep the requested version only if it is ahead of what we hold locally.
    if let Some(min_version) = version.filter(|&min_version| min_version > config.version()) {
        // We don't have the version that the peer is asking for. Just ignore.
        debug!(
            "Peer requested nodes config version {} but we have {}, ignoring their request",
            min_version,
            config.version()
        );
        return;
    }

    info!(
        "Sending nodes config {} to peer, requested version? {:?}",
        config.version(),
        version,
    );

    let networking = self.networking.clone();
    let _ = task_center().spawn_child(
        crate::TaskKind::Disposable,
        "send-metadata-to-peer",
        None,
        async move {
            // The configuration is cloned out of the shared pointer inside the
            // task, right before it is wrapped into the outgoing message.
            let update = NetworkMessage::MetadataUpdate(MetadataUpdate {
                container: MetadataContainer::NodesConfiguration(config.deref().clone()),
            });
            networking.send(to.into(), &update).await?;
            Ok(())
        },
    );
}

fn update_nodes_configuration(
&mut self,
config: NodesConfiguration,
Expand All @@ -124,6 +220,11 @@ impl MetadataManager {
}
Some(current) => {
/* Do nothing, current is already newer */
debug!(
"Ignoring nodes config update {} because we are at {}",
config.version(),
current.version(),
);
maybe_new_version = current.version();
}
}
Expand Down Expand Up @@ -153,18 +254,33 @@ mod tests {

use super::*;

use async_trait::async_trait;
use googletest::prelude::*;
use restate_test_util::assert_eq;
use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, Role};
use restate_types::{GenerationalNodeId, Version};
use restate_types::{GenerationalNodeId, NodeId, Version};

use crate::metadata::spawn_metadata_manager;
use crate::TaskCenterFactory;
use crate::{NetworkSendError, TaskCenterFactory};

/// Test double for [`NetworkSender`]: its `send` ignores both arguments and
/// always reports success, so nothing ever leaves the process.
struct MockNetworkSender;

#[async_trait]
impl NetworkSender for MockNetworkSender {
// No-op send: the destination and the message are discarded.
async fn send(
&self,
_to: NodeId,
_message: &NetworkMessage,
// NOTE(review): `std::result::Result` is spelled out in full, presumably
// because `googletest::prelude::*` brings its own `Result` alias into scope
// in this test module — confirm.
) -> std::result::Result<(), NetworkSendError> {
Ok(())
}
}

#[tokio::test]
async fn test_nodes_config_updates() -> Result<()> {
let network_sender = Arc::new(MockNetworkSender);
let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());
let metadata_manager = MetadataManager::build();
let metadata_manager = MetadataManager::build(network_sender);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

Expand Down Expand Up @@ -219,8 +335,9 @@ mod tests {

#[tokio::test]
async fn test_watchers() -> Result<()> {
let network_sender = Arc::new(MockNetworkSender);
let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());
let metadata_manager = MetadataManager::build();
let metadata_manager = MetadataManager::build(network_sender);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

Expand Down
28 changes: 23 additions & 5 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,25 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use futures::FutureExt;
use restate_types::identifiers::PartitionId;
use std::collections::HashMap;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant};

use futures::Future;
use async_trait::async_trait;
use futures::{Future, FutureExt};
use restate_types::NodeId;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
use tracing::{debug, error, info, instrument, trace, warn};

use restate_node_protocol::NetworkMessage;
use restate_types::identifiers::PartitionId;

use crate::metadata::{spawn_metadata_manager, Metadata, MetadataManager};
use crate::{TaskId, TaskKind};
use crate::{NetworkSendError, NetworkSender, TaskId, TaskKind};

static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(0);
const EXIT_CODE_FAILURE: i32 = 1;
Expand All @@ -32,6 +35,20 @@ const EXIT_CODE_FAILURE: i32 = 1;
#[error("system is shutting down")]
pub struct ShutdownError;

// TEMPORARY. REMOVED IN NEXT PR(s)
/// Placeholder [`NetworkSender`] whose `send` ignores both arguments and
/// always reports success. `create_test_task_center` passes an instance of it
/// to `MetadataManager::build`.
struct MockNetworkSender;

#[async_trait]
impl NetworkSender for MockNetworkSender {
// No-op send: the destination and the message are discarded.
async fn send(
&self,
_to: NodeId,
_message: &NetworkMessage,
) -> std::result::Result<(), NetworkSendError> {
Ok(())
}
}

/// Used to create a new task center. In practice, there should be a single task center for the
/// entire process but we might need to create more than one in integration test scenarios.
pub struct TaskCenterFactory {}
Expand All @@ -56,7 +73,8 @@ pub fn create_test_task_center() -> TaskCenter {

let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());

let metadata_manager = MetadataManager::build();
let networking = Arc::new(MockNetworkSender);
let metadata_manager = MetadataManager::build(networking);
let metadata = metadata_manager.metadata();
metadata_manager
.writer()
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/task_center_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub enum TaskKind {
/// A background task that the system needs for its operation. The task requires a system
/// shutdown on errors and the system will wait for its graceful cancellation on shutdown.
BifrostBackgroundHighPriority,
#[strum(props(OnCancel = "abort", OnError = "log"))]
Disposable,
}

impl TaskKind {
Expand Down
2 changes: 1 addition & 1 deletion crates/ingress-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ tokio = { workspace = true }
http = "1.0"
http-body = "1.0"
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["http1", "http2", "server"] }
hyper-util = { version = "0.1", features = ["http1", "http2", "server", "tokio"] }

# Tracing
opentelemetry = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ publish = false
[dependencies]
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-node-protocol = { workspace = true }
restate-types = { workspace = true }
restate-wal-protocol = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
drain = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
pin-project = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use std::fmt::Debug;
use std::future::Future;
use tokio::sync::mpsc;

mod v2;
pub use v2::*;

mod routing;
mod unbounded_handle;
pub mod utils;
Expand Down
Loading

0 comments on commit c18075c

Please sign in to comment.