Skip to content

Commit

Permalink
Bifrost provider configuration improvements pt1
Browse files Browse the repository at this point in the history
- Rename and flattening of bifrost provider kinds and configurations
- Provider configuration is typed (no more lazy config parsing errors)
- Dedicated error type for provider errors
  • Loading branch information
AhmedSoliman committed Mar 27, 2024
1 parent 4cafbba commit 3d7c718
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 84 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ derive_builder = { workspace = true }
derive_more = { workspace = true }
drain = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
humantime = { workspace = true }
once_cell = { workspace = true }
rocksdb = { workspace = true, optional = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
smallvec = { version = "1.13.2", features = ["serde"] }
static_assertions = { workspace = true }
strum = { workspace = true }
Expand Down
7 changes: 4 additions & 3 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ impl BifrostInner {
fn provider_for(&self, kind: ProviderKind) -> &dyn LogletProvider {
self.providers[kind]
.get_or_init(|| {
let provider = crate::loglet::create_provider(kind, &self.opts);
let provider = crate::loglet::create_provider(kind, &self.opts)
.expect("provider is able to get created");
if let Err(e) = provider.start() {
error!("Failed to start loglet provider {}: {}", kind, e);
// todo: Handle provider errors by a graceful system shutdown
Expand Down Expand Up @@ -364,7 +365,7 @@ mod tests {
let memory_provider = MemoryLogletProvider::with_init_delay(delay);

let bifrost_opts = Options {
default_provider: ProviderKind::Memory,
default_provider: ProviderKind::InMemory,
..Options::default()
};
let bifrost_svc = bifrost_opts.build(num_partitions);
Expand All @@ -373,7 +374,7 @@ mod tests {
// Inject out preconfigured memory provider
bifrost
.inner()
.inject_provider(ProviderKind::Memory, memory_provider);
.inject_provider(ProviderKind::InMemory, memory_provider);

// start bifrost service in the background
bifrost_svc.start().await.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ pub enum Error {
#[error(transparent)]
LogStoreError(#[from] LogStoreError),
}

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum ProviderError {
Shutdown(#[from] ShutdownError),
Other(#[from] anyhow::Error),
}
2 changes: 1 addition & 1 deletion crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod watchdog;
use std::collections::HashMap;

pub use bifrost::Bifrost;
pub use error::Error;
pub use error::{Error, ProviderError};
pub use options::Options;
pub use read_stream::LogReadStream;
use restate_types::logs::LogId;
Expand Down
32 changes: 13 additions & 19 deletions crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use enum_map::Enum;
use restate_types::logs::{Lsn, Payload, SequenceNumber};

use crate::metadata::LogletParams;
use crate::{Error, LogRecord, LsnExt, Options};
use crate::{Error, LogRecord, LsnExt, Options, ProviderError};

/// An enum with the list of supported loglet providers.
/// For each variant we must have a corresponding implementation of the
Expand All @@ -40,30 +40,24 @@ pub enum ProviderKind {
/// A local rocksdb-backed loglet.
Local,
#[cfg(any(test, feature = "memory_loglet"))]
Memory,
}

pub fn provider_default_config(kind: ProviderKind) -> serde_json::Value {
match kind {
#[cfg(any(test, feature = "local_loglet"))]
ProviderKind::Local => crate::loglets::local_loglet::default_config(),
#[cfg(any(test, feature = "memory_loglet"))]
ProviderKind::Memory => crate::loglets::memory_loglet::default_config(),
}
/// An in-memory loglet, primarily for testing.
InMemory,
}

// why? because if all loglet features are disabled, clippy will complain about options being
// unused.
#[allow(unused_variables)]
pub fn create_provider(kind: ProviderKind, options: &Options) -> Arc<dyn LogletProvider> {
pub fn create_provider(
kind: ProviderKind,
options: &Options,
) -> Result<Arc<dyn LogletProvider>, ProviderError> {
match kind {
#[cfg(any(test, feature = "local_loglet"))]
ProviderKind::Local => crate::loglets::local_loglet::LocalLogletProvider::new(
&options.local_loglet_storage_path(),
&options.providers_config[kind],
),
ProviderKind::Local => Ok(crate::loglets::local_loglet::LocalLogletProvider::new(
options.local.clone(),
)?),
#[cfg(any(test, feature = "memory_loglet"))]
ProviderKind::Memory => crate::loglets::memory_loglet::MemoryLogletProvider::new(),
ProviderKind::InMemory => Ok(crate::loglets::memory_loglet::MemoryLogletProvider::new()?),
}
}

Expand Down Expand Up @@ -106,10 +100,10 @@ pub trait LogletProvider: Send + Sync {
async fn get_loglet(&self, params: &LogletParams) -> Result<Arc<dyn Loglet>, Error>;

// Hook for handling lazy initialization
fn start(&self) -> Result<(), Error>;
fn start(&self) -> Result<(), ProviderError>;

// Hook for handling graceful shutdown
async fn shutdown(&self) -> Result<(), Error> {
async fn shutdown(&self) -> Result<(), ProviderError> {
Ok(())
}
}
Expand Down
7 changes: 2 additions & 5 deletions crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::Path;
use std::sync::Arc;
use std::time::Instant;

use anyhow::Context;
use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, DB};
use tracing::{debug, warn};

Expand Down Expand Up @@ -42,7 +40,7 @@ pub struct RocksDbLogStore {
}

impl RocksDbLogStore {
pub fn new(storage_path: &Path, options: &Options) -> anyhow::Result<Self> {
pub fn new(options: &Options) -> Result<Self, LogStoreError> {
let cache = if options.rocksdb_cache_size > 0 {
Some(Cache::new_lru_cache(options.rocksdb_cache_size))
} else {
Expand All @@ -68,8 +66,7 @@ impl RocksDbLogStore {
];
let db_options = db_options(options);

let db = DB::open_cf_descriptors(&db_options, storage_path, cfs)
.context("failed to open rocksdb for local loglet store")?;
let db = DB::open_cf_descriptors(&db_options, &options.path, cfs)?;

Ok(Self { db: Arc::new(db) })
}
Expand Down
4 changes: 0 additions & 4 deletions crates/bifrost/src/loglets/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ use self::log_store::RocksDbLogStore;
use self::log_store_writer::RocksDbLogWriterHandle;
use self::utils::OffsetWatch;

pub fn default_config() -> serde_json::Value {
serde_json::to_value(Options::default()).expect("default config to be serializable")
}

#[derive(Debug)]
pub struct LocalLoglet {
log_id: u64,
Expand Down
17 changes: 14 additions & 3 deletions crates/bifrost/src/loglets/local_loglet/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,20 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::{Path, PathBuf};
use std::time::Duration;

use restate_types::DEFAULT_STORAGE_DIRECTORY;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

#[derive(Debug, Serialize, Deserialize)]
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "options_schema", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "options_schema", schemars(rename = "LocalLoglet", default))]
#[builder(default)]
pub struct Options {
pub path: PathBuf,
pub rocksdb_threads: usize,
pub rocksdb_disable_statistics: bool,
pub rocksdb_disable_wal: bool,
Expand All @@ -24,7 +32,9 @@ pub struct Options {
/// write batch on every command.
pub writer_commit_batch_size_threshold: usize,
/// Trigger a commit when the time since the last commit exceeds this threshold.
pub writer_commit_time_interval: Duration,
#[serde_as(as = "serde_with::DisplayFromStr")]
#[cfg_attr(feature = "options_schema", schemars(with = "String"))]
pub writer_commit_time_interval: humantime::Duration,
/// The maximum number of write commands that can be queued.
pub writer_queue_len: usize,
/// If true, rocksdb flushes follow writing record batches, otherwise, we
Expand All @@ -35,6 +45,7 @@ pub struct Options {
impl Default for Options {
fn default() -> Self {
Self {
path: Path::new(DEFAULT_STORAGE_DIRECTORY).join("local_loglet"),
rocksdb_threads: 10,
// todo: enable when we have a way to expose the statistics through node-ctrl
rocksdb_disable_statistics: true,
Expand All @@ -43,7 +54,7 @@ impl Default for Options {
rocksdb_max_total_wal_size: 2 * (1 << 30), // 2 GiB
rocksdb_write_buffer_size: 0,
writer_commit_batch_size_threshold: 200,
writer_commit_time_interval: Duration::from_millis(13),
writer_commit_time_interval: Duration::from_millis(13).into(),
writer_queue_len: 200,
flush_wal_on_commit: true,
}
Expand Down
20 changes: 9 additions & 11 deletions crates/bifrost/src/loglets/local_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
// by the Apache License, Version 2.0.

use std::collections::{hash_map, HashMap};
use std::path::Path;
use std::sync::{Arc, OnceLock};

use anyhow::Context;
use async_trait::async_trait;
use tokio::sync::Mutex as AsyncMutex;
use tracing::debug;
Expand All @@ -21,6 +21,7 @@ use super::log_store_writer::RocksDbLogWriterHandle;
use super::{LocalLoglet, Options};
use crate::loglet::{Loglet, LogletOffset, LogletProvider};
use crate::loglets::local_loglet::log_store_writer::WriterOptions;
use crate::ProviderError;
use crate::{Error, LogletParams};

#[derive(Debug)]
Expand All @@ -32,18 +33,15 @@ pub struct LocalLogletProvider {
}

impl LocalLogletProvider {
pub fn new(storage_path: &Path, raw_options: &serde_json::Value) -> Arc<Self> {
let opts =
serde_json::from_value(raw_options.clone()).expect("to be able to deserialize options");
// todo: implement loglet loading error handling
let log_store = RocksDbLogStore::new(storage_path, &opts).expect("bifrost local loglet");
pub fn new(opts: Options) -> Result<Arc<Self>, ProviderError> {
let log_store = RocksDbLogStore::new(&opts).context("RockDb LogStore")?;

Arc::new(Self {
Ok(Arc::new(Self {
log_store,
active_loglets: Default::default(),
log_writer: OnceLock::new(),
opts,
})
}))
}
}

Expand Down Expand Up @@ -86,10 +84,10 @@ impl LogletProvider for LocalLogletProvider {
Ok(loglet as Arc<dyn Loglet>)
}

fn start(&self) -> Result<(), Error> {
fn start(&self) -> Result<(), ProviderError> {
let writer_options = WriterOptions {
channel_size: self.opts.writer_queue_len,
commit_time_interval: self.opts.writer_commit_time_interval,
commit_time_interval: self.opts.writer_commit_time_interval.into(),
batch_size_threshold: self.opts.writer_queue_len,
flush_wal_on_commit: self.opts.flush_wal_on_commit,
disable_wal: self.opts.rocksdb_disable_wal,
Expand All @@ -103,7 +101,7 @@ impl LogletProvider for LocalLogletProvider {
Ok(())
}

async fn shutdown(&self) -> Result<(), Error> {
async fn shutdown(&self) -> Result<(), ProviderError> {
debug!("Shutting down local loglet provider");
self.log_store.shutdown();
Ok(())
Expand Down
14 changes: 5 additions & 9 deletions crates/bifrost/src/loglets/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@ use tracing::{debug, info};

use crate::loglet::{Loglet, LogletBase, LogletOffset, LogletProvider};
use crate::metadata::LogletParams;
use crate::Error;
use crate::LogRecord;

pub fn default_config() -> serde_json::Value {
serde_json::Value::Null
}
use crate::{Error, ProviderError};

#[derive(Default)]
pub struct MemoryLogletProvider {
Expand All @@ -37,8 +33,8 @@ pub struct MemoryLogletProvider {

#[allow(dead_code)]
impl MemoryLogletProvider {
pub fn new() -> Arc<Self> {
Arc::default()
pub fn new() -> Result<Arc<Self>, ProviderError> {
Ok(Arc::default())
}

pub fn with_init_delay(init_delay: Duration) -> Arc<Self> {
Expand Down Expand Up @@ -75,12 +71,12 @@ impl LogletProvider for MemoryLogletProvider {
Ok(loglet as Arc<dyn Loglet>)
}

fn start(&self) -> Result<(), Error> {
fn start(&self) -> Result<(), ProviderError> {
info!("Starting in-memory loglet provider");
Ok(())
}

async fn shutdown(&self) -> Result<(), Error> {
async fn shutdown(&self) -> Result<(), ProviderError> {
info!("Shutting down in-memory loglet provider");
Ok(())
}
Expand Down
Loading

0 comments on commit 3d7c718

Please sign in to comment.