Skip to content

Commit

Permalink
Move metadata to restate-core
Browse files Browse the repository at this point in the history
as described in the previous PR in the stack
  • Loading branch information
AhmedSoliman committed Feb 23, 2024
1 parent 9fe8bff commit 7a9272a
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 162 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ test-util = []
restate-types = { workspace = true }

anyhow = { workspace = true }
arc-swap = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true }
futures = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
Expand Down
6 changes: 4 additions & 2 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod task_center_types;
pub use task_center_types::*;
pub mod metadata;

mod task_center;
mod task_center_types;

pub use task_center::*;
pub use task_center_types::*;
154 changes: 14 additions & 140 deletions crates/node/src/metadata.rs → crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,140 +8,28 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// todo: Remove after implementation is complete
#![allow(dead_code)]

use std::sync::Arc;

use arc_swap::ArcSwapOption;
use enum_map::EnumMap;
use restate_core::ShutdownError;
use tokio::sync::{oneshot, watch};
use tokio::sync::oneshot;
use tracing::info;

use restate_core::cancellation_watcher;
use crate::cancellation_watcher;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::MetadataKind;
use restate_types::Version;

type CommandSender = tokio::sync::mpsc::UnboundedSender<Command>;
type CommandReceiver = tokio::sync::mpsc::UnboundedReceiver<Command>;

// todo
struct PartitionTable;

/// Handle to access locally cached metadata, request metadata updates, and more.
#[derive(Clone)]
pub struct Metadata {
sender: CommandSender,
inner: Arc<MetadataInner>,
}

pub enum MetadataContainer {
NodesConfiguration(NodesConfiguration),
}

impl MetadataContainer {
pub fn kind(&self) -> MetadataKind {
match self {
MetadataContainer::NodesConfiguration(_) => MetadataKind::NodesConfiguration,
}
}
}

impl From<NodesConfiguration> for MetadataContainer {
fn from(value: NodesConfiguration) -> Self {
MetadataContainer::NodesConfiguration(value)
}
}

impl Metadata {
fn new(inner: Arc<MetadataInner>, sender: CommandSender) -> Self {
Self { inner, sender }
}

/// Panics if nodes configuration is not loaded yet.
pub fn nodes_config(&self) -> Arc<NodesConfiguration> {
self.inner.nodes_config.load_full().unwrap()
}

/// Returns Version::INVALID if nodes configuration has not been loaded yet.
pub fn nodes_config_version(&self) -> Version {
let c = self.inner.nodes_config.load();
match c.as_deref() {
Some(c) => c.version(),
None => Version::INVALID,
}
}

// Returns when the metadata kind is at the provided version (or newer)
pub async fn wait_for_version(
&self,
metadata_kind: MetadataKind,
min_version: Version,
) -> Result<Version, ShutdownError> {
let mut recv = self.inner.write_watches[metadata_kind].receive.clone();
let v = recv
.wait_for(|v| *v >= min_version)
.await
.map_err(|_| ShutdownError)?;
Ok(*v)
}
use super::Metadata;
use super::MetadataContainer;
use super::MetadataInner;
use super::MetadataKind;
use super::MetadataWriter;

// Watch for version updates of this metadata kind.
pub fn watch(&self, metadata_kind: MetadataKind) -> watch::Receiver<Version> {
self.inner.write_watches[metadata_kind].receive.clone()
}
}
pub(super) type CommandSender = tokio::sync::mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = tokio::sync::mpsc::UnboundedReceiver<Command>;

enum Command {
pub(super) enum Command {
UpdateMetadata(MetadataContainer, Option<oneshot::Sender<()>>),
}

#[derive(Default)]
struct MetadataInner {
nodes_config: ArcSwapOption<NodesConfiguration>,
write_watches: EnumMap<MetadataKind, VersionWatch>,
}

/// Can send updates to metadata manager. This should be accessible by the rpc handler layer to
/// handle incoming metadata updates from the network, or to handle updates coming from metadata
/// service if it's running on this node. MetadataManager ensures that writes are monotonic
/// so it's safe to call update_* without checking the current version.
#[derive(Clone)]
pub struct MetadataWriter {
sender: CommandSender,
}

impl MetadataWriter {
fn new(sender: CommandSender) -> Self {
Self { sender }
}

// Returns when the nodes configuration update is performed.
pub async fn update(&self, value: impl Into<MetadataContainer>) -> Result<(), ShutdownError> {
let (callback, recv) = oneshot::channel();
let o = self
.sender
.send(Command::UpdateMetadata(value.into(), Some(callback)));
if o.is_ok() {
let _ = recv.await;
Ok(())
} else {
Err(ShutdownError)
}
}

// Fire and forget update
pub fn submit(&self, value: impl Into<MetadataContainer>) {
// Ignore the error, task-center takes care of safely shutting down the
// system if metadata manager failed
let _ = self
.sender
.send(Command::UpdateMetadata(value.into(), None));
}
}

/// Handle to access locally cached metadata, request metadata updates, and more.
/// What is metadata manager?
///
/// MetadataManager is a long-running task that monitors shared metadata needed by
Expand Down Expand Up @@ -257,21 +145,6 @@ impl MetadataManager {
}
}

