Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [6/n] Ingress receives responses over network #1251

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 153 additions & 131 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ restate-meta = { path = "crates/meta" }
restate-meta-rest-model = { path = "crates/meta-rest-model" }
restate-network = { path = "crates/network" }
restate-node = { path = "crates/node" }
restate-node-protocol = { path = "crates/node-protocol" }
restate-node-services = { path = "crates/node-services" }
restate-pb = { path = "crates/pb" }
restate-queue = { path = "crates/queue" }
Expand Down Expand Up @@ -138,7 +139,7 @@ tokio-util = { version = "0.7.10" }
tokio-stream = "0.1.14"
tonic = { version = "0.10.2", default-features = false }
tonic-reflection = { version = "0.10.2" }
tonic-build = "0.10.2"
tonic-build = "0.11.0"
tower = "0.4"
tower-http = { version = "0.4", default-features = false }
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ restate-fs-util = { workspace = true }
restate-futures-util = { workspace = true }
restate-meta = { workspace = true }
restate-meta-rest-model = { workspace = true, features = ["schema"] }
restate-node-services = { workspace = true }
restate-node-services = { workspace = true, features = ["servers"] }
restate-pb = { workspace = true }
restate-schema-api = { workspace = true, features = ["service", "deployment", "serde", "serde_schema"] }
restate-schema-impl = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/admin/src/rest_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::rest_api::error::MetaApiError;
use okapi_operation::axum_integration::{delete, get, patch, post};
use okapi_operation::*;
use restate_meta::{FileMetaReader, MetaReader};
use restate_node_services::node::node_svc_client::NodeSvcClient;
use restate_node_services::node::UpdateSchemaRequest;
use restate_node_services::node_svc::node_svc_client::NodeSvcClient;
use restate_node_services::node_svc::UpdateSchemaRequest;
use tonic::transport::Channel;
use tracing::debug;

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tracing::info;

use restate_core::cancellation_watcher;
use restate_meta::{FileMetaReader, MetaHandle};
use restate_node_services::node::node_svc_client::NodeSvcClient;
use restate_node_services::node_svc::node_svc_client::NodeSvcClient;
use restate_schema_impl::Schemas;

use crate::{rest_api, state, storage_query};
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//

