From 7a9272a641e314f7f073720e12c67c5329eb4032 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 22 Feb 2024 19:04:36 +0000 Subject: [PATCH] Move metadata to restate-core as described in the previous PR in the stack --- Cargo.lock | 2 + crates/core/Cargo.toml | 2 + crates/core/src/lib.rs | 6 +- .../src/metadata/manager.rs} | 154 ++--------------- crates/core/src/metadata/mod.rs | 158 ++++++++++++++++++ crates/node/src/lib.rs | 3 +- .../network_server/handler/cluster_ctrl.rs | 2 +- .../node/src/network_server/handler/node.rs | 2 +- crates/node/src/network_server/options.rs | 2 +- crates/node/src/network_server/service.rs | 2 +- crates/node/src/roles/worker.rs | 2 +- crates/types/src/lib.rs | 4 +- crates/types/src/{metadata.rs => version.rs} | 11 -- 13 files changed, 188 insertions(+), 162 deletions(-) rename crates/{node/src/metadata.rs => core/src/metadata/manager.rs} (68%) create mode 100644 crates/core/src/metadata/mod.rs rename crates/types/src/{metadata.rs => version.rs} (76%) diff --git a/Cargo.lock b/Cargo.lock index e7ea976fa..160b7362a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4846,7 +4846,9 @@ name = "restate-core" version = "0.8.0" dependencies = [ "anyhow", + "arc-swap", "derive_more", + "enum-map", "futures", "googletest", "restate-test-util", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 393c12f2d..2774f51c1 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -14,7 +14,9 @@ test-util = [] restate-types = { workspace = true } anyhow = { workspace = true } +arc-swap = { workspace = true } derive_more = { workspace = true } +enum-map = { workspace = true } futures = { workspace = true } static_assertions = { workspace = true } strum = { workspace = true } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 78093a3f7..372194f80 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -8,8 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -mod task_center_types; -pub use task_center_types::*; +pub mod metadata; mod task_center; +mod task_center_types; + pub use task_center::*; +pub use task_center_types::*; diff --git a/crates/node/src/metadata.rs b/crates/core/src/metadata/manager.rs similarity index 68% rename from crates/node/src/metadata.rs rename to crates/core/src/metadata/manager.rs index 02a8b56bd..5bafb633e 100644 --- a/crates/node/src/metadata.rs +++ b/crates/core/src/metadata/manager.rs @@ -8,140 +8,28 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -// todo: Remove after implementation is complete -#![allow(dead_code)] - use std::sync::Arc; -use arc_swap::ArcSwapOption; -use enum_map::EnumMap; -use restate_core::ShutdownError; -use tokio::sync::{oneshot, watch}; +use tokio::sync::oneshot; use tracing::info; -use restate_core::cancellation_watcher; +use crate::cancellation_watcher; use restate_types::nodes_config::NodesConfiguration; -use restate_types::MetadataKind; -use restate_types::Version; - -type CommandSender = tokio::sync::mpsc::UnboundedSender; -type CommandReceiver = tokio::sync::mpsc::UnboundedReceiver; - -// todo -struct PartitionTable; - -/// Handle to access locally cached metadata, request metadata updates, and more. -#[derive(Clone)] -pub struct Metadata { - sender: CommandSender, - inner: Arc, -} - -pub enum MetadataContainer { - NodesConfiguration(NodesConfiguration), -} - -impl MetadataContainer { - pub fn kind(&self) -> MetadataKind { - match self { - MetadataContainer::NodesConfiguration(_) => MetadataKind::NodesConfiguration, - } - } -} -impl From for MetadataContainer { - fn from(value: NodesConfiguration) -> Self { - MetadataContainer::NodesConfiguration(value) - } -} - -impl Metadata { - fn new(inner: Arc, sender: CommandSender) -> Self { - Self { inner, sender } - } - - /// Panics if nodes configuration is not loaded yet. - pub fn nodes_config(&self) -> Arc { - self.inner.nodes_config.load_full().unwrap() - } - - /// Returns Version::INVALID if nodes configuration has not been loaded yet. - pub fn nodes_config_version(&self) -> Version { - let c = self.inner.nodes_config.load(); - match c.as_deref() { - Some(c) => c.version(), - None => Version::INVALID, - } - } - - // Returns when the metadata kind is at the provided version (or newer) - pub async fn wait_for_version( - &self, - metadata_kind: MetadataKind, - min_version: Version, - ) -> Result { - let mut recv = self.inner.write_watches[metadata_kind].receive.clone(); - let v = recv - .wait_for(|v| *v >= min_version) - .await - .map_err(|_| ShutdownError)?; - Ok(*v) - } +use super::Metadata; +use super::MetadataContainer; +use super::MetadataInner; +use super::MetadataKind; +use super::MetadataWriter; - // Watch for version updates of this metadata kind. - pub fn watch(&self, metadata_kind: MetadataKind) -> watch::Receiver { - self.inner.write_watches[metadata_kind].receive.clone() - } -} +pub(super) type CommandSender = tokio::sync::mpsc::UnboundedSender; +pub(super) type CommandReceiver = tokio::sync::mpsc::UnboundedReceiver; -enum Command { +pub(super) enum Command { UpdateMetadata(MetadataContainer, Option>), } -#[derive(Default)] -struct MetadataInner { - nodes_config: ArcSwapOption, - write_watches: EnumMap, -} - -/// Can send updates to metadata manager. This should be accessible by the rpc handler layer to -/// handle incoming metadata updates from the network, or to handle updates coming from metadata -/// service if it's running on this node. MetadataManager ensures that writes are monotonic -/// so it's safe to call update_* without checking the current version. -#[derive(Clone)] -pub struct MetadataWriter { - sender: CommandSender, -} - -impl MetadataWriter { - fn new(sender: CommandSender) -> Self { - Self { sender } - } - - // Returns when the nodes configuration update is performed. - pub async fn update(&self, value: impl Into) -> Result<(), ShutdownError> { - let (callback, recv) = oneshot::channel(); - let o = self - .sender - .send(Command::UpdateMetadata(value.into(), Some(callback))); - if o.is_ok() { - let _ = recv.await; - Ok(()) - } else { - Err(ShutdownError) - } - } - - // Fire and forget update - pub fn submit(&self, value: impl Into) { - // Ignore the error, task-center takes care of safely shutting down the - // system if metadata manager failed - let _ = self - .sender - .send(Command::UpdateMetadata(value.into(), None)); - } -} - +/// Handle to access locally cached metadata, request metadata updates, and more. /// What is metadata manager? /// /// MetadataManager is a long-running task that monitors shared metadata needed by @@ -257,21 +145,6 @@ impl MetadataManager { } } -struct VersionWatch { - sender: watch::Sender, - receive: watch::Receiver, -} - -impl Default for VersionWatch { - fn default() -> Self { - let (send, receive) = watch::channel(Version::INVALID); - Self { - sender: send, - receive, - } - } -} - #[cfg(test)] mod tests { use std::str::FromStr; @@ -280,10 +153,11 @@ mod tests { use super::*; use googletest::prelude::*; - use restate_core::{TaskCenterFactory, TaskKind}; use restate_test_util::assert_eq; use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, Role}; - use restate_types::GenerationalNodeId; + use restate_types::{GenerationalNodeId, Version}; + + use crate::{TaskCenterFactory, TaskKind}; #[tokio::test] async fn test_nodes_config_updates() -> Result<()> { diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs new file mode 100644 index 000000000..ba137e71e --- /dev/null +++ b/crates/core/src/metadata/mod.rs @@ -0,0 +1,158 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +// todo: Remove after implementation is complete +#![allow(dead_code)] + +mod manager; +pub use manager::MetadataManager; + +use std::sync::Arc; + +use arc_swap::ArcSwapOption; +use enum_map::{Enum, EnumMap}; +use strum_macros::EnumIter; +use tokio::sync::{oneshot, watch}; + +use crate::ShutdownError; +use restate_types::nodes_config::NodesConfiguration; +use restate_types::Version; + +/// The kind of versioned metadata that can be synchronized across nodes. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Enum, EnumIter)] +pub enum MetadataKind { + NodesConfiguration, + Schema, + PartitionTable, + Logs, +} + +#[derive(Clone)] +pub struct Metadata { + sender: manager::CommandSender, + inner: Arc, +} + +pub enum MetadataContainer { + NodesConfiguration(NodesConfiguration), +} + +impl MetadataContainer { + pub fn kind(&self) -> MetadataKind { + match self { + MetadataContainer::NodesConfiguration(_) => MetadataKind::NodesConfiguration, + } + } +} + +impl From for MetadataContainer { + fn from(value: NodesConfiguration) -> Self { + MetadataContainer::NodesConfiguration(value) + } +} + +impl Metadata { + fn new(inner: Arc, sender: manager::CommandSender) -> Self { + Self { inner, sender } + } + + /// Panics if nodes configuration is not loaded yet. + pub fn nodes_config(&self) -> Arc { + self.inner.nodes_config.load_full().unwrap() + } + + /// Returns Version::INVALID if nodes configuration has not been loaded yet. + pub fn nodes_config_version(&self) -> Version { + let c = self.inner.nodes_config.load(); + match c.as_deref() { + Some(c) => c.version(), + None => Version::INVALID, + } + } + + // Returns when the metadata kind is at the provided version (or newer) + pub async fn wait_for_version( + &self, + metadata_kind: MetadataKind, + min_version: Version, + ) -> Result { + let mut recv = self.inner.write_watches[metadata_kind].receive.clone(); + let v = recv + .wait_for(|v| *v >= min_version) + .await + .map_err(|_| ShutdownError)?; + Ok(*v) + } + + // Watch for version updates of this metadata kind. + pub fn watch(&self, metadata_kind: MetadataKind) -> watch::Receiver { + self.inner.write_watches[metadata_kind].receive.clone() + } +} + +#[derive(Default)] +struct MetadataInner { + nodes_config: ArcSwapOption, + write_watches: EnumMap, +} + +/// Can send updates to metadata manager. This should be accessible by the rpc handler layer to +/// handle incoming metadata updates from the network, or to handle updates coming from metadata +/// service if it's running on this node. MetadataManager ensures that writes are monotonic +/// so it's safe to call update_* without checking the current version. +#[derive(Clone)] +pub struct MetadataWriter { + sender: manager::CommandSender, +} + +impl MetadataWriter { + fn new(sender: manager::CommandSender) -> Self { + Self { sender } + } + + // Returns when the nodes configuration update is performed. + pub async fn update(&self, value: impl Into) -> Result<(), ShutdownError> { + let (callback, recv) = oneshot::channel(); + let o = self.sender.send(manager::Command::UpdateMetadata( + value.into(), + Some(callback), + )); + if o.is_ok() { + let _ = recv.await; + Ok(()) + } else { + Err(ShutdownError) + } + } + + // Fire and forget update + pub fn submit(&self, value: impl Into) { + // Ignore the error, task-center takes care of safely shutting down the + // system if metadata manager failed + let _ = self + .sender + .send(manager::Command::UpdateMetadata(value.into(), None)); + } +} + +struct VersionWatch { + sender: watch::Sender, + receive: watch::Receiver, +} + +impl Default for VersionWatch { + fn default() -> Self { + let (send, receive) = watch::channel(Version::INVALID); + Self { + sender: send, + receive, + } + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index a7e61f5cb..6c40229f1 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -mod metadata; mod network_server; mod options; mod roles; @@ -25,11 +24,11 @@ use anyhow::bail; use codederror::CodedError; use tracing::{error, info}; +use restate_core::metadata::MetadataManager; use restate_core::{task_center, TaskKind}; use restate_types::nodes_config::{NodeConfig, NodesConfiguration, Role}; use restate_types::{GenerationalNodeId, MyNodeIdWriter, NodeId, Version}; -use self::metadata::MetadataManager; use crate::network_server::{AdminDependencies, NetworkServer, WorkerDependencies}; use crate::roles::{AdminRole, WorkerRole}; diff --git a/crates/node/src/network_server/handler/cluster_ctrl.rs b/crates/node/src/network_server/handler/cluster_ctrl.rs index 73eca0ce2..4b9b1cc99 100644 --- a/crates/node/src/network_server/handler/cluster_ctrl.rs +++ b/crates/node/src/network_server/handler/cluster_ctrl.rs @@ -11,12 +11,12 @@ use tonic::{async_trait, Request, Response, Status}; use tracing::debug; +use restate_core::metadata::Metadata; use restate_meta::MetaReader; use restate_node_services::cluster_ctrl::cluster_ctrl_svc_server::ClusterCtrlSvc; use restate_node_services::cluster_ctrl::{AttachmentRequest, AttachmentResponse}; use restate_node_services::cluster_ctrl::{FetchSchemasRequest, FetchSchemasResponse}; -use crate::metadata::Metadata; use crate::network_server::AdminDependencies; pub struct ClusterCtrlSvcHandler { diff --git a/crates/node/src/network_server/handler/node.rs b/crates/node/src/network_server/handler/node.rs index d15eb69ef..7ab180913 100644 --- a/crates/node/src/network_server/handler/node.rs +++ b/crates/node/src/network_server/handler/node.rs @@ -12,6 +12,7 @@ use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::error::FlightError; use futures::stream::BoxStream; use futures::TryStreamExt; +use restate_core::metadata::{Metadata, MetadataWriter}; use restate_node_services::node::node_svc_server::NodeSvc; use restate_node_services::node::{IdentResponse, NodeStatus}; use restate_node_services::node::{ @@ -23,7 +24,6 @@ use restate_types::NodeId; use restate_worker_api::Handle; use tonic::{Request, Response, Status}; -use crate::metadata::{Metadata, MetadataWriter}; use crate::network_server::WorkerDependencies; pub struct NodeSvcHandler { diff --git a/crates/node/src/network_server/options.rs b/crates/node/src/network_server/options.rs index c4e482777..dfafa6a30 100644 --- a/crates/node/src/network_server/options.rs +++ b/crates/node/src/network_server/options.rs @@ -13,9 +13,9 @@ use std::str::FromStr; use serde_with::serde_as; +use restate_core::metadata::{Metadata, MetadataWriter}; use restate_types::nodes_config::AdvertisedAddress; -use crate::metadata::{Metadata, MetadataWriter}; use crate::network_server::service::{AdminDependencies, NetworkServer, WorkerDependencies}; /// # Node server options diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 41f026c9d..42fec89bd 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -17,6 +17,7 @@ use tracing::info; use restate_cluster_controller::ClusterControllerHandle; use restate_core::cancellation_watcher; +use restate_core::metadata::{Metadata, MetadataWriter}; use restate_meta::FileMetaReader; use restate_node_services::cluster_ctrl::cluster_ctrl_svc_server::ClusterCtrlSvcServer; use restate_node_services::node::node_svc_server::NodeSvcServer; @@ -26,7 +27,6 @@ use restate_storage_query_datafusion::context::QueryContext; use restate_storage_rocksdb::RocksDBStorage; use restate_worker::{SubscriptionControllerHandle, WorkerCommandSender}; -use crate::metadata::{Metadata, MetadataWriter}; use crate::network_server::handler; use crate::network_server::handler::cluster_ctrl::ClusterCtrlSvcHandler; use crate::network_server::handler::node::NodeSvcHandler; diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 48235f2ed..b09fedfd3 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -15,6 +15,7 @@ use tonic::transport::Channel; use tracing::debug; use tracing::subscriber::NoSubscriber; +use restate_core::metadata::Metadata; use restate_core::task_center; use restate_core::TaskKind; use restate_network::utils::create_grpc_channel_from_network_address; @@ -32,7 +33,6 @@ use restate_worker::{SubscriptionControllerHandle, Worker, WorkerCommandSender}; use restate_worker_api::SubscriptionController; use tracing::info; -use crate::metadata::Metadata; use crate::Options; #[derive(Debug, thiserror::Error, CodedError)] diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index e14a3bebc..c49fbbd5e 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -13,8 +13,8 @@ mod base62_util; mod id_util; mod macros; -mod metadata; mod node_id; +mod version; pub mod deployment; pub mod errors; @@ -31,5 +31,5 @@ pub mod time; pub mod timer; pub use id_util::{IdDecoder, IdEncoder, IdResourceType, IdStrCursor}; -pub use metadata::*; pub use node_id::*; +pub use version::*; diff --git a/crates/types/src/metadata.rs b/crates/types/src/version.rs similarity index 76% rename from crates/types/src/metadata.rs rename to crates/types/src/version.rs index 18e8f31bd..2f1ebebf8 100644 --- a/crates/types/src/metadata.rs +++ b/crates/types/src/version.rs @@ -8,8 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use enum_map::Enum; -use strum_macros::EnumIter; /// A type used for versioned metadata. #[derive( Debug, @@ -38,12 +36,3 @@ impl Default for Version { Self::MIN } } - -/// The kind of versioned metadata that can be synchronized across nodes. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Enum, EnumIter)] -pub enum MetadataKind { - NodesConfiguration, - Schema, - PartitionTable, - Logs, -}