diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 372194f80..40d89af8b 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -8,10 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -pub mod metadata; - +mod metadata; mod task_center; mod task_center_types; +pub use metadata::{spawn_metadata_manager, Metadata, MetadataManager, MetadataWriter}; pub use task_center::*; pub use task_center_types::*; diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index 38be4d308..b264d1fd3 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -10,6 +10,7 @@ use std::sync::Arc; +use tokio::sync::mpsc; use tokio::sync::oneshot; use tracing::info; @@ -22,8 +23,8 @@ use super::MetadataInner; use super::MetadataKind; use super::MetadataWriter; -pub(super) type CommandSender = tokio::sync::mpsc::UnboundedSender; -pub(super) type CommandReceiver = tokio::sync::mpsc::UnboundedReceiver; +pub(super) type CommandSender = mpsc::UnboundedSender; +pub(super) type CommandReceiver = mpsc::UnboundedReceiver; pub(super) enum Command { UpdateMetadata(MetadataContainer, Option>), @@ -70,11 +71,11 @@ impl MetadataManager { } pub fn writer(&self) -> MetadataWriter { - MetadataWriter::new(self.self_sender.clone()) + MetadataWriter::new(self.self_sender.clone(), self.inner.clone()) } /// Start and wait for shutdown signal. - pub async fn run(mut self /*, network_sender: NetworkSender*/) -> anyhow::Result<()> { + pub async fn run(mut self) -> anyhow::Result<()> { info!("Metadata manager started"); loop { diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs index fe470e070..017e8496a 100644 --- a/crates/core/src/metadata/mod.rs +++ b/crates/core/src/metadata/mod.rs @@ -14,7 +14,7 @@ mod manager; pub use manager::MetadataManager; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use arc_swap::ArcSwapOption; use enum_map::{Enum, EnumMap}; @@ -23,7 +23,7 @@ use tokio::sync::{oneshot, watch}; use crate::{ShutdownError, TaskCenter, TaskId, TaskKind}; use restate_types::nodes_config::NodesConfiguration; -use restate_types::Version; +use restate_types::{GenerationalNodeId, Version}; /// The kind of versioned metadata that can be synchronized across nodes. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Enum, EnumIter)] @@ -64,10 +64,16 @@ impl Metadata { } /// Panics if nodes configuration is not loaded yet. + #[track_caller] pub fn nodes_config(&self) -> Arc { self.inner.nodes_config.load_full().unwrap() } + #[track_caller] + pub fn my_node_id(&self) -> GenerationalNodeId { + *self.inner.my_node_id.get().expect("my_node_id is set") + } + /// Returns Version::INVALID if nodes configuration has not been loaded yet. pub fn nodes_config_version(&self) -> Version { let c = self.inner.nodes_config.load(); @@ -77,7 +83,7 @@ impl Metadata { } } - // Returns when the metadata kind is at the provided version (or newer) + /// Returns when the metadata kind is at the provided version (or newer) pub async fn wait_for_version( &self, metadata_kind: MetadataKind, @@ -91,7 +97,7 @@ impl Metadata { Ok(*v) } - // Watch for version updates of this metadata kind. + /// Watch for version updates of this metadata kind. pub fn watch(&self, metadata_kind: MetadataKind) -> watch::Receiver { self.inner.write_watches[metadata_kind].receive.clone() } @@ -99,6 +105,7 @@ impl Metadata { #[derive(Default)] struct MetadataInner { + my_node_id: OnceLock, nodes_config: ArcSwapOption, write_watches: EnumMap, } @@ -110,11 +117,14 @@ struct MetadataInner { #[derive(Clone)] pub struct MetadataWriter { sender: manager::CommandSender, + /// strictly used to set my node id. Do not use this to update metadata + /// directly to avoid race conditions. + inner: Arc, } impl MetadataWriter { - fn new(sender: manager::CommandSender) -> Self { - Self { sender } + fn new(sender: manager::CommandSender, inner: Arc) -> Self { + Self { sender, inner } } // Returns when the nodes configuration update is performed. @@ -132,6 +142,11 @@ impl MetadataWriter { } } + /// Should be called once on node startup. Updates are ignored after the initial value is set. + pub fn set_my_node_id(&self, id: GenerationalNodeId) { + let _ = self.inner.my_node_id.set(id); + } + // Fire and forget update pub fn submit(&self, value: impl Into) { // Ignore the error, task-center takes care of safely shutting down the diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index 0db41a819..4cd024fb5 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -20,7 +20,7 @@ use futures::Future; use tokio::task::JoinHandle; use tokio::task_local; use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use crate::metadata::{spawn_metadata_manager, Metadata, MetadataManager}; use crate::{TaskId, TaskKind}; @@ -52,10 +52,15 @@ impl TaskCenterFactory { #[cfg(any(test, feature = "test-util"))] pub fn create_test_task_center() -> TaskCenter { + use restate_types::GenerationalNodeId; + let tc = TaskCenterFactory::create(tokio::runtime::Handle::current()); let metadata_manager = MetadataManager::build(); let metadata = metadata_manager.metadata(); + metadata_manager + .writer() + .set_my_node_id(GenerationalNodeId::new(1, 1)); tc.try_set_global_metadata(metadata); spawn_metadata_manager(&tc, metadata_manager).expect("metadata manager should start"); @@ -140,7 +145,12 @@ impl TaskCenter { }); inner.tasks.lock().unwrap().insert(id, Arc::clone(&task)); - let metadata = inner.global_metadata.get().cloned(); + // Clone the currently set METADATA (and is Some()), otherwise fallback to global metadata. + let metadata = METADATA + .try_with(|m| m.clone()) + .ok() + .flatten() + .or_else(|| inner.global_metadata.get().cloned()); let mut handle_mut = task.join_handle.lock().unwrap(); @@ -324,7 +334,7 @@ impl TaskCenter { future: F, ) -> O where - F: Future + Send + 'static, + F: Future + Send, { let cancel_token = CancellationToken::new(); let id = TaskId::from(NEXT_TASK_ID.fetch_add(1, Ordering::SeqCst)); @@ -336,10 +346,19 @@ impl TaskCenter { cancel: cancel_token.clone(), join_handle: Mutex::new(None), }); - CURRENT_TASK_CENTER + // Clone the currently set METADATA (and is Some()), otherwise fallback to global metadata. + let metadata = METADATA + .try_with(|m| m.clone()) + .ok() + .flatten() + .or_else(|| self.inner.global_metadata.get().cloned()); + METADATA .scope( - self.clone(), - CANCEL_TOKEN.scope(cancel_token, CURRENT_TASK.scope(task, future)), + metadata, + CURRENT_TASK_CENTER.scope( + self.clone(), + CANCEL_TOKEN.scope(cancel_token, CURRENT_TASK.scope(task, future)), + ), ) .await } @@ -365,8 +384,16 @@ impl TaskCenter { cancel: cancel_token.clone(), join_handle: Mutex::new(None), }); - CURRENT_TASK_CENTER.sync_scope(self.clone(), || { - CANCEL_TOKEN.sync_scope(cancel_token, || CURRENT_TASK.sync_scope(task, f)) + // Clone the currently set METADATA (and is Some()), otherwise fallback to global metadata. + let metadata = METADATA + .try_with(|m| m.clone()) + .ok() + .flatten() + .or_else(|| self.inner.global_metadata.get().cloned()); + METADATA.sync_scope(metadata, || { + CURRENT_TASK_CENTER.sync_scope(self.clone(), || { + CANCEL_TOKEN.sync_scope(cancel_token, || CURRENT_TASK.sync_scope(task, f)) + }) }) } @@ -425,7 +452,7 @@ impl TaskCenter { { match result { Ok(Ok(())) => { - debug!(kind = ?kind, name = ?task.name, "Task {} exited normally", task_id); + trace!(kind = ?kind, name = ?task.name, "Task {} exited normally", task_id); } Ok(Err(err)) => { if err.root_cause().downcast_ref::().is_some() { @@ -550,7 +577,8 @@ pub fn current_task_id() -> Option { CURRENT_TASK.try_with(|ct| ct.id).ok() } -/// Access to global metadata handle. This available in task-center tasks only! +/// Accepss to global metadata handle. This available in task-center tasks only! +#[track_caller] pub fn metadata() -> Metadata { METADATA .try_with(|m| m.clone()) diff --git a/crates/ingress-dispatcher/src/service.rs b/crates/ingress-dispatcher/src/service.rs index 54619a86e..cb461503d 100644 --- a/crates/ingress-dispatcher/src/service.rs +++ b/crates/ingress-dispatcher/src/service.rs @@ -22,7 +22,7 @@ use restate_pb::restate::internal::{ }; use restate_types::identifiers::FullInvocationId; use restate_types::invocation::{ServiceInvocationResponseSink, Source}; -use restate_types::{GenerationalNodeId, NodeId}; +use restate_types::GenerationalNodeId; use std::collections::HashMap; use std::future::poll_fn; use tokio::select; @@ -65,12 +65,9 @@ impl Service { } } - pub async fn run( - self, - my_node_id: NodeId, - output_tx: mpsc::Sender, - ) -> anyhow::Result<()> { + pub async fn run(self, output_tx: mpsc::Sender) -> anyhow::Result<()> { debug!("Running the ResponseDispatcher"); + let my_node_id = metadata().my_node_id(); let Service { server_rx, @@ -91,11 +88,7 @@ impl Service { tokio::pin!(pipe); - let mut handler = DispatcherLoopHandler::new( - my_node_id - .as_generational() - .expect("My node ID is generational"), - ); + let mut handler = DispatcherLoopHandler::new(my_node_id); loop { select! { @@ -359,7 +352,6 @@ mod tests { let tc = create_test_task_center(); let (output_tx, _output_rx) = mpsc::channel(2); - let my_node_id = GenerationalNodeId::new(1, 1); let ingress_dispatcher = Service::new(1); let input_sender = ingress_dispatcher.create_ingress_dispatcher_input_sender(); let command_sender = ingress_dispatcher.create_ingress_request_sender(); @@ -370,7 +362,7 @@ mod tests { TaskKind::SystemService, "ingress-dispatcher", None, - ingress_dispatcher.run(my_node_id.into(), output_tx), + ingress_dispatcher.run(output_tx), ) .unwrap(); @@ -405,7 +397,6 @@ mod tests { let tc = create_test_task_center(); let (output_tx, mut output_rx) = mpsc::channel(2); - let my_node_id = GenerationalNodeId::new(1, 1); let ingress_dispatcher = Service::new(1); let handler_tx = ingress_dispatcher.create_ingress_request_sender(); let network_tx = ingress_dispatcher.create_ingress_dispatcher_input_sender(); @@ -415,7 +406,7 @@ mod tests { TaskKind::SystemService, "ingress-dispatcher", None, - ingress_dispatcher.run(my_node_id.into(), output_tx), + ingress_dispatcher.run(output_tx), ) .unwrap(); diff --git a/crates/node-services/src/lib.rs b/crates/node-services/src/lib.rs index 0ea52b0ea..dd1484900 100644 --- a/crates/node-services/src/lib.rs +++ b/crates/node-services/src/lib.rs @@ -65,4 +65,13 @@ pub mod common { } } } + + impl From for NodeId { + fn from(node_id: restate_types::GenerationalNodeId) -> Self { + NodeId { + id: node_id.raw_id(), + generation: Some(node_id.generation()), + } + } + } } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 3b57aedf5..2a43989a5 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -24,10 +24,10 @@ use anyhow::bail; use codederror::CodedError; use tracing::{error, info}; -use restate_core::metadata::{spawn_metadata_manager, MetadataManager}; +use restate_core::{spawn_metadata_manager, MetadataManager}; use restate_core::{task_center, TaskKind}; use restate_types::nodes_config::{NodeConfig, NodesConfiguration, Role}; -use restate_types::{GenerationalNodeId, MyNodeIdWriter, NodeId, Version}; +use restate_types::{GenerationalNodeId, Version}; use crate::network_server::{AdminDependencies, NetworkServer, WorkerDependencies}; use crate::roles::{AdminRole, WorkerRole}; @@ -100,8 +100,6 @@ impl Node { let bifrost = options.bifrost.build(options.worker.partitions); let server = options.server.build( - metadata_manager.metadata(), - metadata_manager.writer(), worker_role.as_ref().map(|worker| { WorkerDependencies::new( worker.rocksdb_storage().clone(), @@ -146,7 +144,7 @@ impl Node { my_id.with_generation(1) } else { // default to node-id 1 generation 1 - GenerationalNodeId::new(1, 1) + GenerationalNodeId::new(1, 0) }; // Temporary: nodes configuration from current node. let mut nodes_config = @@ -211,7 +209,6 @@ impl Node { let mut my_node_id = current_config.current_generation; my_node_id.bump_generation(); - let my_node_id: NodeId = my_node_id.into(); // TODO: replace this temporary code with proper CAS write to metadata store // Simulate a node configuration update and commit { @@ -219,7 +216,7 @@ impl Node { let my_node = NodeConfig::new( self.options.node_name.clone(), - my_node_id.as_generational().unwrap(), + my_node_id, address, self.options.roles, ); @@ -230,7 +227,7 @@ impl Node { metadata_writer.update(editable_nodes_config).await?; } // My Node ID is set - MyNodeIdWriter::set_as_my_node_id(my_node_id); + metadata_writer.set_my_node_id(my_node_id); info!("My Node ID is {}", my_node_id); // Ensures bifrost has initial metadata synced up before starting the worker. diff --git a/crates/node/src/network_server/handler/cluster_ctrl.rs b/crates/node/src/network_server/handler/cluster_ctrl.rs index 4b9b1cc99..993491c28 100644 --- a/crates/node/src/network_server/handler/cluster_ctrl.rs +++ b/crates/node/src/network_server/handler/cluster_ctrl.rs @@ -11,7 +11,6 @@ use tonic::{async_trait, Request, Response, Status}; use tracing::debug; -use restate_core::metadata::Metadata; use restate_meta::MetaReader; use restate_node_services::cluster_ctrl::cluster_ctrl_svc_server::ClusterCtrlSvc; use restate_node_services::cluster_ctrl::{AttachmentRequest, AttachmentResponse}; @@ -21,15 +20,11 @@ use crate::network_server::AdminDependencies; pub struct ClusterCtrlSvcHandler { admin_deps: AdminDependencies, - _metadata: Metadata, } impl ClusterCtrlSvcHandler { - pub fn new(admin_deps: AdminDependencies, metadata: Metadata) -> Self { - Self { - admin_deps, - _metadata: metadata, - } + pub fn new(admin_deps: AdminDependencies) -> Self { + Self { admin_deps } } } diff --git a/crates/node/src/network_server/handler/node.rs b/crates/node/src/network_server/handler/node.rs index 7ab180913..9bbba1148 100644 --- a/crates/node/src/network_server/handler/node.rs +++ b/crates/node/src/network_server/handler/node.rs @@ -12,36 +12,28 @@ use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::error::FlightError; use futures::stream::BoxStream; use futures::TryStreamExt; -use restate_core::metadata::{Metadata, MetadataWriter}; +use restate_core::{metadata, TaskCenter}; use restate_node_services::node::node_svc_server::NodeSvc; -use restate_node_services::node::{IdentResponse, NodeStatus}; use restate_node_services::node::{ - StateMutationRequest, StorageQueryRequest, StorageQueryResponse, TerminationRequest, - UpdateSchemaRequest, + IdentResponse, NodeStatus, StateMutationRequest, StorageQueryRequest, StorageQueryResponse, + TerminationRequest, UpdateSchemaRequest, }; use restate_schema_impl::SchemasUpdateCommand; -use restate_types::NodeId; use restate_worker_api::Handle; use tonic::{Request, Response, Status}; use crate::network_server::WorkerDependencies; pub struct NodeSvcHandler { + task_center: TaskCenter, worker: Option, - _metadata_writer: MetadataWriter, - _metadata: Metadata, } impl NodeSvcHandler { - pub fn new( - worker: Option, - metadata_writer: MetadataWriter, - metadata: Metadata, - ) -> Self { + pub fn new(task_center: TaskCenter, worker: Option) -> Self { Self { + task_center, worker, - _metadata: metadata, - _metadata_writer: metadata_writer, } } } @@ -50,10 +42,12 @@ impl NodeSvcHandler { impl NodeSvc for NodeSvcHandler { async fn get_ident(&self, _request: Request<()>) -> Result, Status> { // STUB IMPLEMENTATION - return Ok(Response::new(IdentResponse { - status: NodeStatus::Alive.into(), - node_id: NodeId::my_node_id().map(Into::into), - })); + self.task_center.run_in_scope_sync("get_ident", None, || { + Ok(Response::new(IdentResponse { + status: NodeStatus::Alive.into(), + node_id: Some(metadata().my_node_id().into()), + })) + }) } async fn terminate_invocation( @@ -111,15 +105,18 @@ impl NodeSvc for NodeSvcHandler { let Some(ref worker) = self.worker else { return Err(Status::failed_precondition("Not a worker node")); }; - let query = request.into_inner().query; - let record_stream = worker.query_context.execute(&query).await.map_err(|err| { - Status::internal(format!("failed executing the query '{}': {}", query, err)) - })?; + let record_stream = self + .task_center + .run_in_scope("query-storage", None, async move { + worker.query_context.execute(&query).await.map_err(|err| { + Status::internal(format!("failed executing the query '{}': {}", query, err)) + }) + }) + .await?; let schema = record_stream.schema(); - let response_stream = FlightDataEncoderBuilder::new() // CLI is expecting schema information @@ -132,7 +129,6 @@ impl NodeSvc for NodeSvcHandler { data: flight_data.data_body, }) .map_err(Status::from); - Ok(Response::new(Box::pin(response_stream))) } @@ -151,15 +147,19 @@ impl NodeSvc for NodeSvcHandler { ) .map_err(|err| Status::invalid_argument(err.to_string()))?; - crate::roles::update_schemas( - &worker.schemas, - worker.subscription_controller.as_ref(), - schema_updates, - ) - .await - .map_err(|err| { - Status::internal(format!("failed updating the schema information: {err}")) - })?; + self.task_center + .run_in_scope("update-schema", None, async move { + crate::roles::update_schemas( + &worker.schemas, + worker.subscription_controller.as_ref(), + schema_updates, + ) + .await + .map_err(|err| { + Status::internal(format!("failed updating the schema information: {err}")) + }) + }) + .await?; Ok(Response::new(())) } diff --git a/crates/node/src/network_server/options.rs b/crates/node/src/network_server/options.rs index dfafa6a30..88653a78b 100644 --- a/crates/node/src/network_server/options.rs +++ b/crates/node/src/network_server/options.rs @@ -13,7 +13,6 @@ use std::str::FromStr; use serde_with::serde_as; -use restate_core::metadata::{Metadata, MetadataWriter}; use restate_types::nodes_config::AdvertisedAddress; use crate::network_server::service::{AdminDependencies, NetworkServer, WorkerDependencies}; @@ -59,11 +58,9 @@ impl Default for Options { impl Options { pub fn build( self, - metadata: Metadata, - metadata_writer: MetadataWriter, node_deps: Option, admin_deps: Option, ) -> NetworkServer { - NetworkServer::new(self, metadata, metadata_writer, node_deps, admin_deps) + NetworkServer::new(self, node_deps, admin_deps) } } diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 42fec89bd..e34d2135a 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -16,8 +16,7 @@ use tower_http::trace::TraceLayer; use tracing::info; use restate_cluster_controller::ClusterControllerHandle; -use restate_core::cancellation_watcher; -use restate_core::metadata::{Metadata, MetadataWriter}; +use restate_core::{cancellation_watcher, task_center}; use restate_meta::FileMetaReader; use restate_node_services::cluster_ctrl::cluster_ctrl_svc_server::ClusterCtrlSvcServer; use restate_node_services::node::node_svc_server::NodeSvcServer; @@ -56,15 +55,11 @@ pub struct NetworkServer { opts: Options, worker_deps: Option, admin_deps: Option, - metadata: Metadata, - metadata_writer: MetadataWriter, } impl NetworkServer { pub fn new( opts: Options, - metadata: Metadata, - metadata_writer: MetadataWriter, worker_deps: Option, admin_deps: Option, ) -> Self { @@ -72,8 +67,6 @@ impl NetworkServer { opts, worker_deps, admin_deps, - metadata, - metadata_writer, } } @@ -114,19 +107,15 @@ impl NetworkServer { .register_encoded_file_descriptor_set(cluster_ctrl::FILE_DESCRIPTOR_SET); } - let cluster_controller_service = self.admin_deps.map(|admin_deps| { - ClusterCtrlSvcServer::new(ClusterCtrlSvcHandler::new( - admin_deps, - self.metadata.clone(), - )) - }); + let cluster_controller_service = self + .admin_deps + .map(|admin_deps| ClusterCtrlSvcServer::new(ClusterCtrlSvcHandler::new(admin_deps))); let server_builder = tonic::transport::Server::builder() .layer(TraceLayer::new_for_grpc().make_span_with(span_factory)) .add_service(NodeSvcServer::new(NodeSvcHandler::new( + task_center(), self.worker_deps, - self.metadata_writer, - self.metadata, ))) .add_optional_service(cluster_controller_service) .add_service(reflection_service_builder.build()?); diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index d8f1e130f..7ab9d1f2e 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -27,7 +27,6 @@ use restate_storage_query_datafusion::context::QueryContext; use restate_storage_rocksdb::RocksDBStorage; use restate_types::nodes_config::AdvertisedAddress; use restate_types::retries::RetryPolicy; -use restate_types::NodeId; use restate_worker::{SubscriptionControllerHandle, Worker, WorkerCommandSender}; use restate_worker_api::SubscriptionController; use tracing::info; @@ -169,7 +168,7 @@ impl WorkerRole { cc_client .clone() .attach_node(AttachmentRequest { - node_id: NodeId::my_node_id().map(Into::into), + node_id: Some(metadata().my_node_id().into()), }) .await }) diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs index 081109a36..16b526411 100644 --- a/crates/types/src/node_id.rs +++ b/crates/types/src/node_id.rs @@ -8,15 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -#[cfg(test)] -use arc_swap::ArcSwap; -use std::sync::OnceLock; - -#[cfg(not(test))] -static MY_NODE_ID: OnceLock = OnceLock::new(); -#[cfg(test)] -static MY_NODE_ID: OnceLock> = OnceLock::new(); - /// A generational node identifier. Nodes with the same ID but different generations /// represent the same node across different instances (restarts) of its lifetime. /// @@ -54,25 +45,6 @@ pub struct GenerationalNodeId(PlainNodeId, u32); #[display(fmt = "N{}", _0)] pub struct PlainNodeId(#[cfg_attr(feature = "serde_schema", schemars(default))] u32); -pub struct MyNodeIdWriter {} -impl MyNodeIdWriter { - #[cfg(not(test))] - pub fn set_as_my_node_id(node_id: NodeId) { - debug_assert!(node_id.as_generational().is_some()); - if let Err(e) = MY_NODE_ID.set(node_id) { - panic!("My NodeId can only be set once, it's already set to {}", e); - } - } - - #[cfg(test)] - pub fn set_as_my_node_id(node_id: NodeId) { - use std::sync::Arc; - - let n = MY_NODE_ID.get_or_init(|| ArcSwap::from_pointee(node_id)); - n.store(Arc::new(node_id)) - } -} - impl NodeId { pub fn new(id: u32, generation: Option) -> NodeId { match generation { @@ -81,16 +53,6 @@ impl NodeId { } } - #[cfg(not(test))] - pub fn my_node_id() -> Option { - MY_NODE_ID.get().copied() - } - - #[cfg(test)] - pub fn my_node_id() -> Option { - MY_NODE_ID.get().map(|n| *n.load().as_ref()) - } - pub fn new_plain(id: u32) -> NodeId { NodeId::Plain(PlainNodeId(id)) } diff --git a/crates/wal-protocol/Cargo.toml b/crates/wal-protocol/Cargo.toml index a942ab458..70013ed4f 100644 --- a/crates/wal-protocol/Cargo.toml +++ b/crates/wal-protocol/Cargo.toml @@ -13,9 +13,9 @@ serde = ["dep:serde", "dep:bincode", "enum-map/serde", "bytestring/serde", "rest options_schema = ["dep:schemars"] [dependencies] -restate-types = { workspace = true } -restate-storage-api = { workspace = true } restate-invoker-api = { workspace = true } +restate-storage-api = { workspace = true } +restate-types = { workspace = true } anyhow = { workspace = true } assert2 = { workspace = true } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 5ab4ff3fb..823875460 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -34,7 +34,6 @@ use restate_storage_query_postgres::service::PostgresQueryService; use restate_storage_rocksdb::{RocksDBStorage, RocksDBWriter}; use restate_types::identifiers::{PartitionKey, PeerId}; use restate_types::message::PartitionTarget; -use restate_types::NodeId; use std::ops::RangeInclusive; use tokio::sync::mpsc; use tracing::debug; @@ -384,7 +383,6 @@ impl Worker { pub async fn run(self) -> anyhow::Result<()> { let tc = task_center(); - let my_node_id = NodeId::my_node_id().expect("my node ID is set"); let shutdown = cancellation_watcher(); let (shutdown_signal, shutdown_watch) = drain::channel(); @@ -403,7 +401,7 @@ impl Worker { "ingress-dispatcher", None, self.ingress_dispatcher_service - .run(my_node_id, self.network_ingress_sender), + .run(self.network_ingress_sender), )?; // Ingress RPC server diff --git a/crates/worker/src/partition/action_effect_handler.rs b/crates/worker/src/partition/action_effect_handler.rs index 6b75fa89f..d167d02d5 100644 --- a/crates/worker/src/partition/action_effect_handler.rs +++ b/crates/worker/src/partition/action_effect_handler.rs @@ -10,8 +10,8 @@ use super::leadership::ActionEffect; use crate::partition::ConsensusWriter; +use restate_core::metadata; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; -use restate_types::NodeId; use restate_wal_protocol::effects::BuiltinServiceEffects; use restate_wal_protocol::{AckMode, Command, Destination, Envelope, Header, Source}; use std::ops::RangeInclusive; @@ -114,7 +114,7 @@ impl ActionEffectHandler { leader_epoch: self.leader_epoch, // todo: Add support for deduplicating self proposals sequence_number: None, - node_id: NodeId::my_node_id().expect("NodeId should be set").id(), + node_id: metadata().my_node_id().as_plain(), }, } } diff --git a/crates/worker/src/partition/leadership/action_collector.rs b/crates/worker/src/partition/leadership/action_collector.rs index c973f770c..0ced0ce4d 100644 --- a/crates/worker/src/partition/leadership/action_collector.rs +++ b/crates/worker/src/partition/leadership/action_collector.rs @@ -19,13 +19,13 @@ use crate::partition::{shuffle, ConsensusWriter}; use bytes::Bytes; use futures::{Stream, StreamExt}; use prost::Message; +use restate_core::metadata; use restate_errors::NotRunningError; use restate_ingress_dispatcher::{IngressDispatcherInput, IngressDispatcherInputSender}; use restate_invoker_api::ServiceHandle; use restate_types::identifiers::{FullInvocationId, PartitionLeaderEpoch, WithPartitionKey}; use restate_types::invocation::{ServiceInvocation, Source, SpanRelation}; use restate_types::journal::CompletionResult; -use restate_types::NodeId; use restate_wal_protocol::effects::BuiltinServiceEffects; use restate_wal_protocol::timer::TimerValue; use restate_wal_protocol::{AckMode, Command, Destination, Envelope, Header}; @@ -261,7 +261,7 @@ where leader_epoch: partition_leader_epoch.1, // todo: Add support for deduplicating self proposals sequence_number: None, - node_id: NodeId::my_node_id().expect("NodeId should be set").id(), + node_id: metadata().my_node_id().as_plain(), }, }; diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 2b05f96de..b37cfb457 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -12,6 +12,7 @@ use crate::partition::shuffle::{HintSender, Shuffle, ShuffleMetadata}; use crate::partition::{shuffle, storage, ConsensusWriter}; use assert2::let_assert; use futures::{future, StreamExt}; +use restate_core::metadata; use restate_invoker_api::InvokeInputJournal; use restate_timer::TokioClock; use std::fmt::Debug; @@ -37,7 +38,6 @@ use restate_storage_rocksdb::RocksDBStorage; use restate_types::identifiers::{InvocationId, PartitionKey}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch, PeerId}; use restate_types::journal::EntryType; -use restate_types::NodeId; use restate_wal_protocol::timer::TimerValue; use restate_wal_protocol::Envelope; @@ -199,7 +199,7 @@ where follower_state.peer_id, follower_state.partition_id, leader_epoch, - NodeId::my_node_id().expect("NodeId should be set"), + metadata().my_node_id().into(), ), partition_storage.clone(), follower_state.network_handle.create_shuffle_sender(),