From 3d7c7184b37e2e6281793cbc14a681bf3a945091 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 27 Mar 2024 10:51:19 +0000 Subject: [PATCH] Bifrost provider configuration improvements pt1 - Rename and flattening of bifrost provider kinds and configurations - Provider configuration is typed (no more lazy config parsing errors) - Dedicated error type for provider errors --- Cargo.lock | 2 ++ crates/bifrost/Cargo.toml | 2 ++ crates/bifrost/src/bifrost.rs | 7 ++-- crates/bifrost/src/error.rs | 7 ++++ crates/bifrost/src/lib.rs | 2 +- crates/bifrost/src/loglet.rs | 32 +++++++---------- .../src/loglets/local_loglet/log_store.rs | 7 ++-- .../bifrost/src/loglets/local_loglet/mod.rs | 4 --- .../src/loglets/local_loglet/options.rs | 17 +++++++-- .../src/loglets/local_loglet/provider.rs | 20 +++++------ crates/bifrost/src/loglets/memory_loglet.rs | 14 +++----- crates/bifrost/src/options.rs | 35 +++++-------------- crates/bifrost/src/read_stream.rs | 2 +- server/src/main.rs | 4 +-- 14 files changed, 71 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 786dee125..ca3d96439 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4862,6 +4862,7 @@ dependencies = [ "drain", "enum-map", "googletest", + "humantime", "once_cell", "restate-core", "restate-test-util", @@ -4870,6 +4871,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "serde_with", "smallvec", "static_assertions", "strum 0.26.2", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index e1e74e3e5..e3fd57fc7 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -27,11 +27,13 @@ derive_builder = { workspace = true } derive_more = { workspace = true } drain = { workspace = true } enum-map = { workspace = true, features = ["serde"] } +humantime = { workspace = true } once_cell = { workspace = true } rocksdb = { workspace = true, optional = true } schemars = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } +serde_with = { workspace = true } smallvec = { version = "1.13.2", features = ["serde"] } static_assertions = { workspace = true } strum = { workspace = true } diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 61a4f255f..631804f3f 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -206,7 +206,8 @@ impl BifrostInner { fn provider_for(&self, kind: ProviderKind) -> &dyn LogletProvider { self.providers[kind] .get_or_init(|| { - let provider = crate::loglet::create_provider(kind, &self.opts); + let provider = crate::loglet::create_provider(kind, &self.opts) + .expect("provider is able to get created"); if let Err(e) = provider.start() { error!("Failed to start loglet provider {}: {}", kind, e); // todo: Handle provider errors by a graceful system shutdown @@ -364,7 +365,7 @@ mod tests { let memory_provider = MemoryLogletProvider::with_init_delay(delay); let bifrost_opts = Options { - default_provider: ProviderKind::Memory, + default_provider: ProviderKind::InMemory, ..Options::default() }; let bifrost_svc = bifrost_opts.build(num_partitions); @@ -373,7 +374,7 @@ mod tests { // Inject out preconfigured memory provider bifrost .inner() - .inject_provider(ProviderKind::Memory, memory_provider); + .inject_provider(ProviderKind::InMemory, memory_provider); // start bifrost service in the background bifrost_svc.start().await.unwrap(); diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index b56913a9e..ca8a0f8c1 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -33,3 +33,10 @@ pub enum Error { #[error(transparent)] LogStoreError(#[from] LogStoreError), } + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum ProviderError { + Shutdown(#[from] ShutdownError), + Other(#[from] anyhow::Error), +} diff --git a/crates/bifrost/src/lib.rs b/crates/bifrost/src/lib.rs index a1ebd1a95..872370786 100644 --- a/crates/bifrost/src/lib.rs +++ b/crates/bifrost/src/lib.rs @@ -22,7 +22,7 @@ mod watchdog; use std::collections::HashMap; pub use bifrost::Bifrost; -pub use error::Error; +pub use error::{Error, ProviderError}; pub use options::Options; pub use read_stream::LogReadStream; use restate_types::logs::LogId; diff --git a/crates/bifrost/src/loglet.rs b/crates/bifrost/src/loglet.rs index adbeeddac..9eef3029b 100644 --- a/crates/bifrost/src/loglet.rs +++ b/crates/bifrost/src/loglet.rs @@ -16,7 +16,7 @@ use enum_map::Enum; use restate_types::logs::{Lsn, Payload, SequenceNumber}; use crate::metadata::LogletParams; -use crate::{Error, LogRecord, LsnExt, Options}; +use crate::{Error, LogRecord, LsnExt, Options, ProviderError}; /// An enum with the list of supported loglet providers. /// For each variant we must have a corresponding implementation of the @@ -40,30 +40,24 @@ pub enum ProviderKind { /// A local rocksdb-backed loglet. Local, #[cfg(any(test, feature = "memory_loglet"))] - Memory, -} - -pub fn provider_default_config(kind: ProviderKind) -> serde_json::Value { - match kind { - #[cfg(any(test, feature = "local_loglet"))] - ProviderKind::Local => crate::loglets::local_loglet::default_config(), - #[cfg(any(test, feature = "memory_loglet"))] - ProviderKind::Memory => crate::loglets::memory_loglet::default_config(), - } + /// An in-memory loglet, primarily for testing. + InMemory, } // why? because if all loglet features are disabled, clippy will complain about options being // unused. #[allow(unused_variables)] -pub fn create_provider(kind: ProviderKind, options: &Options) -> Arc { +pub fn create_provider( + kind: ProviderKind, + options: &Options, +) -> Result, ProviderError> { match kind { #[cfg(any(test, feature = "local_loglet"))] - ProviderKind::Local => crate::loglets::local_loglet::LocalLogletProvider::new( - &options.local_loglet_storage_path(), - &options.providers_config[kind], - ), + ProviderKind::Local => Ok(crate::loglets::local_loglet::LocalLogletProvider::new( + options.local.clone(), + )?), #[cfg(any(test, feature = "memory_loglet"))] - ProviderKind::Memory => crate::loglets::memory_loglet::MemoryLogletProvider::new(), + ProviderKind::InMemory => Ok(crate::loglets::memory_loglet::MemoryLogletProvider::new()?), } } @@ -106,10 +100,10 @@ pub trait LogletProvider: Send + Sync { async fn get_loglet(&self, params: &LogletParams) -> Result, Error>; // Hook for handling lazy initialization - fn start(&self) -> Result<(), Error>; + fn start(&self) -> Result<(), ProviderError>; // Hook for handling graceful shutdown - async fn shutdown(&self) -> Result<(), Error> { + async fn shutdown(&self) -> Result<(), ProviderError> { Ok(()) } } diff --git a/crates/bifrost/src/loglets/local_loglet/log_store.rs b/crates/bifrost/src/loglets/local_loglet/log_store.rs index dba36f731..5800d62d8 100644 --- a/crates/bifrost/src/loglets/local_loglet/log_store.rs +++ b/crates/bifrost/src/loglets/local_loglet/log_store.rs @@ -8,11 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::path::Path; use std::sync::Arc; use std::time::Instant; -use anyhow::Context; use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DB}; use tracing::{debug, warn}; @@ -42,7 +40,7 @@ pub struct RocksDbLogStore { } impl RocksDbLogStore { - pub fn new(storage_path: &Path, options: &Options) -> anyhow::Result { + pub fn new(options: &Options) -> Result { let cache = if options.rocksdb_cache_size > 0 { Some(Cache::new_lru_cache(options.rocksdb_cache_size)) } else { @@ -68,8 +66,7 @@ impl RocksDbLogStore { ]; let db_options = db_options(options); - let db = DB::open_cf_descriptors(&db_options, storage_path, cfs) - .context("failed to open rocksdb for local loglet store")?; + let db = DB::open_cf_descriptors(&db_options, &options.path, cfs)?; Ok(Self { db: Arc::new(db) }) } diff --git a/crates/bifrost/src/loglets/local_loglet/mod.rs b/crates/bifrost/src/loglets/local_loglet/mod.rs index f4d1b6708..55b465286 100644 --- a/crates/bifrost/src/loglets/local_loglet/mod.rs +++ b/crates/bifrost/src/loglets/local_loglet/mod.rs @@ -36,10 +36,6 @@ use self::log_store::RocksDbLogStore; use self::log_store_writer::RocksDbLogWriterHandle; use self::utils::OffsetWatch; -pub fn default_config() -> serde_json::Value { - serde_json::to_value(Options::default()).expect("default config to be serializable") -} - #[derive(Debug)] pub struct LocalLoglet { log_id: u64, diff --git a/crates/bifrost/src/loglets/local_loglet/options.rs b/crates/bifrost/src/loglets/local_loglet/options.rs index 86d0079e1..c49dfe2d3 100644 --- a/crates/bifrost/src/loglets/local_loglet/options.rs +++ b/crates/bifrost/src/loglets/local_loglet/options.rs @@ -8,12 +8,20 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::path::{Path, PathBuf}; use std::time::Duration; +use restate_types::DEFAULT_STORAGE_DIRECTORY; use serde::{Deserialize, Serialize}; +use serde_with::serde_as; -#[derive(Debug, Serialize, Deserialize)] +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "options_schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "options_schema", schemars(rename = "LocalLoglet", default))] +#[builder(default)] pub struct Options { + pub path: PathBuf, pub rocksdb_threads: usize, pub rocksdb_disable_statistics: bool, pub rocksdb_disable_wal: bool, @@ -24,7 +32,9 @@ pub struct Options { /// write batch on every command. pub writer_commit_batch_size_threshold: usize, /// Trigger a commit when the time since the last commit exceeds this threshold. - pub writer_commit_time_interval: Duration, + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "options_schema", schemars(with = "String"))] + pub writer_commit_time_interval: humantime::Duration, /// The maximum number of write commands that can be queued. pub writer_queue_len: usize, /// If true, rocksdb flushes follow writing record batches, otherwise, we @@ -35,6 +45,7 @@ pub struct Options { impl Default for Options { fn default() -> Self { Self { + path: Path::new(DEFAULT_STORAGE_DIRECTORY).join("local_loglet"), rocksdb_threads: 10, // todo: enable when we have a way to expose the statistics through node-ctrl rocksdb_disable_statistics: true, @@ -43,7 +54,7 @@ impl Default for Options { rocksdb_max_total_wal_size: 2 * (1 << 30), // 2 GiB rocksdb_write_buffer_size: 0, writer_commit_batch_size_threshold: 200, - writer_commit_time_interval: Duration::from_millis(13), + writer_commit_time_interval: Duration::from_millis(13).into(), writer_queue_len: 200, flush_wal_on_commit: true, } diff --git a/crates/bifrost/src/loglets/local_loglet/provider.rs b/crates/bifrost/src/loglets/local_loglet/provider.rs index 0e1a58c44..9a876813a 100644 --- a/crates/bifrost/src/loglets/local_loglet/provider.rs +++ b/crates/bifrost/src/loglets/local_loglet/provider.rs @@ -9,9 +9,9 @@ // by the Apache License, Version 2.0. use std::collections::{hash_map, HashMap}; -use std::path::Path; use std::sync::{Arc, OnceLock}; +use anyhow::Context; use async_trait::async_trait; use tokio::sync::Mutex as AsyncMutex; use tracing::debug; @@ -21,6 +21,7 @@ use super::log_store_writer::RocksDbLogWriterHandle; use super::{LocalLoglet, Options}; use crate::loglet::{Loglet, LogletOffset, LogletProvider}; use crate::loglets::local_loglet::log_store_writer::WriterOptions; +use crate::ProviderError; use crate::{Error, LogletParams}; #[derive(Debug)] @@ -32,18 +33,15 @@ pub struct LocalLogletProvider { } impl LocalLogletProvider { - pub fn new(storage_path: &Path, raw_options: &serde_json::Value) -> Arc { - let opts = - serde_json::from_value(raw_options.clone()).expect("to be able to deserialize options"); - // todo: implement loglet loading error handling - let log_store = RocksDbLogStore::new(storage_path, &opts).expect("bifrost local loglet"); + pub fn new(opts: Options) -> Result, ProviderError> { + let log_store = RocksDbLogStore::new(&opts).context("RockDb LogStore")?; - Arc::new(Self { + Ok(Arc::new(Self { log_store, active_loglets: Default::default(), log_writer: OnceLock::new(), opts, - }) + })) } } @@ -86,10 +84,10 @@ impl LogletProvider for LocalLogletProvider { Ok(loglet as Arc) } - fn start(&self) -> Result<(), Error> { + fn start(&self) -> Result<(), ProviderError> { let writer_options = WriterOptions { channel_size: self.opts.writer_queue_len, - commit_time_interval: self.opts.writer_commit_time_interval, + commit_time_interval: self.opts.writer_commit_time_interval.into(), batch_size_threshold: self.opts.writer_queue_len, flush_wal_on_commit: self.opts.flush_wal_on_commit, disable_wal: self.opts.rocksdb_disable_wal, @@ -103,7 +101,7 @@ impl LogletProvider for LocalLogletProvider { Ok(()) } - async fn shutdown(&self) -> Result<(), Error> { + async fn shutdown(&self) -> Result<(), ProviderError> { debug!("Shutting down local loglet provider"); self.log_store.shutdown(); Ok(()) diff --git a/crates/bifrost/src/loglets/memory_loglet.rs b/crates/bifrost/src/loglets/memory_loglet.rs index dcf482059..41d471705 100644 --- a/crates/bifrost/src/loglets/memory_loglet.rs +++ b/crates/bifrost/src/loglets/memory_loglet.rs @@ -22,12 +22,8 @@ use tracing::{debug, info}; use crate::loglet::{Loglet, LogletBase, LogletOffset, LogletProvider}; use crate::metadata::LogletParams; -use crate::Error; use crate::LogRecord; - -pub fn default_config() -> serde_json::Value { - serde_json::Value::Null -} +use crate::{Error, ProviderError}; #[derive(Default)] pub struct MemoryLogletProvider { @@ -37,8 +33,8 @@ pub struct MemoryLogletProvider { #[allow(dead_code)] impl MemoryLogletProvider { - pub fn new() -> Arc { - Arc::default() + pub fn new() -> Result, ProviderError> { + Ok(Arc::default()) } pub fn with_init_delay(init_delay: Duration) -> Arc { @@ -75,12 +71,12 @@ impl LogletProvider for MemoryLogletProvider { Ok(loglet as Arc) } - fn start(&self) -> Result<(), Error> { + fn start(&self) -> Result<(), ProviderError> { info!("Starting in-memory loglet provider"); Ok(()) } - async fn shutdown(&self) -> Result<(), Error> { + async fn shutdown(&self) -> Result<(), ProviderError> { info!("Shutting down in-memory loglet provider"); Ok(()) } diff --git a/crates/bifrost/src/options.rs b/crates/bifrost/src/options.rs index 112fe9f10..da952a88f 100644 --- a/crates/bifrost/src/options.rs +++ b/crates/bifrost/src/options.rs @@ -8,13 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::path::{Path, PathBuf}; - -use enum_map::EnumMap; -use restate_types::DEFAULT_STORAGE_DIRECTORY; -use strum::IntoEnumIterator; - -use crate::loglet::{provider_default_config, ProviderKind}; +use crate::loglet::ProviderKind; +use crate::loglets::local_loglet; use crate::service::BifrostService; /// # Bifrost options @@ -29,44 +24,32 @@ pub struct Options { /// # The default kind of loglet to be used #[cfg_attr(feature = "options_schema", schemars(with = "String"))] pub default_provider: ProviderKind, - // todo: Swap serde_json with extract-able figment + #[cfg(any(test, feature = "local_loglet"))] #[cfg_attr(feature = "options_schema", schemars(with = "String"))] - pub providers_config: EnumMap, + /// Configuration of local loglet provider + pub local: local_loglet::Options, } impl Default for Options { fn default() -> Self { - let mut providers_config = EnumMap::default(); - for kind in ProviderKind::iter() { - providers_config[kind] = provider_default_config(kind); - } - Self { default_provider: ProviderKind::Local, - providers_config, + #[cfg(any(test, feature = "local_loglet"))] + local: local_loglet::Options::default(), } } } impl Options { pub fn build(self, num_partitions: u64) -> BifrostService { - // todo: validate that options are parseable by the configured loglet provider. BifrostService::new(self, num_partitions) } - pub fn local_loglet_storage_path(&self) -> PathBuf { - Path::new(DEFAULT_STORAGE_DIRECTORY).join("local_loglet") - } - #[cfg(any(test, feature = "memory_loglet"))] pub fn memory() -> Self { - let mut providers_config = EnumMap::default(); - let kind = ProviderKind::Memory; - providers_config[kind] = provider_default_config(kind); - Self { - default_provider: ProviderKind::Memory, - providers_config, + default_provider: ProviderKind::InMemory, + ..Default::default() } } } diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 1b9d7ebac..11ecc94cf 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -103,7 +103,7 @@ mod tests { let read_after = Lsn::from(5); let bifrost_opts = Options { - default_provider: ProviderKind::Memory, + default_provider: ProviderKind::InMemory, ..Options::default() }; let bifrost_svc = bifrost_opts.build(num_partitions); diff --git a/server/src/main.rs b/server/src/main.rs index cc2a9a1c7..9ac9949e3 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -73,7 +73,7 @@ impl WipeMode { mode: Option<&WipeMode>, meta_storage_dir: PathBuf, worker_storage_dir: PathBuf, - local_loglet_storage_dir: PathBuf, + local_loglet_storage_dir: &Path, local_metadata_store_storage_dir: &Path, ) -> io::Result<()> { let (wipe_meta, wipe_worker, wipe_local_loglet, wipe_local_metadata_store) = match mode { @@ -160,7 +160,7 @@ fn main() { cli_args.wipe.as_ref(), config.node.meta.storage_path().into(), config.node.worker.storage_path().into(), - config.node.bifrost.local_loglet_storage_path(), + config.node.bifrost.local.path.as_path(), config.node.metadata_store.storage_path(), ) .await