diff --git a/Cargo.lock b/Cargo.lock index 34c74a852..e7fd7ee0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5101,6 +5101,7 @@ dependencies = [ "bytes", "drain", "futures", + "http 0.2.11", "pin-project", "restate-errors", "restate-task-center", @@ -5109,6 +5110,8 @@ dependencies = [ "test-log", "thiserror", "tokio", + "tonic 0.10.2", + "tower", "tracing", "tracing-subscriber", ] @@ -5118,6 +5121,7 @@ name = "restate-node" version = "0.8.0" dependencies = [ "anyhow", + "arc-swap", "arrow-flight", "async-trait", "axum", @@ -5126,6 +5130,7 @@ dependencies = [ "datafusion", "derive_builder", "drain", + "enum-map", "enumset", "futures", "googletest", @@ -5142,6 +5147,7 @@ dependencies = [ "restate-cluster-controller", "restate-errors", "restate-meta", + "restate-network", "restate-node-services", "restate-schema-api", "restate-schema-impl", @@ -5157,6 +5163,8 @@ dependencies = [ "serde", "serde_json", "serde_with", + "strum 0.26.1", + "strum_macros 0.26.1", "test-log", "thiserror", "tokio", diff --git a/crates/network/Cargo.toml b/crates/network/Cargo.toml index 8e1a0238c..6688c11f3 100644 --- a/crates/network/Cargo.toml +++ b/crates/network/Cargo.toml @@ -16,9 +16,12 @@ anyhow = { workspace = true } bytes = { workspace = true } drain = { workspace = true } futures = { workspace = true } +http = { workspace = true } pin-project = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tonic = { workspace = true } +tower = { workspace = true } tracing = { workspace = true } [dev-dependencies] diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs index cd0d8e474..7aea11df6 100644 --- a/crates/network/src/lib.rs +++ b/crates/network/src/lib.rs @@ -16,6 +16,7 @@ use tokio::sync::mpsc; mod routing; mod unbounded_handle; +pub mod utils; pub use routing::{Network, PartitionProcessorSender, RoutingError}; pub use unbounded_handle::UnboundedNetworkHandle; diff --git a/crates/network/src/utils.rs b/crates/network/src/utils.rs new file mode 100644 index 000000000..ea8ab256c --- /dev/null +++ b/crates/network/src/utils.rs @@ -0,0 +1,39 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
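+ //! Utilities for cross-node connections: builds a lazy tonic `Channel` from an `AdvertisedAddress`, supporting both HTTP URIs and Unix domain sockets.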
+ +use std::time::Duration; + +use http::Uri; +use restate_types::nodes_config::AdvertisedAddress; +use tokio::net::UnixStream; +use tonic::transport::{Channel, Endpoint}; +use tower::service_fn; + +pub fn create_grpc_channel_from_network_address( + address: AdvertisedAddress, +) -> Result { + let channel = match address { + AdvertisedAddress::Uds(uds_path) => { + // dummy endpoint required to specify an uds connector, it is not used anywhere + Endpoint::try_from("/") + .expect("/ should be a valid Uri") + .connect_with_connector_lazy(service_fn(move |_: Uri| { + UnixStream::connect(uds_path.clone()) + })) + } + AdvertisedAddress::Http(uri) => { + // todo: Make the channel settings configurable + Channel::builder(uri) + .connect_timeout(Duration::from_secs(5)) + .connect_lazy() + } + }; + Ok(channel) +} diff --git a/crates/node-services/proto/cluster_controller.proto b/crates/node-services/proto/cluster_controller.proto index a58030dd7..615b20ffd 100644 --- a/crates/node-services/proto/cluster_controller.proto +++ b/crates/node-services/proto/cluster_controller.proto @@ -14,13 +14,12 @@ import "dev/restate/common/common.proto"; package dev.restate.cluster_controller; service ClusterControllerSvc { - // Attach node at cluster controller + // Attach worker at cluster controller rpc AttachNode(AttachmentRequest) returns (AttachmentResponse); } message AttachmentRequest { - string node_name = 1; - optional dev.restate.common.NodeId node_id = 2; + optional dev.restate.common.NodeId node_id = 1; } message AttachmentResponse {} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 62b217e8a..022a89587 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -17,12 +17,12 @@ options_schema = [ "restate-cluster-controller/options_schema"] [dependencies] - restate-admin = { workspace = true } restate-bifrost = { workspace = true } restate-cluster-controller = { workspace = true } restate-errors = { workspace = true } restate-meta = { workspace = true } +restate-network = { workspace = true } restate-node-services = { workspace = true } restate-schema-api = { workspace = true } restate-schema-impl = { workspace = true } @@ -34,13 +34,16 @@ restate-worker = { workspace = true } restate-worker-api = { workspace = true } anyhow = { workspace = true } +arc-swap = { workspace = true } arrow-flight = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } +bincode = { workspace = true } codederror = { workspace = true } datafusion = { workspace = true } derive_builder = { workspace = true } drain = { workspace = true } +enum-map = { workspace = true } enumset = { workspace = true } futures = { workspace = true } hostname = { version = "0.3.1" } @@ -54,9 +57,10 @@ metrics-util = { version = "0.16.0" } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } -bincode = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } +strum = { workspace = true } +strum_macros = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index ed19b46ae..0e9ccb25c 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -8,40 +8,35 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
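+ // Locally cached cluster metadata (currently the nodes configuration), managed by a background task.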
+mod metadata; mod options; mod roles; mod server; -use codederror::CodedError; -use restate_types::time::MillisSinceEpoch; -use restate_types::{GenerationalNodeId, MyNodeIdWriter, NodeId, PlainNodeId, Version}; -use std::str::FromStr; -use std::time::Duration; -use tokio::net::UnixStream; -use tonic::transport::{Channel, Endpoint, Uri}; -use tower::service_fn; -use tracing::{info, warn}; - -use crate::roles::{AdminRole, WorkerRole}; -use crate::server::{ClusterControllerDependencies, NodeServer, WorkerDependencies}; pub use options::{Options, OptionsBuilder as NodeOptionsBuilder}; pub use restate_admin::OptionsBuilder as AdminOptionsBuilder; pub use restate_meta::OptionsBuilder as MetaOptionsBuilder; -use restate_node_services::cluster_controller::cluster_controller_svc_client::ClusterControllerSvcClient; -use restate_node_services::cluster_controller::AttachmentRequest; -use restate_task_center::{task_center, TaskKind}; -use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, NodesConfiguration, Role}; -use restate_types::retries::RetryPolicy; pub use restate_worker::{OptionsBuilder as WorkerOptionsBuilder, RocksdbOptionsBuilder}; +use std::ops::Deref; + +use anyhow::bail; +use codederror::CodedError; +use tracing::{error, info}; + +use restate_task_center::{task_center, TaskKind}; +use restate_types::nodes_config::{NodeConfig, NodesConfiguration, Role}; +use restate_types::{GenerationalNodeId, MyNodeIdWriter, NodeId, Version}; + +use self::metadata::MetadataManager; +use crate::roles::{AdminRole, WorkerRole}; +use crate::server::{ClusterControllerDependencies, NodeServer, WorkerDependencies}; + #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { - #[error("invalid cluster controller address: {0}")] + #[error("node failed to start due to failed safety check: {0}")] #[code(unknown)] - InvalidClusterControllerAddress(http::Error), - #[error("failed to attach to cluster at '{0}': {1}")] - #[code(unknown)] - Attachment(AdvertisedAddress, tonic::Status), + SafetyCheck(String), } #[derive(Debug, thiserror::Error, CodedError)] @@ -69,9 +64,8 @@ pub enum BuildError { pub struct Node { options: Options, - admin_address: AdvertisedAddress, - admin_role: Option, + metadata_manager: MetadataManager, worker_role: Option, server: NodeServer, } @@ -89,6 +83,8 @@ impl Node { } } + let metadata_manager = MetadataManager::build(); + let admin_role = if options.roles.contains(Role::Admin) { Some(AdminRole::try_from(options.clone())?) } else { @@ -120,23 +116,9 @@ impl Node { }), ); - let admin_address = if let Some(admin_address) = options.admin_address { - if admin_role.is_some() { - warn!("This node is running the admin roles but has also a remote admin address configured. \ - This indicates a potential misconfiguration. 
Trying to connect to the remote admin."); - } - - admin_address - } else if admin_role.is_some() { - AdvertisedAddress::from_str(&format!("http://127.0.0.1:{}/", server.port())) - .expect("valid local address") - } else { - return Err(BuildError::UnknownClusterController); - }; - Ok(Node { options: opts, - admin_address, + metadata_manager, admin_role, worker_role, server, @@ -145,6 +127,17 @@ impl Node { pub async fn start(self) -> Result<(), anyhow::Error> { let tc = task_center(); + let metadata_writer = self.metadata_manager.writer(); + let metadata = self.metadata_manager.metadata(); + + // Start metadata manager + tc.spawn( + TaskKind::MetadataBackgroundSync, + "metadata-manager", + None, + self.metadata_manager.run(), + )?; + // If starting in bootstrap mode, we initialize the nodes configuration // with a static config. if self.options.bootstrap_cluster { @@ -171,12 +164,79 @@ impl Node { nodes_config.version(), self.options.cluster_name.clone(), ); + metadata_writer.update(nodes_config).await?; info!("Initial nodes configuration is loaded"); } else { + // TODO: Fetch nodes configuration from metadata store. + // // Not supported at the moment - unimplemented!() + bail!("Only cluster bootstrap mode is supported at the moment"); + } + + let nodes_config = metadata.nodes_config(); + // Find my node in nodes configuration. + let Some(current_config) = nodes_config.find_node_by_name(&self.options.node_name) else { + // Node is not in configuration. This is currently not supported. We only support + // static configuration. + bail!("Node is not in configuration. This is currently not supported."); + }; + + // Safety checks, same node (if set)? + if self + .options + .node_id + .is_some_and(|n| n != current_config.current_generation.as_plain()) + { + return Err(Error::SafetyCheck( + format!( + "Node ID mismatch: configured node ID is {}, but the nodes configuration contains {}", + self.options.node_id.unwrap(), + current_config.current_generation.as_plain() + )))?; + } + + // Same cluster? + if self.options.cluster_name != nodes_config.cluster_name() { + error!( + "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", + self.options.cluster_name, + nodes_config.cluster_name() + ); + return Err(Error::SafetyCheck( + format!( + "Cluster name mismatch: configured cluster name is '{}', but the nodes configuration contains '{}'", + self.options.cluster_name, + nodes_config.cluster_name() + )))?; } + // Setup node generation. + // Bump the last generation and update metadata store + let mut my_node_id = current_config.current_generation; + my_node_id.bump_generation(); + + let my_node_id: NodeId = my_node_id.into(); + // TODO: replace this temporary code with proper CAS write to metadata store + // Simulate a node configuration update and commit + { + let address = self.options.server.advertise_address.clone(); + + let my_node = NodeConfig::new( + self.options.node_name.clone(), + my_node_id.as_generational().unwrap(), + address, + self.options.roles, + ); + let mut editable_nodes_config: NodesConfiguration = nodes_config.deref().to_owned(); + editable_nodes_config.upsert_node(my_node); + editable_nodes_config.increment_version(); + // push new nodes config to local cache. 
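+ // The metadata manager stores it only if the version is newer than the cached one, then wakes any `wait_for_version` callers.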
+ metadata_writer.update(editable_nodes_config).await?; + } + // My Node ID is set + MyNodeIdWriter::set_as_my_node_id(my_node_id); + info!("My Node ID is {}", my_node_id); + if let Some(admin_role) = self.admin_role { tc.spawn( TaskKind::SystemBoot, @@ -193,87 +253,15 @@ impl Node { self.server.run(), )?; - Self::attach_node(self.options, self.admin_address).await?; - if let Some(worker_role) = self.worker_role { - tc.spawn(TaskKind::SystemBoot, "worker-init", None, async { - // MyNodeId should be set here. - // Startup the worker role. - worker_role - .start( - NodeId::my_node_id() - .expect("my NodeId should be set after attaching to cluster"), - ) - .await?; - Ok(()) - })?; + tc.spawn( + TaskKind::SystemBoot, + "worker-init", + None, + worker_role.start(metadata), + )?; } Ok(()) } - - async fn attach_node(options: Options, admin_address: AdvertisedAddress) -> Result<(), Error> { - info!( - "Attaching '{}' (insist on ID?={:?}) to admin at '{admin_address}'", - options.node_name, options.node_id, - ); - - let channel = Self::create_channel_from_network_address(&admin_address) - .map_err(Error::InvalidClusterControllerAddress)?; - - let cc_client = ClusterControllerSvcClient::new(channel); - - let _response = RetryPolicy::exponential(Duration::from_millis(50), 2.0, 10, None) - .retry_operation(|| async { - cc_client - .clone() - .attach_node(AttachmentRequest { - node_id: options.node_id.map(Into::into), - node_name: options.node_name.clone(), - }) - .await - }) - .await - .map_err(|err| Error::Attachment(admin_address, err))?; - - // todo: Generational NodeId should come from attachment result - let now = MillisSinceEpoch::now(); - let my_node_id: NodeId = options - .node_id - .unwrap_or(PlainNodeId::from(1)) - .with_generation(now.as_u64() as u32) - .into(); - // We are attached, we can set our own NodeId. - MyNodeIdWriter::set_as_my_node_id(my_node_id); - info!( - "Node attached to cluster controller. My Node ID is {}", - my_node_id - ); - Ok(()) - } - - fn create_channel_from_network_address( - cluster_controller_address: &AdvertisedAddress, - ) -> Result { - let channel = match cluster_controller_address { - AdvertisedAddress::Uds(uds_path) => { - let uds_path = uds_path.clone(); - // dummy endpoint required to specify an uds connector, it is not used anywhere - Endpoint::try_from("/") - .expect("/ should be a valid Uri") - .connect_with_connector_lazy(service_fn(move |_: Uri| { - UnixStream::connect(uds_path.clone()) - })) - } - AdvertisedAddress::Http(uri) => Self::create_lazy_channel_from_uri(uri.clone()), - }; - Ok(channel) - } - - fn create_lazy_channel_from_uri(uri: Uri) -> Channel { - // todo: Make the channel settings configurable - Channel::builder(uri) - .connect_timeout(Duration::from_secs(5)) - .connect_lazy() - } } diff --git a/crates/node/src/metadata.rs b/crates/node/src/metadata.rs new file mode 100644 index 000000000..b3fb62caf --- /dev/null +++ b/crates/node/src/metadata.rs @@ -0,0 +1,409 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
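+ //! In-memory cache of cluster metadata (currently the nodes configuration) and the `MetadataManager` task that keeps it up to date.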
+ +// todo: Remove after implementation is complete +#![allow(dead_code)] + +use std::sync::Arc; + +use arc_swap::ArcSwapOption; +use enum_map::EnumMap; +use restate_task_center::ShutdownError; +use tokio::sync::{oneshot, watch}; +use tracing::info; + +use restate_task_center::cancellation_watcher; +use restate_types::nodes_config::NodesConfiguration; +use restate_types::MetadataKind; +use restate_types::Version; + +type CommandSender = tokio::sync::mpsc::UnboundedSender; +type CommandReceiver = tokio::sync::mpsc::UnboundedReceiver; + +// todo +struct PartitionTable; + +/// Handle to access locally cached metadata, request metadata updates, and more. +#[derive(Clone)] +pub struct Metadata { + sender: CommandSender, + inner: Arc, +} + +pub enum MetadataContainer { + NodesConfiguration(NodesConfiguration), +} + +impl MetadataContainer { + pub fn kind(&self) -> MetadataKind { + match self { + MetadataContainer::NodesConfiguration(_) => MetadataKind::NodesConfiguration, + } + } +} + +impl From for MetadataContainer { + fn from(value: NodesConfiguration) -> Self { + MetadataContainer::NodesConfiguration(value) + } +} + +impl Metadata { + fn new(inner: Arc, sender: CommandSender) -> Self { + Self { inner, sender } + } + + /// Panics if nodes configuration is not loaded yet. + pub fn nodes_config(&self) -> Arc { + self.inner.nodes_config.load_full().unwrap() + } + + /// Returns Version::INVALID if nodes configuration has not been loaded yet. + pub fn nodes_config_version(&self) -> Version { + let c = self.inner.nodes_config.load(); + match c.as_deref() { + Some(c) => c.version(), + None => Version::INVALID, + } + } + + // Returns when the metadata kind is at the provided version (or newer) + pub async fn wait_for_version( + &self, + metadata_kind: MetadataKind, + min_version: Version, + ) -> Result { + let mut recv = self.inner.write_watches[metadata_kind].receive.clone(); + let v = recv + .wait_for(|v| *v >= min_version) + .await + .map_err(|_| ShutdownError)?; + Ok(*v) + } + + // Returns when the metadata kind is at the provided version (or newer) + pub fn watch(&self, metadata_kind: MetadataKind) -> watch::Receiver { + self.inner.write_watches[metadata_kind].receive.clone() + } +} + +enum Command { + UpdateMetadata(MetadataContainer, Option>), +} + +#[derive(Default)] +struct MetadataInner { + nodes_config: ArcSwapOption, + write_watches: EnumMap, +} + +/// Can send updates to metadata manager. This should be accessible by the rpc handler layer to +/// handle incoming metadata updates from the network, or to handle updates coming from metadata +/// service if it's running on this node. MetadataManager ensures that writes are monotonic +/// so it's safe to call update_* without checking the current version. +#[derive(Clone)] +pub struct MetadataWriter { + sender: CommandSender, +} + +impl MetadataWriter { + fn new(sender: CommandSender) -> Self { + Self { sender } + } + + // Returns when the nodes configuration update is performed. 
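+ // Works for any `MetadataContainer` kind; resolves once the manager has applied the value, or skipped it because the cached version is already newer.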
+ pub async fn update(&self, value: impl Into) -> Result<(), ShutdownError> { + let (callback, recv) = oneshot::channel(); + let o = self + .sender + .send(Command::UpdateMetadata(value.into(), Some(callback))); + if o.is_ok() { + let _ = recv.await; + Ok(()) + } else { + Err(ShutdownError) + } + } + + // Fire and forget update + pub fn submit(&self, value: impl Into) { + // Ignore the error, task-center takes care of safely shutting down the + // system if metadata manager failed + let _ = self + .sender + .send(Command::UpdateMetadata(value.into(), None)); + } +} + +/// What is metadata manager? +/// +/// MetadataManager is a long-running task that monitors shared metadata needed by +/// services running on this node. It acts as the authority for updating the cached +/// metadata. It can also perform other tasks by running sub tasks as needed. +/// +/// Those include but not limited to: +/// - Syncing schema metadata, logs, nodes configuration with admin servers. +/// - Accepts adhoc requests from system components that might have observed higher +/// metadata version through other means. Metadata manager takes note and schedules a +/// sync so that we don't end up with thundering herd by direct metadata update +/// requests from components +/// +/// Metadata to be managed by MetadataManager: +/// - Bifrost's log metadata +/// - Schema metadata +/// - NodesConfiguration +/// - Partition table +pub struct MetadataManager { + self_sender: CommandSender, + inner: Arc, + inbound: CommandReceiver, +} + +impl MetadataManager { + pub fn build() -> Self { + let (self_sender, inbound) = tokio::sync::mpsc::unbounded_channel(); + + Self { + inner: Arc::new(MetadataInner::default()), + inbound, + self_sender, + } + } + + pub fn metadata(&self) -> Metadata { + Metadata::new(self.inner.clone(), self.self_sender.clone()) + } + + pub fn writer(&self) -> MetadataWriter { + MetadataWriter::new(self.self_sender.clone()) + } + + /// Start and wait for shutdown signal. + pub async fn run(mut self /*, network_sender: NetworkSender*/) -> anyhow::Result<()> { + info!("Metadata manager started"); + + loop { + tokio::select! { + biased; + _ = cancellation_watcher() => { + info!("Metadata manager stopped"); + break; + } + Some(cmd) = self.inbound.recv() => { + self.handle_command(cmd) + } + } + } + Ok(()) + } + + fn handle_command(&mut self, cmd: Command) { + match cmd { + Command::UpdateMetadata(value, callback) => self.update_metadata(value, callback), + } + } + + fn update_metadata(&mut self, value: MetadataContainer, callback: Option>) { + match value { + MetadataContainer::NodesConfiguration(config) => { + self.update_nodes_configuration(config, callback); + } + } + } + + fn update_nodes_configuration( + &mut self, + config: NodesConfiguration, + callback: Option>, + ) { + let inner = &self.inner; + let current = inner.nodes_config.load(); + let mut maybe_new_version = config.version(); + match current.as_deref() { + None => { + inner.nodes_config.store(Some(Arc::new(config))); + } + Some(current) if config.version() > current.version() => { + inner.nodes_config.store(Some(Arc::new(config))); + } + Some(current) => { + /* Do nothing, current is already newer */ + maybe_new_version = current.version(); + } + } + + if let Some(callback) = callback { + let _ = callback.send(()); + } + + // notify watches. 
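+ // `send_if_modified` only wakes waiters when the version actually advanced.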
+ self.inner.write_watches[MetadataKind::NodesConfiguration] + .send + .send_if_modified(|v| { + if maybe_new_version > *v { + *v = maybe_new_version; + true + } else { + false + } + }); + } +} + +struct VersionWatch { + send: watch::Sender, + receive: watch::Receiver, +} + +impl Default for VersionWatch { + fn default() -> Self { + let (send, receive) = watch::channel(Version::INVALID); + Self { send, receive } + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + use std::sync::atomic::{AtomicBool, Ordering}; + + use super::*; + + use googletest::prelude::*; + use restate_task_center::{TaskCenterFactory, TaskKind}; + use restate_test_util::assert_eq; + use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, Role}; + use restate_types::GenerationalNodeId; + + #[tokio::test] + async fn test_nodes_config_updates() -> Result<()> { + let tc = TaskCenterFactory::create(tokio::runtime::Handle::current()); + let metadata_manager = MetadataManager::build(); + let metadata_writer = metadata_manager.writer(); + let metadata = metadata_manager.metadata(); + + assert_eq!(Version::INVALID, metadata.nodes_config_version()); + + let nodes_config = create_mock_nodes_config(); + assert_eq!(Version::MIN, nodes_config.version()); + // updates happening before metadata manager start should not get lost. + metadata_writer.submit(nodes_config.clone()); + + // start metadata manager + tc.spawn( + TaskKind::MetadataBackgroundSync, + "metadata-manager", + None, + metadata_manager.run(), + )?; + + let version = metadata + .wait_for_version(MetadataKind::NodesConfiguration, Version::MIN) + .await + .unwrap(); + assert_eq!(Version::MIN, version); + + // Wait should not block if waiting older version + let version2 = metadata + .wait_for_version(MetadataKind::NodesConfiguration, Version::INVALID) + .await + .unwrap(); + assert_eq!(version, version2); + + let updated = Arc::new(AtomicBool::new(false)); + tokio::spawn({ + let metadata = metadata.clone(); + let updated = Arc::clone(&updated); + async move { + let _ = metadata + .wait_for_version(MetadataKind::NodesConfiguration, Version::from(3)) + .await; + updated.store(true, Ordering::Release); + } + }); + + // let's bump the version a couple of times. 
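+ // Four increments push the configuration well past version 3, which the waiter spawned above is blocked on.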
+ let mut nodes_config = nodes_config.clone(); + nodes_config.increment_version(); + nodes_config.increment_version(); + nodes_config.increment_version(); + nodes_config.increment_version(); + + metadata_writer.update(nodes_config).await?; + assert_eq!(true, updated.load(Ordering::Acquire)); + + tc.cancel_tasks(None, None).await; + Ok(()) + } + + #[tokio::test] + async fn test_watchers() -> Result<()> { + let tc = TaskCenterFactory::create(tokio::runtime::Handle::current()); + let metadata_manager = MetadataManager::build(); + let metadata_writer = metadata_manager.writer(); + let metadata = metadata_manager.metadata(); + + assert_eq!(Version::INVALID, metadata.nodes_config_version()); + + let nodes_config = create_mock_nodes_config(); + assert_eq!(Version::MIN, nodes_config.version()); + + // start metadata manager + tc.spawn( + TaskKind::MetadataBackgroundSync, + "metadata-manager", + None, + metadata_manager.run(), + )?; + + let mut watcher1 = metadata.watch(MetadataKind::NodesConfiguration); + assert_eq!(Version::INVALID, *watcher1.borrow()); + let mut watcher2 = metadata.watch(MetadataKind::NodesConfiguration); + assert_eq!(Version::INVALID, *watcher2.borrow()); + + metadata_writer.update(nodes_config.clone()).await?; + watcher1.changed().await?; + + assert_eq!(Version::MIN, *watcher1.borrow()); + assert_eq!(Version::MIN, *watcher2.borrow()); + + // let's push multiple updates + let mut nodes_config = nodes_config.clone(); + nodes_config.increment_version(); + metadata_writer.update(nodes_config.clone()).await?; + nodes_config.increment_version(); + metadata_writer.update(nodes_config.clone()).await?; + nodes_config.increment_version(); + metadata_writer.update(nodes_config.clone()).await?; + nodes_config.increment_version(); + metadata_writer.update(nodes_config.clone()).await?; + + // Watcher sees the latest value only. 
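+ // Intermediate versions are coalesced: a single `changed()` resolves and `borrow()` yields the latest version.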
+ watcher2.changed().await?; + assert_eq!(Version::from(5), *watcher2.borrow()); + assert!(!watcher2.has_changed().unwrap()); + + watcher1.changed().await?; + assert_eq!(Version::from(5), *watcher1.borrow()); + assert!(!watcher1.has_changed().unwrap()); + + Ok(()) + } + + fn create_mock_nodes_config() -> NodesConfiguration { + let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); + let address = AdvertisedAddress::from_str("http://127.0.0.1:5122/").unwrap(); + let node_id = GenerationalNodeId::new(1, 1); + let roles = Role::Admin | Role::Worker; + let my_node = NodeConfig::new("MyNode-1".to_owned(), node_id, address, roles); + nodes_config.upsert_node(my_node); + nodes_config + } +} diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 3f3505240..4d8168fdb 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -16,6 +16,9 @@ use tracing::debug; use tracing::subscriber::NoSubscriber; use restate_bifrost::{Bifrost, BifrostService}; +use restate_network::utils::create_grpc_channel_from_network_address; +use restate_node_services::cluster_controller::cluster_controller_svc_client::ClusterControllerSvcClient; +use restate_node_services::cluster_controller::AttachmentRequest; use restate_node_services::metadata::metadata_svc_client::MetadataSvcClient; use restate_node_services::metadata::FetchSchemasRequest; use restate_schema_api::subscription::SubscriptionResolver; @@ -24,10 +27,14 @@ use restate_storage_query_datafusion::context::QueryContext; use restate_storage_rocksdb::RocksDBStorage; use restate_task_center::task_center; use restate_task_center::TaskKind; +use restate_types::nodes_config::AdvertisedAddress; +use restate_types::retries::RetryPolicy; use restate_types::NodeId; use restate_worker::{SubscriptionControllerHandle, Worker, WorkerCommandSender}; use restate_worker_api::SubscriptionController; +use tracing::info; +use crate::metadata::Metadata; use crate::Options; #[derive(Debug, thiserror::Error, CodedError)] @@ -47,6 +54,12 @@ pub enum WorkerRoleError { #[code] SchemaError, ), + #[error("invalid cluster controller address: {0}")] + #[code(unknown)] + InvalidClusterControllerAddress(http::Error), + #[error("failed to attach to cluster at '{0}': {1}")] + #[code(unknown)] + Attachment(AdvertisedAddress, tonic::Status), } #[derive(Debug, thiserror::Error, CodedError)] @@ -115,16 +128,22 @@ impl WorkerRole { Some(self.worker.subscription_controller_handle()) } - pub async fn start(self, my_node_id: NodeId) -> anyhow::Result<()> { + pub async fn start(self, metadata: Metadata) -> anyhow::Result<()> { // todo: only run subscriptions on node 0 once being distributed let subscription_controller = Some(self.worker.subscription_controller_handle()); // Ensures bifrost has initial metadata synced up before starting the worker. 
self.bifrost.start().await?; - // todo: make this configurable - let channel = - Channel::builder("http://127.0.0.1:5122/".parse().expect("valid uri")).connect_lazy(); + let admin_address = metadata + .nodes_config() + .get_admin_node() + .expect("at least one admin node") + .address + .clone(); + + let channel = create_grpc_channel_from_network_address(admin_address.clone()) + .expect("valid admin address"); let mut metadata_svc_client = MetadataSvcClient::new(channel); // Fetch latest schema information and fail if this is not possible @@ -143,13 +162,47 @@ impl WorkerRole { Self::reload_schemas(subscription_controller, self.schemas, metadata_svc_client), )?; - task_center().spawn_child( - TaskKind::RoleRunner, - "worker-service", - None, - self.worker.run(my_node_id), - )?; + task_center().spawn_child(TaskKind::RoleRunner, "worker-service", None, async { + Self::attach_node(admin_address).await?; + self.worker.run().await + })?; + + Ok(()) + } + async fn attach_node(admin_address: AdvertisedAddress) -> Result<(), WorkerRoleError> { + info!("Worker attaching to admin at '{admin_address}'"); + + let channel = create_grpc_channel_from_network_address(admin_address.clone()) + .map_err(WorkerRoleError::InvalidClusterControllerAddress)?; + + let cc_client = ClusterControllerSvcClient::new(channel); + + let _response = RetryPolicy::exponential(Duration::from_millis(50), 2.0, 10, None) + .retry_operation(|| async { + cc_client + .clone() + .attach_node(AttachmentRequest { + node_id: NodeId::my_node_id().map(Into::into), + }) + .await + }) + .await + .map_err(|err| WorkerRoleError::Attachment(admin_address, err))?; + Ok(()) + } + + fn ignore_fetch_error(result: Result<(), SchemaError>) -> Result<(), SchemaError> { + if let Err(err) = result { + match err { + SchemaError::Fetch(err) => { + debug!("Failed fetching schema information: {err}. Retrying."); + } + SchemaError::Decode(_) | SchemaError::Update(_) | SchemaError::Subscription(_) => { + Err(err)? + } + } + } Ok(()) } @@ -180,20 +233,6 @@ impl WorkerRole { } } - fn ignore_fetch_error(result: Result<(), SchemaError>) -> Result<(), SchemaError> { - if let Err(err) = result { - match err { - SchemaError::Fetch(err) => { - debug!("Failed fetching schema information: {err}. Retrying."); - } - SchemaError::Decode(_) | SchemaError::Update(_) | SchemaError::Subscription(_) => { - Err(err)? 
- } - } - } - Ok(()) - } - async fn fetch_and_update_schemas( schemas: &Schemas, subscription_controller: Option<&SC>, @@ -209,12 +248,19 @@ impl WorkerRole { } async fn fetch_schemas( - metadata_svc_client: &mut MetadataSvcClient, + metadata_svc_client: &MetadataSvcClient, ) -> Result, SchemaError> { - let response = metadata_svc_client - // todo introduce schema version information to avoid fetching and overwriting the schema information - // over and over again - .fetch_schemas(FetchSchemasRequest {}) + let response = RetryPolicy::exponential(Duration::from_millis(50), 2.0, 10, None) + .retry_operation(|| { + let mut metadata_svc_client = metadata_svc_client.clone(); + async move { + metadata_svc_client + // todo introduce schema version information to avoid fetching and overwriting the schema information + // over and over again + .fetch_schemas(FetchSchemasRequest {}) + .await + } + }) .await?; let (schema_updates, _) = bincode::serde::decode_from_slice::, _>( diff --git a/crates/node/src/server/handler/cluster_controller.rs b/crates/node/src/server/handler/cluster_controller.rs index b07941f74..c78e2e595 100644 --- a/crates/node/src/server/handler/cluster_controller.rs +++ b/crates/node/src/server/handler/cluster_controller.rs @@ -27,8 +27,8 @@ impl ClusterControllerSvc for ClusterControllerHandler { &self, request: Request, ) -> Result, Status> { - let node_name = request.into_inner().node_name; - debug!("Register node '{}'", node_name); + let node_id = request.into_inner().node_id.expect("node id must be set"); + debug!("Attaching node '{:?}'", node_id); Ok(Response::new(AttachmentResponse {})) } } diff --git a/crates/node/src/server/service.rs b/crates/node/src/server/service.rs index 223808c2e..fc834825c 100644 --- a/crates/node/src/server/service.rs +++ b/crates/node/src/server/service.rs @@ -18,16 +18,6 @@ use tracing::info; use restate_bifrost::Bifrost; use restate_cluster_controller::ClusterControllerHandle; use restate_meta::FileMetaReader; -use restate_storage_rocksdb::RocksDBStorage; -use restate_task_center::cancellation_watcher; - -use crate::server::handler; -use crate::server::handler::cluster_controller::ClusterControllerHandler; -use crate::server::handler::metadata::MetadataHandler; -use crate::server::handler::node_ctrl::NodeCtrlHandler; -use crate::server::handler::worker::WorkerHandler; -// TODO cleanup -use crate::server::metrics::install_global_prometheus_recorder; use restate_node_services::cluster_controller::cluster_controller_svc_server::ClusterControllerSvcServer; use restate_node_services::metadata::metadata_svc_server::MetadataSvcServer; use restate_node_services::node_ctrl::node_ctrl_svc_server::NodeCtrlSvcServer; @@ -35,8 +25,16 @@ use restate_node_services::worker::worker_svc_server::WorkerSvcServer; use restate_node_services::{cluster_controller, metadata, node_ctrl, worker}; use restate_schema_impl::Schemas; use restate_storage_query_datafusion::context::QueryContext; +use restate_storage_rocksdb::RocksDBStorage; +use restate_task_center::cancellation_watcher; use restate_worker::{SubscriptionControllerHandle, WorkerCommandSender}; +use crate::server::handler; +use crate::server::handler::cluster_controller::ClusterControllerHandler; +use crate::server::handler::metadata::MetadataHandler; +use crate::server::handler::node_ctrl::NodeCtrlHandler; +use crate::server::handler::worker::WorkerHandler; +use crate::server::metrics::install_global_prometheus_recorder; use crate::server::multiplex::MultiplexService; use crate::server::options::Options; use 
crate::server::state::HandlerStateBuilder; diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs index 9de373103..081109a36 100644 --- a/crates/types/src/node_id.rs +++ b/crates/types/src/node_id.rs @@ -195,6 +195,10 @@ impl GenerationalNodeId { self.1 } + pub fn bump_generation(&mut self) { + self.1 += 1; + } + pub fn is_newer_than(self, other: GenerationalNodeId) -> bool { self.0 == other.0 && self.1 > other.1 } diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs index b0e658507..03a3e57be 100644 --- a/crates/types/src/nodes_config.rs +++ b/crates/types/src/nodes_config.rs @@ -189,6 +189,14 @@ impl NodesConfiguration { let id = self.name_lookup.get(name.as_ref())?; self.find_node_by_id(*id).ok() } + + /// Returns _an_ admin node. + pub fn get_admin_node(&self) -> Option<&NodeConfig> { + self.nodes.values().find_map(|maybe| match maybe { + MaybeNode::Node(node) if node.roles.contains(Role::Admin) => Some(node), + _ => None, + }) + } } #[cfg(test)] diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index cc056a15f..ec433aae1 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -377,8 +377,9 @@ impl Worker { &self.rocksdb_storage } - pub async fn run(self, my_node_id: NodeId) -> anyhow::Result<()> { + pub async fn run(self) -> anyhow::Result<()> { let tc = task_center(); + let my_node_id = NodeId::my_node_id().expect("my node ID is set"); let shutdown = cancellation_watcher(); let (shutdown_signal, shutdown_watch) = drain::channel();
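
Taken together, the pieces above are wired up roughly as in `Node::start` and the `metadata.rs` tests: build the `MetadataManager`, hand out its `Metadata` and `MetadataWriter` handles, run the manager as a task-center task, publish a `NodesConfiguration` through the writer, and let readers wait for the version they need before resolving the admin address into a gRPC channel. The sketch below illustrates that flow under those assumptions; `metadata_flow_sketch`, the cluster/node names and the address are illustrative placeholders, not code from this change.

use std::str::FromStr;

use restate_task_center::{TaskCenterFactory, TaskKind};
use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, NodesConfiguration, Role};
use restate_types::{GenerationalNodeId, MetadataKind, Version};

use crate::metadata::MetadataManager;

// Hypothetical helper inside the node crate; values are examples only.
async fn metadata_flow_sketch() -> anyhow::Result<()> {
    let tc = TaskCenterFactory::create(tokio::runtime::Handle::current());

    // Build the manager and grab its handles before moving it into a task.
    let metadata_manager = MetadataManager::build();
    let metadata_writer = metadata_manager.writer();
    let metadata = metadata_manager.metadata();

    // The manager runs as a background task and owns the cached metadata.
    tc.spawn(
        TaskKind::MetadataBackgroundSync,
        "metadata-manager",
        None,
        metadata_manager.run(),
    )?;

    // Publish a bootstrap NodesConfiguration, as Node::start does in bootstrap mode.
    let mut nodes_config =
        NodesConfiguration::new(Version::MIN, "example-cluster".to_owned());
    let address =
        AdvertisedAddress::from_str("http://127.0.0.1:5122/").expect("valid advertised address");
    nodes_config.upsert_node(NodeConfig::new(
        "node-1".to_owned(),
        GenerationalNodeId::new(1, 1),
        address,
        Role::Admin | Role::Worker,
    ));
    metadata_writer.update(nodes_config).await?;

    // Readers block until the cached configuration reaches the requested version.
    metadata
        .wait_for_version(MetadataKind::NodesConfiguration, Version::MIN)
        .await?;

    // The worker role resolves the admin address from the cache and builds a
    // lazy gRPC channel with the new network utility.
    let admin_address = metadata
        .nodes_config()
        .get_admin_node()
        .expect("bootstrap config contains an admin node")
        .address
        .clone();
    let _channel =
        restate_network::utils::create_grpc_channel_from_network_address(admin_address)
            .expect("valid admin address");

    tc.cancel_tasks(None, None).await;
    Ok(())
}

Keeping reads on an `ArcSwapOption` plus per-kind watch channels means readers never contend with the manager; the single command loop serializes writes and only bumps a watch when a version actually advances.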