use restate_meta::{FileMetaReader, MetaHandle};
use restate_node_services::node::node_svc_client::NodeSvcClient;
use restate_node_services::node_svc::node_svc_client::NodeSvcClient;
use restate_schema_impl::Schemas;
use tonic::transport::Channel;

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/storage_query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::record_batch::RecordBatch;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use okapi_operation::*;
use restate_node_services::node::StorageQueryRequest;
use restate_node_services::node_svc::StorageQueryRequest;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_with::serde_as;
Expand Down
9 changes: 6 additions & 3 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,17 @@ mod tests {
use crate::loglets::memory_loglet::MemoryLogletProvider;
use googletest::prelude::*;

use restate_core::{create_test_task_center, task_center};
use restate_core::task_center;
use restate_core::TestCoreEnv;
use restate_types::logs::SequenceNumber;
use tracing::info;
use tracing_test::traced_test;

#[tokio::test]
#[traced_test]
async fn test_append_smoke() -> Result<()> {
let tc = create_test_task_center();
let node_env = TestCoreEnv::create_with_mock_nodes_config(1, 1).await;
let tc = node_env.tc;
tc.run_in_scope("test", None, async {
// start a simple bifrost service with 5 logs.
let num_partitions = 5;
Expand Down Expand Up @@ -344,7 +346,8 @@ mod tests {

#[tokio::test(start_paused = true)]
async fn test_lazy_initialization() -> Result<()> {
let tc = create_test_task_center();
let node_env = TestCoreEnv::create_with_mock_nodes_config(1, 1).await;
let tc = node_env.tc;
tc.run_in_scope("test", None, async {
let delay = Duration::from_secs(5);
let num_partitions = 5;
Expand Down
5 changes: 3 additions & 2 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ mod tests {
use super::*;

use googletest::prelude::*;
use restate_core::create_test_task_center;
use restate_core::TestCoreEnv;
use tokio::task::JoinHandle;
use tracing::info;
use tracing_test::traced_test;
Expand All @@ -95,7 +95,8 @@ mod tests {
#[tokio::test]
#[traced_test]
async fn test_basic_readstream() -> Result<()> {
let tc = create_test_task_center();
let node_env = TestCoreEnv::create_with_mock_nodes_config(1, 1).await;
let tc = node_env.tc;
tc.run_in_scope("test", None, async {
// start a simple bifrost service with 5 logs.
let num_partitions = 5;
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ test-util = []

[dependencies]
restate-types = { workspace = true }
restate-node-protocol = { workspace = true }

anyhow = { workspace = true }
arc-swap = { workspace = true }
async-trait = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true }
futures = { workspace = true }
Expand Down
12 changes: 10 additions & 2 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod metadata;

mod metadata;
mod network_sender;
mod task_center;
mod task_center_types;

pub use metadata::{spawn_metadata_manager, Metadata, MetadataManager, MetadataWriter};
pub use network_sender::{NetworkSendError, NetworkSender};
pub use task_center::*;
pub use task_center_types::*;

#[cfg(any(test, feature = "test-util"))]
mod test_env;

#[cfg(any(test, feature = "test-util"))]
pub use test_env::{create_mock_nodes_config, TestCoreEnv};
138 changes: 128 additions & 10 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,22 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::ops::Deref;
use std::sync::Arc;

use restate_node_protocol::MessageEnvelope;
use restate_node_protocol::MetadataUpdate;
use restate_node_protocol::NetworkMessage;
use restate_types::GenerationalNodeId;
use restate_types::Version;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::debug;
use tracing::info;

use crate::cancellation_watcher;
use crate::network_sender::NetworkSender;
use crate::task_center;
use restate_types::nodes_config::NodesConfiguration;

use super::Metadata;
Expand All @@ -22,11 +32,12 @@ use super::MetadataInner;
use super::MetadataKind;
use super::MetadataWriter;

pub(super) type CommandSender = tokio::sync::mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = tokio::sync::mpsc::UnboundedReceiver<Command>;
pub(super) type CommandSender = mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;

/// Commands dispatched by `MetadataManager::handle_command`.
pub(super) enum Command {
/// Apply the given metadata container. The optional oneshot sender is passed
/// along to `update_metadata` as its callback.
UpdateMetadata(MetadataContainer, Option<oneshot::Sender<()>>),
/// Send metadata of the given kind to the given peer; the optional version is
/// forwarded to `send_metadata` as the minimum version.
SendMetadataToPeer(GenerationalNodeId, MetadataKind, Option<Version>),
}

/// Handle to access locally cached metadata, request metadata updates, and more.
Expand All @@ -52,16 +63,25 @@ pub struct MetadataManager {
self_sender: CommandSender,
inner: Arc<MetadataInner>,
inbound: CommandReceiver,
networking: Arc<dyn NetworkSender>,
// Handle inbound network messages to update our metadata and to respond to
// external metadata fetch requests
network_inbound: mpsc::Receiver<MessageEnvelope>,
network_inbound_sender: mpsc::Sender<MessageEnvelope>,
}

impl MetadataManager {
pub fn build() -> Self {
let (self_sender, inbound) = tokio::sync::mpsc::unbounded_channel();
pub fn build(networking: Arc<dyn NetworkSender>) -> Self {
let (self_sender, inbound) = mpsc::unbounded_channel();
let (network_inbound_sender, network_inbound) = mpsc::channel(1);

Self {
inner: Arc::new(MetadataInner::default()),
inbound,
networking,
self_sender,
network_inbound,
network_inbound_sender,
}
}

Expand All @@ -70,11 +90,15 @@ impl MetadataManager {
}

pub fn writer(&self) -> MetadataWriter {
MetadataWriter::new(self.self_sender.clone())
MetadataWriter::new(self.self_sender.clone(), self.inner.clone())
}

/// Returns a clone of the sender half of the inbound network message channel.
///
/// Envelopes pushed through the returned sender are received by `run` and
/// passed to `handle_network_message`.
pub fn network_inbound_sender(&self) -> mpsc::Sender<MessageEnvelope> {
self.network_inbound_sender.clone()
}

/// Start and wait for shutdown signal.
pub async fn run(mut self /*, network_sender: NetworkSender*/) -> anyhow::Result<()> {
pub async fn run(mut self) -> anyhow::Result<()> {
info!("Metadata manager started");

loop {
Expand All @@ -87,6 +111,9 @@ impl MetadataManager {
Some(cmd) = self.inbound.recv() => {
self.handle_command(cmd)
}
Some(envelope) = self.network_inbound.recv() => {
self.handle_network_message(envelope).await
}
}
}
Ok(())
Expand All @@ -95,9 +122,23 @@ impl MetadataManager {
/// Dispatches a single `Command` to the method that implements it.
fn handle_command(&mut self, cmd: Command) {
match cmd {
// Apply the update; `callback` is handed to `update_metadata` unchanged.
Command::UpdateMetadata(value, callback) => self.update_metadata(value, callback),
// Request to push metadata of `kind` to `peer`.
Command::SendMetadataToPeer(peer, kind, min_version) => {
self.send_metadata(peer, kind, min_version)
}
}
}

/// Handles one message envelope received from the network.
///
/// A `MetadataUpdate` is applied through `update_metadata` with no callback; a
/// `GetMetadataRequest` is answered through `send_metadata`, addressed to the
/// peer taken from the envelope.
///
/// NOTE(review): declared `async`, but nothing in this body awaits.
async fn handle_network_message(&mut self, envelope: MessageEnvelope) {
// Split the envelope into the peer it is associated with and the message.
let (peer, msg) = envelope.split();
match msg {
NetworkMessage::MetadataUpdate(update) => self.update_metadata(update.container, None),
NetworkMessage::GetMetadataRequest(request) => {
debug!("Received GetMetadataRequest from peer {}", peer);
// The requested kind and minimum version come straight from the peer's request.
self.send_metadata(peer, request.metadata_kind, request.min_version);
}
};
}

fn update_metadata(&mut self, value: MetadataContainer, callback: Option<oneshot::Sender<()>>) {
match value {
MetadataContainer::NodesConfiguration(config) => {
Expand All @@ -106,6 +147,62 @@ impl MetadataManager {
}
}

/// Sends metadata of kind `metadata_kind` to `peer`.
///
/// Only `MetadataKind::NodesConfiguration` is implemented; it is delegated to
/// `send_nodes_config` together with `min_version`.
///
/// # Panics
///
/// Panics via `todo!` for every other metadata kind.
///
/// NOTE(review): `handle_network_message` calls this with the kind taken from a
/// peer's `GetMetadataRequest`, so a request for an unsupported kind reaches the
/// `todo!` below. Consider logging and ignoring such requests instead.
fn send_metadata(
&mut self,
peer: GenerationalNodeId,
metadata_kind: MetadataKind,
min_version: Option<Version>,
) {
match metadata_kind {
MetadataKind::NodesConfiguration => self.send_nodes_config(peer, min_version),
_ => {
// Not implemented yet: this arm panics.
todo!("Can't send metadata '{}' to peer", metadata_kind)
}
};
}

/// Sends the locally held nodes configuration to the peer `to`.
///
/// Nothing is sent when `nodes_config` currently holds no value, or when the
/// peer asked for a minimum `version` that is newer than the one held locally.
/// Otherwise the configuration is cloned into a `MetadataUpdate` message and
/// handed to `networking.send` from a separately spawned
/// `TaskKind::Disposable` task.
fn send_nodes_config(&self, to: GenerationalNodeId, version: Option<Version>) {
    let Some(config) = self.inner.nodes_config.load_full() else {
        return;
    };
    // Bind the requested minimum version directly instead of testing with
    // `is_some_and` and unwrapping afterwards.
    if let Some(min_version) = version {
        if min_version > config.version() {
            // We don't have the version that the peer is asking for. Just ignore.
            debug!(
                "Peer requested nodes config version {} but we have {}, ignoring their request",
                min_version,
                config.version()
            );
            return;
        }
    }
    info!(
        "Sending nodes config {} to peer, requested version? {:?}",
        config.version(),
        version,
    );
    // The result of `spawn_child` is deliberately discarded, as in the original.
    let _ = task_center().spawn_child(
        crate::TaskKind::Disposable,
        "send-metadata-to-peer",
        None,
        {
            // Clone the sender handle so the spawned future does not borrow `self`.
            let networking = self.networking.clone();
            async move {
                networking
                    .send(
                        to.into(),
                        &NetworkMessage::MetadataUpdate(MetadataUpdate {
                            container: MetadataContainer::NodesConfiguration(
                                config.deref().clone(),
                            ),
                        }),
                    )
                    .await?;
                Ok(())
            }
        },
    );
}

fn update_nodes_configuration(
&mut self,
config: NodesConfiguration,
Expand All @@ -123,6 +220,11 @@ impl MetadataManager {
}
Some(current) => {
/* Do nothing, current is already newer */
debug!(
"Ignoring nodes config update {} because we are at {}",
config.version(),
current.version(),
);
maybe_new_version = current.version();
}
}
Expand Down Expand Up @@ -152,18 +254,33 @@ mod tests {

use super::*;

use async_trait::async_trait;
use googletest::prelude::*;
use restate_test_util::assert_eq;
use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, Role};
use restate_types::{GenerationalNodeId, Version};
use restate_types::{GenerationalNodeId, NodeId, Version};

use crate::metadata::spawn_metadata_manager;
use crate::TaskCenterFactory;
use crate::{NetworkSendError, TaskCenterFactory};

/// Test double for `NetworkSender` whose `send` ignores its arguments and
/// always returns `Ok(())`.
struct MockNetworkSender;

#[async_trait]
impl NetworkSender for MockNetworkSender {
/// Ignores the destination and the message and reports success.
async fn send(
&self,
_to: NodeId,
_message: &NetworkMessage,
) -> std::result::Result<(), NetworkSendError> {
Ok(())
}
}

#[tokio::test]
async fn test_nodes_config_updates() -> Result<()> {
let network_sender = Arc::new(MockNetworkSender);
let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());
let metadata_manager = MetadataManager::build();
let metadata_manager = MetadataManager::build(network_sender);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

Expand Down Expand Up @@ -218,8 +335,9 @@ mod tests {

#[tokio::test]
async fn test_watchers() -> Result<()> {
let network_sender = Arc::new(MockNetworkSender);
let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());
let metadata_manager = MetadataManager::build();
let metadata_manager = MetadataManager::build(network_sender);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

Expand Down
Loading
Loading