
[1/n] Move my node ID to metadata and metadata-manager related fixes #1240

Closed · wants to merge 1 commit
4 changes: 2 additions & 2 deletions crates/core/src/lib.rs
@@ -8,10 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod metadata;

mod metadata;
mod task_center;
mod task_center_types;

pub use metadata::{spawn_metadata_manager, Metadata, MetadataManager, MetadataWriter};
pub use task_center::*;
pub use task_center_types::*;
9 changes: 5 additions & 4 deletions crates/core/src/metadata/manager.rs
@@ -10,6 +10,7 @@

use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::info;

@@ -22,8 +23,8 @@ use super::MetadataInner
use super::MetadataKind;
use super::MetadataWriter;

pub(super) type CommandSender = tokio::sync::mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = tokio::sync::mpsc::UnboundedReceiver<Command>;
pub(super) type CommandSender = mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;

pub(super) enum Command {
UpdateMetadata(MetadataContainer, Option<oneshot::Sender<()>>),
@@ -70,11 +71,11 @@ impl MetadataManager
}

pub fn writer(&self) -> MetadataWriter {
MetadataWriter::new(self.self_sender.clone())
MetadataWriter::new(self.self_sender.clone(), self.inner.clone())
}

/// Start and wait for shutdown signal.
pub async fn run(mut self /*, network_sender: NetworkSender*/) -> anyhow::Result<()> {
pub async fn run(mut self) -> anyhow::Result<()> {
info!("Metadata manager started");

loop {
27 changes: 21 additions & 6 deletions crates/core/src/metadata/mod.rs
@@ -14,7 +14,7 @@
mod manager;
pub use manager::MetadataManager;

use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use arc_swap::ArcSwapOption;
use enum_map::{Enum, EnumMap};
@@ -23,7 +23,7 @@ use tokio::sync::{oneshot, watch};

use crate::{ShutdownError, TaskCenter, TaskId, TaskKind};
use restate_types::nodes_config::NodesConfiguration;
use restate_types::Version;
use restate_types::{GenerationalNodeId, Version};

/// The kind of versioned metadata that can be synchronized across nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Enum, EnumIter)]
@@ -64,10 +64,16 @@ impl Metadata {
}

/// Panics if nodes configuration is not loaded yet.
#[track_caller]
pub fn nodes_config(&self) -> Arc<NodesConfiguration> {
self.inner.nodes_config.load_full().unwrap()
}

#[track_caller]
pub fn my_node_id(&self) -> GenerationalNodeId {
*self.inner.my_node_id.get().expect("my_node_id is set")
}

/// Returns Version::INVALID if nodes configuration has not been loaded yet.
pub fn nodes_config_version(&self) -> Version {
let c = self.inner.nodes_config.load();
@@ -77,7 +83,7 @@
}
}

// Returns when the metadata kind is at the provided version (or newer)
/// Returns when the metadata kind is at the provided version (or newer)
pub async fn wait_for_version(
&self,
metadata_kind: MetadataKind,
@@ -91,14 +97,15 @@
Ok(*v)
}

// Watch for version updates of this metadata kind.
/// Watch for version updates of this metadata kind.
pub fn watch(&self, metadata_kind: MetadataKind) -> watch::Receiver<Version> {
self.inner.write_watches[metadata_kind].receive.clone()
}
}

#[derive(Default)]
struct MetadataInner {
my_node_id: OnceLock<GenerationalNodeId>,
nodes_config: ArcSwapOption<NodesConfiguration>,
write_watches: EnumMap<MetadataKind, VersionWatch>,
}
@@ -110,11 +117,14 @@ struct MetadataInner {
#[derive(Clone)]
pub struct MetadataWriter {
sender: manager::CommandSender,
/// Strictly used to set my node id. Do not use this to update metadata
/// directly, to avoid race conditions.
inner: Arc<MetadataInner>,
}

impl MetadataWriter {
fn new(sender: manager::CommandSender) -> Self {
Self { sender }
fn new(sender: manager::CommandSender, inner: Arc<MetadataInner>) -> Self {
Self { sender, inner }
}

// Returns when the nodes configuration update is performed.
@@ -132,6 +142,11 @@ impl MetadataWriter {
}
}

/// Should be called once on node startup. Updates are ignored after the initial value is set.
pub fn set_my_node_id(&self, id: GenerationalNodeId) {
let _ = self.inner.my_node_id.set(id);
Review comment (Contributor): Should we add an expect to catch accidental misuses?

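One possible shape for that, sketched against the `OnceLock` field from this diff (not part of the commit; the panic message is illustrative):

```rust
/// Should be called once on node startup. Panics on a second call instead of
/// silently ignoring it, so accidental double-initialization is caught early.
pub fn set_my_node_id(&self, id: GenerationalNodeId) {
    assert!(
        self.inner.my_node_id.set(id).is_ok(),
        "set_my_node_id must only be called once"
    );
}
```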
}

// Fire and forget update
pub fn submit(&self, value: impl Into<MetadataContainer>) {
// Ignore the error, task-center takes care of safely shutting down the
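With the accessor added above, code running inside a task-center task can read the node's own id from the metadata handle instead of having it passed in explicitly (the ingress dispatcher change below does exactly that). A minimal sketch, assuming the `restate_core` re-exports shown in this diff and that `set_my_node_id` was called during startup:

```rust
use restate_core::metadata;

// Must run inside a task spawned through the task center, after the node has
// called MetadataWriter::set_my_node_id(); panics otherwise.
async fn report_identity() {
    let my_node_id = metadata().my_node_id();
    tracing::info!("this node is {}", my_node_id);
}
```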
48 changes: 38 additions & 10 deletions crates/core/src/task_center.rs
@@ -20,7 +20,7 @@ use futures::Future;
use tokio::task::JoinHandle;
use tokio::task_local;
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
use tracing::{debug, error, info, instrument, warn};
use tracing::{debug, error, info, instrument, trace, warn};

use crate::metadata::{spawn_metadata_manager, Metadata, MetadataManager};
use crate::{TaskId, TaskKind};
@@ -52,10 +52,15 @@ impl TaskCenterFactory {

#[cfg(any(test, feature = "test-util"))]
pub fn create_test_task_center() -> TaskCenter {
use restate_types::GenerationalNodeId;

let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());

let metadata_manager = MetadataManager::build();
let metadata = metadata_manager.metadata();
metadata_manager
.writer()
.set_my_node_id(GenerationalNodeId::new(1, 1));
tc.try_set_global_metadata(metadata);

spawn_metadata_manager(&tc, metadata_manager).expect("metadata manager should start");
@@ -140,7 +145,12 @@ impl TaskCenter {
});

inner.tasks.lock().unwrap().insert(id, Arc::clone(&task));
let metadata = inner.global_metadata.get().cloned();
// Use the task-local METADATA if it is set (and is Some), otherwise fall back to the global metadata.
let metadata = METADATA
.try_with(|m| m.clone())
.ok()
.flatten()
.or_else(|| inner.global_metadata.get().cloned());

let mut handle_mut = task.join_handle.lock().unwrap();

@@ -324,7 +334,7 @@ impl TaskCenter {
future: F,
) -> O
where
F: Future<Output = O> + Send + 'static,
F: Future<Output = O> + Send,
{
let cancel_token = CancellationToken::new();
let id = TaskId::from(NEXT_TASK_ID.fetch_add(1, Ordering::SeqCst));
@@ -336,10 +346,19 @@
cancel: cancel_token.clone(),
join_handle: Mutex::new(None),
});
CURRENT_TASK_CENTER
// Use the task-local METADATA if it is set (and is Some), otherwise fall back to the global metadata.
let metadata = METADATA
.try_with(|m| m.clone())
.ok()
.flatten()
.or_else(|| self.inner.global_metadata.get().cloned());
METADATA
.scope(
self.clone(),
CANCEL_TOKEN.scope(cancel_token, CURRENT_TASK.scope(task, future)),
metadata,
CURRENT_TASK_CENTER.scope(
self.clone(),
CANCEL_TOKEN.scope(cancel_token, CURRENT_TASK.scope(task, future)),
),
)
.await
}
@@ -365,8 +384,16 @@
cancel: cancel_token.clone(),
join_handle: Mutex::new(None),
});
CURRENT_TASK_CENTER.sync_scope(self.clone(), || {
CANCEL_TOKEN.sync_scope(cancel_token, || CURRENT_TASK.sync_scope(task, f))
// Use the task-local METADATA if it is set (and is Some), otherwise fall back to the global metadata.
let metadata = METADATA
.try_with(|m| m.clone())
.ok()
.flatten()
.or_else(|| self.inner.global_metadata.get().cloned());
METADATA.sync_scope(metadata, || {
CURRENT_TASK_CENTER.sync_scope(self.clone(), || {
CANCEL_TOKEN.sync_scope(cancel_token, || CURRENT_TASK.sync_scope(task, f))
})
})
}

@@ -425,7 +452,7 @@ impl TaskCenter
{
match result {
Ok(Ok(())) => {
debug!(kind = ?kind, name = ?task.name, "Task {} exited normally", task_id);
trace!(kind = ?kind, name = ?task.name, "Task {} exited normally", task_id);
}
Ok(Err(err)) => {
if err.root_cause().downcast_ref::<ShutdownError>().is_some() {
@@ -550,7 +577,8 @@ pub fn current_task_id() -> Option<TaskId> {
CURRENT_TASK.try_with(|ct| ct.id).ok()
}

/// Access to global metadata handle. This available in task-center tasks only!
/// Access to the global metadata handle. This is available in task-center tasks only!
#[track_caller]
pub fn metadata() -> Metadata {
METADATA
.try_with(|m| m.clone())
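The lookup order used throughout this file, preferring the task-local `METADATA` and falling back to the globally registered value, is a plain tokio `task_local!` pattern. A self-contained sketch of the same idea (illustrative names only, not code from this PR):

```rust
use tokio::task_local;

task_local! {
    // Per-task override; the global default lives elsewhere (here just a constant).
    static CURRENT_LABEL: Option<String>;
}

const GLOBAL_LABEL: &str = "global";

// Same lookup order as the diff: the task-local value if the task was spawned
// with one, otherwise the globally registered fallback.
fn current_label() -> String {
    CURRENT_LABEL
        .try_with(|l| l.clone())
        .ok()
        .flatten()
        .unwrap_or_else(|| GLOBAL_LABEL.to_string())
}

#[tokio::main]
async fn main() {
    // Outside any scope the fallback wins.
    assert_eq!(current_label(), "global");

    // Inside a scoped task-local the local value wins.
    CURRENT_LABEL
        .scope(Some("task-a".to_string()), async {
            assert_eq!(current_label(), "task-a");
        })
        .await;
}
```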
21 changes: 6 additions & 15 deletions crates/ingress-dispatcher/src/service.rs
@@ -22,7 +22,7 @@ use restate_pb::restate::internal::{
};
use restate_types::identifiers::FullInvocationId;
use restate_types::invocation::{ServiceInvocationResponseSink, Source};
use restate_types::{GenerationalNodeId, NodeId};
use restate_types::GenerationalNodeId;
use std::collections::HashMap;
use std::future::poll_fn;
use tokio::select;
@@ -65,12 +65,9 @@ impl Service {
}
}

pub async fn run(
self,
my_node_id: NodeId,
output_tx: mpsc::Sender<Envelope>,
) -> anyhow::Result<()> {
pub async fn run(self, output_tx: mpsc::Sender<Envelope>) -> anyhow::Result<()> {
debug!("Running the ResponseDispatcher");
let my_node_id = metadata().my_node_id();

let Service {
server_rx,
@@ -91,11 +88,7 @@

tokio::pin!(pipe);

let mut handler = DispatcherLoopHandler::new(
my_node_id
.as_generational()
.expect("My node ID is generational"),
);
let mut handler = DispatcherLoopHandler::new(my_node_id);

loop {
select! {
@@ -359,7 +352,6 @@ mod tests {
let tc = create_test_task_center();
let (output_tx, _output_rx) = mpsc::channel(2);

let my_node_id = GenerationalNodeId::new(1, 1);
let ingress_dispatcher = Service::new(1);
let input_sender = ingress_dispatcher.create_ingress_dispatcher_input_sender();
let command_sender = ingress_dispatcher.create_ingress_request_sender();
@@ -370,7 +362,7 @@
TaskKind::SystemService,
"ingress-dispatcher",
None,
ingress_dispatcher.run(my_node_id.into(), output_tx),
ingress_dispatcher.run(output_tx),
)
.unwrap();

@@ -405,7 +397,6 @@
let tc = create_test_task_center();
let (output_tx, mut output_rx) = mpsc::channel(2);

let my_node_id = GenerationalNodeId::new(1, 1);
let ingress_dispatcher = Service::new(1);
let handler_tx = ingress_dispatcher.create_ingress_request_sender();
let network_tx = ingress_dispatcher.create_ingress_dispatcher_input_sender();
@@ -415,7 +406,7 @@
TaskKind::SystemService,
"ingress-dispatcher",
None,
ingress_dispatcher.run(my_node_id.into(), output_tx),
ingress_dispatcher.run(output_tx),
)
.unwrap();

9 changes: 9 additions & 0 deletions crates/node-services/src/lib.rs
@@ -65,4 +65,13 @@ pub mod common {
}
}
}

impl From<restate_types::GenerationalNodeId> for NodeId {
fn from(node_id: restate_types::GenerationalNodeId) -> Self {
NodeId {
id: node_id.raw_id(),
generation: Some(node_id.generation()),
}
}
}
}
13 changes: 5 additions & 8 deletions crates/node/src/lib.rs
@@ -24,10 +24,10 @@ use anyhow::bail;
use codederror::CodedError;
use tracing::{error, info};

use restate_core::metadata::{spawn_metadata_manager, MetadataManager};
use restate_core::{spawn_metadata_manager, MetadataManager};
use restate_core::{task_center, TaskKind};
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, Role};
use restate_types::{GenerationalNodeId, MyNodeIdWriter, NodeId, Version};
use restate_types::{GenerationalNodeId, Version};

use crate::network_server::{AdminDependencies, NetworkServer, WorkerDependencies};
use crate::roles::{AdminRole, WorkerRole};
@@ -100,8 +100,6 @@ impl Node {
let bifrost = options.bifrost.build(options.worker.partitions);

let server = options.server.build(
metadata_manager.metadata(),
metadata_manager.writer(),
worker_role.as_ref().map(|worker| {
WorkerDependencies::new(
worker.rocksdb_storage().clone(),
@@ -146,7 +144,7 @@ impl Node {
my_id.with_generation(1)
} else {
// default to node-id 1 generation 0
GenerationalNodeId::new(1, 1)
GenerationalNodeId::new(1, 0)
};
// Temporary: nodes configuration from current node.
let mut nodes_config =
@@ -211,15 +209,14 @@
let mut my_node_id = current_config.current_generation;
my_node_id.bump_generation();

let my_node_id: NodeId = my_node_id.into();
// TODO: replace this temporary code with proper CAS write to metadata store
// Simulate a node configuration update and commit
{
let address = self.options.server.advertise_address.clone();

let my_node = NodeConfig::new(
self.options.node_name.clone(),
my_node_id.as_generational().unwrap(),
my_node_id,
address,
self.options.roles,
);
@@ -230,7 +227,7 @@
metadata_writer.update(editable_nodes_config).await?;
}
// My Node ID is set
MyNodeIdWriter::set_as_my_node_id(my_node_id);
metadata_writer.set_my_node_id(my_node_id);
info!("My Node ID is {}", my_node_id);

// Ensures bifrost has initial metadata synced up before starting the worker.
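The switch of the provisioning default to generation 0 above appears to line up with the bump on the startup path: the first real startup bumps the stored generation before the id is registered and published. A simplified sketch of that lifecycle, using only names that appear in this diff (ordering and error handling elided):

```rust
// Fresh provisioning writes the node into the configuration at generation 0.
let mut my_node_id = GenerationalNodeId::new(1, 0);

// Every startup that finds an existing configuration bumps the generation,
// so the first startup announces itself as generation 1.
my_node_id.bump_generation();

// Published once via the writer; later reads go through metadata().my_node_id().
metadata_writer.set_my_node_id(my_node_id);
info!("My Node ID is {}", my_node_id);
```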