struct VersionWatch {
sender: watch::Sender<Version>,
receive: watch::Receiver<Version>,
}

impl Default for VersionWatch {
fn default() -> Self {
let (send, receive) = watch::channel(Version::INVALID);
Self {
sender: send,
receive,
}
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;
Expand All @@ -280,10 +153,11 @@ mod tests {
use super::*;

use googletest::prelude::*;
use restate_core::{TaskCenterFactory, TaskKind};
use restate_test_util::assert_eq;
use restate_types::nodes_config::{AdvertisedAddress, NodeConfig, Role};
use restate_types::GenerationalNodeId;
use restate_types::{GenerationalNodeId, Version};

use crate::{TaskCenterFactory, TaskKind};

#[tokio::test]
async fn test_nodes_config_updates() -> Result<()> {
Expand Down
158 changes: 158 additions & 0 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// todo: Remove after implementation is complete
#![allow(dead_code)]

mod manager;
pub use manager::MetadataManager;

use std::sync::Arc;

use arc_swap::ArcSwapOption;
use enum_map::{Enum, EnumMap};
use strum_macros::EnumIter;
use tokio::sync::{oneshot, watch};

use crate::ShutdownError;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::Version;

/// The kind of versioned metadata that can be synchronized across nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Enum, EnumIter)]
pub enum MetadataKind {
NodesConfiguration,
Schema,
PartitionTable,
Logs,
}

#[derive(Clone)]
pub struct Metadata {
sender: manager::CommandSender,
inner: Arc<MetadataInner>,
}

pub enum MetadataContainer {
NodesConfiguration(NodesConfiguration),
}

impl MetadataContainer {
pub fn kind(&self) -> MetadataKind {
match self {
MetadataContainer::NodesConfiguration(_) => MetadataKind::NodesConfiguration,
}
}
}

impl From<NodesConfiguration> for MetadataContainer {
fn from(value: NodesConfiguration) -> Self {
MetadataContainer::NodesConfiguration(value)
}
}

impl Metadata {
fn new(inner: Arc<MetadataInner>, sender: manager::CommandSender) -> Self {
Self { inner, sender }
}

/// Panics if nodes configuration is not loaded yet.
pub fn nodes_config(&self) -> Arc<NodesConfiguration> {
self.inner.nodes_config.load_full().unwrap()
}

/// Returns Version::INVALID if nodes configuration has not been loaded yet.
pub fn nodes_config_version(&self) -> Version {
let c = self.inner.nodes_config.load();
match c.as_deref() {
Some(c) => c.version(),
None => Version::INVALID,
}
}

// Returns when the metadata kind is at the provided version (or newer)
pub async fn wait_for_version(
&self,
metadata_kind: MetadataKind,
min_version: Version,
) -> Result<Version, ShutdownError> {
let mut recv = self.inner.write_watches[metadata_kind].receive.clone();
let v = recv
.wait_for(|v| *v >= min_version)
.await
.map_err(|_| ShutdownError)?;
Ok(*v)
}

// Watch for version updates of this metadata kind.
pub fn watch(&self, metadata_kind: MetadataKind) -> watch::Receiver<Version> {
self.inner.write_watches[metadata_kind].receive.clone()
}
}

#[derive(Default)]
struct MetadataInner {
nodes_config: ArcSwapOption<NodesConfiguration>,
write_watches: EnumMap<MetadataKind, VersionWatch>,
}

/// Can send updates to metadata manager. This should be accessible by the rpc handler layer to
/// handle incoming metadata updates from the network, or to handle updates coming from metadata
/// service if it's running on this node. MetadataManager ensures that writes are monotonic
/// so it's safe to call update_* without checking the current version.
#[derive(Clone)]
pub struct MetadataWriter {
sender: manager::CommandSender,
}

impl MetadataWriter {
fn new(sender: manager::CommandSender) -> Self {
Self { sender }
}

// Returns when the nodes configuration update is performed.
pub async fn update(&self, value: impl Into<MetadataContainer>) -> Result<(), ShutdownError> {
let (callback, recv) = oneshot::channel();
let o = self.sender.send(manager::Command::UpdateMetadata(
value.into(),
Some(callback),
));
if o.is_ok() {
let _ = recv.await;
Ok(())
} else {
Err(ShutdownError)
}
}

// Fire and forget update
pub fn submit(&self, value: impl Into<MetadataContainer>) {
// Ignore the error, task-center takes care of safely shutting down the
// system if metadata manager failed
let _ = self
.sender
.send(manager::Command::UpdateMetadata(value.into(), None));
}
}

struct VersionWatch {
sender: watch::Sender<Version>,
receive: watch::Receiver<Version>,
}

impl Default for VersionWatch {
fn default() -> Self {
let (send, receive) = watch::channel(Version::INVALID);
Self {
sender: send,
receive,
}
}
}
Loading

0 comments on commit 7a9272a

Please sign in to comment.