[1/n] Move my node ID to metadata and metadata-manager related fixes
Removes `NodeId::my_node_id()` and introduces `metadata().my_node_id()`, along with various fixes to allow automatic propagation of metadata between tasks.
AhmedSoliman committed Feb 28, 2024
1 parent 8ccb834 commit 731aed2
Showing 18 changed files with 135 additions and 154 deletions.
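A minimal sketch of the API change described in the commit message, before the per-file diffs. It assumes the re-exports shown below (`restate_core::{metadata, MetadataManager}`) and that the caller runs inside a task-center task; it is an illustration, not a call site from this commit.

```rust
use restate_core::{metadata, MetadataManager};
use restate_types::GenerationalNodeId;

fn wire_up_node_id(metadata_manager: &MetadataManager) {
    // On node startup: record this node's identity once through the writer.
    // Later calls are ignored because the value is stored in a OnceLock.
    metadata_manager
        .writer()
        .set_my_node_id(GenerationalNodeId::new(1, 1));

    // Anywhere inside a task-center task: read it back through the metadata
    // handle instead of the removed NodeId::my_node_id(). This panics if the
    // caller is not running within a task-center task.
    let my_node_id = metadata().my_node_id();
    println!("my node id is {}", my_node_id);
}
```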
4 changes: 2 additions & 2 deletions crates/core/src/lib.rs
@@ -8,10 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod metadata;

mod metadata;
mod task_center;
mod task_center_types;

pub use metadata::{spawn_metadata_manager, Metadata, MetadataManager, MetadataWriter};
pub use task_center::*;
pub use task_center_types::*;
9 changes: 5 additions & 4 deletions crates/core/src/metadata/manager.rs
@@ -10,6 +10,7 @@

use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::info;

@@ -22,8 +23,8 @@ use super::MetadataInner;
use super::MetadataKind;
use super::MetadataWriter;

pub(super) type CommandSender = tokio::sync::mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = tokio::sync::mpsc::UnboundedReceiver<Command>;
pub(super) type CommandSender = mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;

pub(super) enum Command {
UpdateMetadata(MetadataContainer, Option<oneshot::Sender<()>>),
@@ -70,11 +71,11 @@ impl MetadataManager
}

pub fn writer(&self) -> MetadataWriter {
MetadataWriter::new(self.self_sender.clone())
MetadataWriter::new(self.self_sender.clone(), self.inner.clone())
}

/// Start and wait for shutdown signal.
pub async fn run(mut self /*, network_sender: NetworkSender*/) -> anyhow::Result<()> {
pub async fn run(mut self) -> anyhow::Result<()> {
info!("Metadata manager started");

loop {
23 changes: 19 additions & 4 deletions crates/core/src/metadata/mod.rs
@@ -14,7 +14,7 @@
mod manager;
pub use manager::MetadataManager;

use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use arc_swap::ArcSwapOption;
use enum_map::{Enum, EnumMap};
@@ -23,7 +23,7 @@ use tokio::sync::{oneshot, watch};

use crate::{ShutdownError, TaskCenter, TaskId, TaskKind};
use restate_types::nodes_config::NodesConfiguration;
use restate_types::Version;
use restate_types::{GenerationalNodeId, Version};

/// The kind of versioned metadata that can be synchronized across nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Enum, EnumIter)]
@@ -64,10 +64,16 @@ impl Metadata
}

/// Panics if nodes configuration is not loaded yet.
#[track_caller]
pub fn nodes_config(&self) -> Arc<NodesConfiguration> {
self.inner.nodes_config.load_full().unwrap()
}

#[track_caller]
pub fn my_node_id(&self) -> GenerationalNodeId {
*self.inner.my_node_id.get().expect("my_node_id is set")
}

/// Returns Version::INVALID if nodes configuration has not been loaded yet.
pub fn nodes_config_version(&self) -> Version {
let c = self.inner.nodes_config.load();
@@ -99,6 +105,7 @@

#[derive(Default)]
struct MetadataInner {
my_node_id: OnceLock<GenerationalNodeId>,
nodes_config: ArcSwapOption<NodesConfiguration>,
write_watches: EnumMap<MetadataKind, VersionWatch>,
}
@@ -110,11 +117,14 @@ struct MetadataInner
#[derive(Clone)]
pub struct MetadataWriter {
sender: manager::CommandSender,
/// strictly used to set my node id. Do not use this to update metadata
/// directly to avoid race conditions.
inner: Arc<MetadataInner>,
}

impl MetadataWriter {
fn new(sender: manager::CommandSender) -> Self {
Self { sender }
fn new(sender: manager::CommandSender, inner: Arc<MetadataInner>) -> Self {
Self { sender, inner }
}

// Returns when the nodes configuration update is performed.
@@ -132,6 +142,11 @@ impl MetadataWriter
}
}

// Should be called once on node startup. Updates are ignored after the initial value is set.
pub fn set_my_node_id(&self, id: GenerationalNodeId) {
let _ = self.inner.my_node_id.set(id);
}

// Fire and forget update
pub fn submit(&self, value: impl Into<MetadataContainer>) {
// Ignore the error, task-center takes care of safely shutting down the
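The write-once behavior of `set_my_node_id` comes from the `OnceLock` added to `MetadataInner`: the first `set` wins, later calls are silently ignored, and `Metadata::my_node_id()` panics if read before the value is set. A standalone sketch of that pattern using only the standard library (a `(u32, u32)` pair stands in for `GenerationalNodeId`):

```rust
use std::sync::OnceLock;

#[derive(Default)]
struct Inner {
    my_node_id: OnceLock<(u32, u32)>, // stand-in for GenerationalNodeId
}

fn main() {
    let inner = Inner::default();

    // First write wins, mirroring MetadataWriter::set_my_node_id.
    let _ = inner.my_node_id.set((1, 1));
    // A second write is ignored rather than overwriting or panicking.
    let _ = inner.my_node_id.set((2, 7));

    // Reads after initialization see the first value; reading before `set`
    // would hit the `expect`, mirroring Metadata::my_node_id().
    let id = *inner.my_node_id.get().expect("my_node_id is set");
    assert_eq!(id, (1, 1));
}
```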
48 changes: 38 additions & 10 deletions crates/core/src/task_center.rs
@@ -20,7 +20,7 @@ use futures::Future;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
use tracing::{debug, error, info, instrument, warn};
use tracing::{debug, error, info, instrument, trace, warn};

use crate::metadata::{spawn_metadata_manager, Metadata, MetadataManager};
use crate::{TaskId, TaskKind};
@@ -52,10 +52,15 @@ impl TaskCenterFactory {

#[cfg(any(test, feature = "test-util"))]
pub fn create_test_task_center() -> TaskCenter {
use restate_types::GenerationalNodeId;

let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());

let metadata_manager = MetadataManager::build();
let metadata = metadata_manager.metadata();
metadata_manager
.writer()
.set_my_node_id(GenerationalNodeId::new(1, 1));
tc.try_set_global_metadata(metadata);

spawn_metadata_manager(&tc, metadata_manager).expect("metadata manager should start");
@@ -140,7 +145,12 @@ impl TaskCenter {
});

inner.tasks.lock().unwrap().insert(id, Arc::clone(&task));
let metadata = inner.global_metadata.get().cloned();
// Clone the task-local METADATA if it's set (and is Some()), otherwise fall back to the global metadata.
let metadata = METADATA
.try_with(|m| m.clone())
.ok()
.flatten()
.or_else(|| inner.global_metadata.get().cloned());

let mut handle_mut = task.join_handle.lock().unwrap();

@@ -324,7 +334,7 @@ impl TaskCenter {
future: F,
) -> O
where
F: Future<Output = O> + Send + 'static,
F: Future<Output = O> + Send,
{
let cancel_token = CancellationToken::new();
let id = TaskId::from(NEXT_TASK_ID.fetch_add(1, Ordering::SeqCst));
@@ -336,10 +346,19 @@
cancel: cancel_token.clone(),
join_handle: Mutex::new(None),
});
CURRENT_TASK_CENTER
// Clone the task-local METADATA if it's set (and is Some()), otherwise fall back to the global metadata.
let metadata = METADATA
.try_with(|m| m.clone())
.ok()
.flatten()
.or_else(|| self.inner.global_metadata.get().cloned());
METADATA
.scope(
self.clone(),
CANCEL_TOKEN.scope(cancel_token, CURRENT_TASK.scope(task, future)),
metadata,
CURRENT_TASK_CENTER.scope(
self.clone(),
CANCEL_TOKEN.scope(cancel_token, CURRENT_TASK.scope(task, future)),
),
)
.await
}
@@ -365,8 +384,16 @@
cancel: cancel_token.clone(),
join_handle: Mutex::new(None),
});
CURRENT_TASK_CENTER.sync_scope(self.clone(), || {
CANCEL_TOKEN.sync_scope(cancel_token, || CURRENT_TASK.sync_scope(task, f))
// Clone the task-local METADATA if it's set (and is Some()), otherwise fall back to the global metadata.
let metadata = METADATA
.try_with(|m| m.clone())
.ok()
.flatten()
.or_else(|| self.inner.global_metadata.get().cloned());
METADATA.sync_scope(metadata, || {
CURRENT_TASK_CENTER.sync_scope(self.clone(), || {
CANCEL_TOKEN.sync_scope(cancel_token, || CURRENT_TASK.sync_scope(task, f))
})
})
}

@@ -425,7 +452,7 @@ impl TaskCenter {
{
match result {
Ok(Ok(())) => {
debug!(kind = ?kind, name = ?task.name, "Task {} exited normally", task_id);
trace!(kind = ?kind, name = ?task.name, "Task {} exited normally", task_id);
}
Ok(Err(err)) => {
if err.root_cause().downcast_ref::<ShutdownError>().is_some() {
@@ -550,7 +577,8 @@ pub fn current_task_id() -> Option<TaskId> {
CURRENT_TASK.try_with(|ct| ct.id).ok()
}

/// Access to global metadata handle. This available in task-center tasks only!
/// Access to the global metadata handle. This is available in task-center tasks only!
#[track_caller]
pub fn metadata() -> Metadata {
METADATA
.try_with(|m| m.clone())
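The propagation logic repeated in the spawn and run-in-scope paths above follows one pattern: prefer the calling task's task-local `METADATA` when one is set, otherwise fall back to the global metadata registered on the task center. A standalone sketch of that fallback using `tokio::task_local`, with a plain `String` standing in for the `Metadata` handle (requires tokio with the `rt` and `macros` features):

```rust
use std::sync::OnceLock;

use tokio::task_local;

task_local! {
    // Mirrors METADATA: an optional, task-scoped metadata handle.
    static METADATA: Option<String>;
}

// Mirrors the task center's global metadata cell.
static GLOBAL_METADATA: OnceLock<String> = OnceLock::new();

// Prefer the caller's task-local handle, otherwise the global one.
fn resolve_metadata() -> Option<String> {
    METADATA
        .try_with(|m| m.clone()) // Err(..) when no task-local scope is active
        .ok()
        .flatten()
        .or_else(|| GLOBAL_METADATA.get().cloned())
}

#[tokio::main]
async fn main() {
    let _ = GLOBAL_METADATA.set("global".to_owned());

    // Outside any METADATA scope: falls back to the global value.
    assert_eq!(resolve_metadata().as_deref(), Some("global"));

    // Inside a scope the task-local value wins; re-scoping it onto newly
    // spawned futures is what propagates metadata between tasks.
    METADATA
        .scope(Some("task-local".to_owned()), async {
            assert_eq!(resolve_metadata().as_deref(), Some("task-local"));
        })
        .await;
}
```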
21 changes: 6 additions & 15 deletions crates/ingress-dispatcher/src/service.rs
@@ -22,7 +22,7 @@ use restate_pb::restate::internal::{
};
use restate_types::identifiers::FullInvocationId;
use restate_types::invocation::{ServiceInvocationResponseSink, Source};
use restate_types::{GenerationalNodeId, NodeId};
use restate_types::GenerationalNodeId;
use std::collections::HashMap;
use std::future::poll_fn;
use tokio::select;
@@ -65,12 +65,9 @@ impl Service {
}
}

pub async fn run(
self,
my_node_id: NodeId,
output_tx: mpsc::Sender<Envelope>,
) -> anyhow::Result<()> {
pub async fn run(self, output_tx: mpsc::Sender<Envelope>) -> anyhow::Result<()> {
debug!("Running the ResponseDispatcher");
let my_node_id = metadata().my_node_id();

let Service {
server_rx,
@@ -91,11 +88,7 @@

tokio::pin!(pipe);

let mut handler = DispatcherLoopHandler::new(
my_node_id
.as_generational()
.expect("My node ID is generational"),
);
let mut handler = DispatcherLoopHandler::new(my_node_id);

loop {
select! {
@@ -359,7 +352,6 @@ mod tests {
let tc = create_test_task_center();
let (output_tx, _output_rx) = mpsc::channel(2);

let my_node_id = GenerationalNodeId::new(1, 1);
let ingress_dispatcher = Service::new(1);
let input_sender = ingress_dispatcher.create_ingress_dispatcher_input_sender();
let command_sender = ingress_dispatcher.create_ingress_request_sender();
@@ -370,7 +362,7 @@
TaskKind::SystemService,
"ingress-dispatcher",
None,
ingress_dispatcher.run(my_node_id.into(), output_tx),
ingress_dispatcher.run(output_tx),
)
.unwrap();

@@ -405,7 +397,6 @@
let tc = create_test_task_center();
let (output_tx, mut output_rx) = mpsc::channel(2);

let my_node_id = GenerationalNodeId::new(1, 1);
let ingress_dispatcher = Service::new(1);
let handler_tx = ingress_dispatcher.create_ingress_request_sender();
let network_tx = ingress_dispatcher.create_ingress_dispatcher_input_sender();
@@ -415,7 +406,7 @@
TaskKind::SystemService,
"ingress-dispatcher",
None,
ingress_dispatcher.run(my_node_id.into(), output_tx),
ingress_dispatcher.run(output_tx),
)
.unwrap();

9 changes: 9 additions & 0 deletions crates/node-services/src/lib.rs
@@ -65,4 +65,13 @@ pub mod common {
}
}
}

impl From<restate_types::GenerationalNodeId> for NodeId {
fn from(node_id: restate_types::GenerationalNodeId) -> Self {
NodeId {
id: node_id.raw_id(),
generation: Some(node_id.generation()),
}
}
}
}
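The new `From` impl builds the gRPC-facing `NodeId` message directly from a `GenerationalNodeId`, carrying the raw id and the generation as `Some(..)`. A hedged usage sketch; the field names come from the impl above, while the exact import paths are assumed:

```rust
use restate_node_services::common::NodeId;
use restate_types::GenerationalNodeId;

fn to_proto(id: GenerationalNodeId) -> NodeId {
    let (raw, generation) = (id.raw_id(), id.generation());

    // Conversion added by this commit: raw id plus Some(generation).
    let proto: NodeId = id.into();
    debug_assert_eq!(proto.id, raw);
    debug_assert_eq!(proto.generation, Some(generation));
    proto
}
```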
13 changes: 5 additions & 8 deletions crates/node/src/lib.rs
@@ -24,10 +24,10 @@ use anyhow::bail;
use codederror::CodedError;
use tracing::{error, info};

use restate_core::metadata::{spawn_metadata_manager, MetadataManager};
use restate_core::{spawn_metadata_manager, MetadataManager};
use restate_core::{task_center, TaskKind};
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, Role};
use restate_types::{GenerationalNodeId, MyNodeIdWriter, NodeId, Version};
use restate_types::{GenerationalNodeId, Version};

use crate::network_server::{AdminDependencies, NetworkServer, WorkerDependencies};
use crate::roles::{AdminRole, WorkerRole};
@@ -100,8 +100,6 @@ impl Node {
let bifrost = options.bifrost.build(options.worker.partitions);

let server = options.server.build(
metadata_manager.metadata(),
metadata_manager.writer(),
worker_role.as_ref().map(|worker| {
WorkerDependencies::new(
worker.rocksdb_storage().clone(),
@@ -146,7 +144,7 @@ impl Node {
my_id.with_generation(1)
} else {
// default to node-id 1 generation 1
GenerationalNodeId::new(1, 1)
GenerationalNodeId::new(1, 0)
};
// Temporary: nodes configuration from current node.
let mut nodes_config =
@@ -211,15 +209,14 @@ impl Node {
let mut my_node_id = current_config.current_generation;
my_node_id.bump_generation();

let my_node_id: NodeId = my_node_id.into();
// TODO: replace this temporary code with proper CAS write to metadata store
// Simulate a node configuration update and commit
{
let address = self.options.server.advertise_address.clone();

let my_node = NodeConfig::new(
self.options.node_name.clone(),
my_node_id.as_generational().unwrap(),
my_node_id,
address,
self.options.roles,
);
@@ -230,7 +227,7 @@ impl Node {
metadata_writer.update(editable_nodes_config).await?;
}
// My Node ID is set
MyNodeIdWriter::set_as_my_node_id(my_node_id);
metadata_writer.set_my_node_id(my_node_id);
info!("My Node ID is {}", my_node_id);

// Ensures bifrost has initial metadata synced up before starting the worker.
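One subtle change above: the temporary bootstrap default moves from `GenerationalNodeId::new(1, 1)` to `GenerationalNodeId::new(1, 0)`. Because the startup path later calls `bump_generation()` before registering the node, a fresh single-node setup still ends up announcing itself as generation 1. A hedged sketch of that arithmetic, assuming `bump_generation` increments the generation by one:

```rust
use restate_types::GenerationalNodeId;

fn main() {
    // Bootstrap default from the diff above: node 1, generation 0.
    let mut my_node_id = GenerationalNodeId::new(1, 0);

    // On startup the generation is bumped before the node registers itself,
    // so the first generation actually advertised is 1.
    my_node_id.bump_generation();
    assert_eq!(my_node_id.generation(), 1);
}
```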
9 changes: 2 additions & 7 deletions crates/node/src/network_server/handler/cluster_ctrl.rs
@@ -11,7 +11,6 @@
use tonic::{async_trait, Request, Response, Status};
use tracing::debug;

use restate_core::metadata::Metadata;
use restate_meta::MetaReader;
use restate_node_services::cluster_ctrl::cluster_ctrl_svc_server::ClusterCtrlSvc;
use restate_node_services::cluster_ctrl::{AttachmentRequest, AttachmentResponse};
@@ -21,15 +20,11 @@ use crate::network_server::AdminDependencies;

pub struct ClusterCtrlSvcHandler {
admin_deps: AdminDependencies,
_metadata: Metadata,
}

impl ClusterCtrlSvcHandler {
pub fn new(admin_deps: AdminDependencies, metadata: Metadata) -> Self {
Self {
admin_deps,
_metadata: metadata,
}
pub fn new(admin_deps: AdminDependencies) -> Self {
Self { admin_deps }
}
}
