diff --git a/Cargo.toml b/Cargo.toml
index 0adf3c1be..4817eaaf7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -161,6 +161,10 @@ codegen-units = 1
 # Let's be defensive and abort on every panic
 panic = "abort"

+[profile.release-debug]
+inherits = "release"
+debug = true
+
 [profile.dev]
 # Let's be defensive and abort on every panic
 panic = "abort"
diff --git a/crates/bifrost/benches/append_throughput.rs b/crates/bifrost/benches/append_throughput.rs
index d02a5c007..38a566d53 100644
--- a/crates/bifrost/benches/append_throughput.rs
+++ b/crates/bifrost/benches/append_throughput.rs
@@ -15,7 +15,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered};
 use futures::StreamExt;
 use restate_bifrost::{Bifrost, BifrostService};
 use restate_core::metadata;
-use restate_rocksdb::{DbName, Owner, RocksDbManager};
+use restate_rocksdb::{DbName, RocksDbManager};
 use restate_types::config::{
     BifrostOptionsBuilder, CommonOptionsBuilder, ConfigurationBuilder, LocalLogletOptionsBuilder,
 };
@@ -174,14 +174,12 @@ fn write_throughput_local_loglet(c: &mut Criterion) {
     group.finish();

     let db_manager = RocksDbManager::get();
-    let db = db_manager
-        .get_db(Owner::Bifrost, DbName::new("local-loglet"))
-        .unwrap();
+    let db = db_manager.get_db(DbName::new("local-loglet")).unwrap();
     let stats = db.get_statistics_str();
     let total_wb_usage = db_manager.get_total_write_buffer_usage();
     let wb_capacity = db_manager.get_total_write_buffer_capacity();
     let memory = db_manager
-        .get_memory_usage_stats(&[(Owner::Bifrost, DbName::new("local-loglet"))])
+        .get_memory_usage_stats(&[DbName::new("local-loglet")])
         .unwrap();
     test_runner_rt.block_on(tc.shutdown_node("completed", 0));
     test_runner_rt.block_on(RocksDbManager::get().shutdown());
diff --git a/crates/bifrost/src/loglets/local_loglet/log_store.rs b/crates/bifrost/src/loglets/local_loglet/log_store.rs
index ee1fa1244..75819e425 100644
--- a/crates/bifrost/src/loglets/local_loglet/log_store.rs
+++ b/crates/bifrost/src/loglets/local_loglet/log_store.rs
@@ -11,7 +11,7 @@
 use std::sync::Arc;

 use restate_rocksdb::{
-    CfExactPattern, CfName, DbName, DbSpecBuilder, Owner, RocksDb, RocksDbManager, RocksError,
+    CfExactPattern, CfName, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
 };
 use restate_types::arc_util::Updateable;
 use restate_types::config::{LocalLogletOptions, RocksDbOptions};
@@ -58,23 +58,18 @@ impl RocksDbLogStore {
         let data_dir = options.data_dir();

-        let db_spec = DbSpecBuilder::new(
-            DbName::new(DB_NAME),
-            Owner::Bifrost,
-            data_dir,
-            db_options(options),
-        )
-        .add_cf_pattern(CfExactPattern::new(DATA_CF), cf_data_options)
-        .add_cf_pattern(CfExactPattern::new(METADATA_CF), cf_metadata_options)
-        // not very important but it's to reduce the number of merges by flushing.
-        // it's also a small cf so it should be quick.
-        .add_to_flush_on_shutdown(CfExactPattern::new(METADATA_CF))
-        .ensure_column_families(cfs)
-        .build_as_db();
+        let db_spec = DbSpecBuilder::new(DbName::new(DB_NAME), data_dir, db_options(options))
+            .add_cf_pattern(CfExactPattern::new(DATA_CF), cf_data_options)
+            .add_cf_pattern(CfExactPattern::new(METADATA_CF), cf_metadata_options)
+            // not very important but it's to reduce the number of merges by flushing.
+            // it's also a small cf so it should be quick.
+            .add_to_flush_on_shutdown(CfExactPattern::new(METADATA_CF))
+            .ensure_column_families(cfs)
+            .build_as_db();
         let db_name = db_spec.name().clone();
         // todo: use the returned rocksdb object when open_db returns Arc<RocksDb>
         let _ = db_manager.open_db(updateable_options, db_spec)?;
-        let rocksdb = db_manager.get_db(Owner::Bifrost, db_name).unwrap();
+        let rocksdb = db_manager.get_db(db_name).unwrap();
         Ok(Self { rocksdb })
     }
@@ -138,6 +133,7 @@ fn cf_data_options(mut opts: rocksdb::Options) -> rocksdb::Options {
     //
     // Set compactions per level
     //
+    opts.set_max_write_buffer_number(2);
     opts.set_num_levels(7);
     opts.set_compression_per_level(&[
         DBCompressionType::None,
@@ -165,11 +161,7 @@ fn cf_metadata_options(mut opts: rocksdb::Options) -> rocksdb::Options {
         DBCompressionType::None,
         DBCompressionType::Zstd,
     ]);
-    //
-    // Most of the changes are highly temporal, we try to delay flushing
-    // to merge metadata updates into fewer L0 files.
-    opts.set_max_write_buffer_number(3);
-    opts.set_min_write_buffer_number_to_merge(3);
+    opts.set_max_write_buffer_number(2);
     opts.set_max_successive_merges(10);
     // Merge operator for log state updates
     opts.set_merge_operator(
diff --git a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs b/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
index 66b053cdd..43d19a083 100644
--- a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
+++ b/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
@@ -210,7 +210,7 @@ impl LogStoreWriter {
         if self.manual_wal_flush {
             // WAL flush is done in the foreground, but sync will happen in the background to avoid
             // blocking IO.
-            if let Err(e) = self.rocksdb.inner().flush_wal(opts.sync_wal_before_ack) {
+            if let Err(e) = self.rocksdb.flush_wal(opts.sync_wal_before_ack).await {
                 warn!("Failed to flush rocksdb WAL in local loglet : {}", e);
                 self.send_acks(Err(Error::LogStoreError(e.into())));
                 return;
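The hunk above swaps the direct `inner().flush_wal(...)` call for the new awaitable `RocksDb::flush_wal`, which runs the blocking flush on a background thread pool and signals completion over a oneshot channel. A minimal sketch of that offload pattern, assuming a plain `rocksdb::DB` and the global rayon pool rather than the crate's `RocksDbManager` machinery:

```rust
use std::sync::Arc;

use rocksdb::DB;
use tokio::sync::oneshot;

// Sketch only: the real code routes through RocksDbManager's prioritized rayon
// pools; the global rayon pool here just shows the shape of the pattern.
async fn flush_wal_in_background(db: Arc<DB>, sync: bool) -> Result<(), rocksdb::Error> {
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // The blocking fsync happens on a pool thread, never on the async runtime.
        let _ = tx.send(db.flush_wal(sync));
    });
    rx.await.expect("flush task dropped without replying")
}
```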
diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs
index 072bb6f89..82cbd8e00 100644
--- a/crates/metadata-store/src/local/store.rs
+++ b/crates/metadata-store/src/local/store.rs
@@ -14,7 +14,8 @@ use codederror::CodedError;
 use restate_core::cancellation_watcher;
 use restate_core::metadata_store::{Precondition, VersionedValue};
 use restate_rocksdb::{
-    CfName, CfPrefixPattern, DbName, DbSpecBuilder, Owner, RocksDbManager, RocksError,
+    CfName, CfPrefixPattern, DbName, DbSpecBuilder, IoMode, Priority, RocksDb, RocksDbManager,
+    RocksError,
 };
 use restate_types::arc_util::Updateable;
 use restate_types::config::RocksDbOptions;
@@ -22,7 +23,7 @@ use restate_types::storage::{
     StorageCodec, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError,
 };
 use restate_types::Version;
-use rocksdb::{BoundColumnFamily, Options, WriteOptions, DB};
+use rocksdb::{BoundColumnFamily, Options, WriteBatch, WriteOptions, DB};
 use std::path::Path;
 use std::sync::Arc;
 use tokio::sync::{mpsc, oneshot};
@@ -63,6 +64,8 @@ pub enum MetadataStoreRequest {
 pub enum Error {
     #[error("storage error: {0}")]
     Storage(#[from] rocksdb::Error),
+    #[error("rocksdb error: {0}")]
+    RocksDb(#[from] RocksError),
     #[error("failed precondition: {0}")]
     FailedPrecondition(String),
     #[error("invalid argument: {0}")]
@@ -99,6 +102,7 @@ pub enum BuildError {
 /// store in a single thread.
 pub struct LocalMetadataStore {
     db: Arc<DB>,
+    rocksdb: Arc<RocksDb>,
     opts: Box<dyn Updateable<RocksDbOptions> + Send + 'static>,
     request_rx: RequestReceiver,
     buffer: BytesMut,
@@ -119,11 +123,11 @@ impl LocalMetadataStore {
     {
         let (request_tx, request_rx) = mpsc::channel(request_queue_length);

+        let db_name = DbName::new(DB_NAME);
         let db_manager = RocksDbManager::get();
         let cfs = vec![CfName::new(KV_PAIRS)];
         let db_spec = DbSpecBuilder::new(
-            DbName::new(DB_NAME),
-            Owner::MetadataStore,
+            db_name.clone(),
             data_dir.as_ref().to_path_buf(),
             Options::default(),
         )
@@ -132,9 +136,13 @@
         .build_as_db();

         let db = db_manager.open_db(rocksdb_options(), db_spec)?;
+        let rocksdb = db_manager
+            .get_db(db_name)
+            .expect("metadata store db is open");

         Ok(Self {
             db,
+            rocksdb,
             opts: Box::new(rocksdb_options()),
             buffer: BytesMut::default(),
             request_rx,
@@ -204,7 +212,7 @@ impl LocalMetadataStore {
                 precondition,
                 result_tx,
             } => {
-                let result = self.put(&key, &value, precondition);
+                let result = self.put(&key, &value, precondition).await;
                 Self::log_error(&result, "Put");
                 let _ = result_tx.send(result);
             }
@@ -244,18 +252,18 @@ impl LocalMetadataStore {
         }
     }

-    fn put(
+    async fn put(
         &mut self,
         key: &ByteString,
         value: &VersionedValue,
         precondition: Precondition,
     ) -> Result<()> {
         match precondition {
-            Precondition::None => Ok(self.write_versioned_kv_pair(key, value)?),
+            Precondition::None => Ok(self.write_versioned_kv_pair(key, value).await?),
             Precondition::DoesNotExist => {
                 let current_version = self.get_version(key)?;
                 if current_version.is_none() {
-                    Ok(self.write_versioned_kv_pair(key, value)?)
+                    Ok(self.write_versioned_kv_pair(key, value).await?)
                 } else {
                     Err(Error::kv_pair_exists())
                 }
@@ -263,7 +271,7 @@ impl LocalMetadataStore {
             Precondition::MatchesVersion(version) => {
                 let current_version = self.get_version(key)?;
                 if current_version == Some(version) {
-                    Ok(self.write_versioned_kv_pair(key, value)?)
+                    Ok(self.write_versioned_kv_pair(key, value).await?)
                 } else {
                     Err(Error::version_mismatch(version, current_version))
                 }
@@ -271,16 +279,22 @@ impl LocalMetadataStore {
         }
     }

-    fn write_versioned_kv_pair(&mut self, key: &ByteString, value: &VersionedValue) -> Result<()> {
+    async fn write_versioned_kv_pair(
+        &mut self,
+        key: &ByteString,
+        value: &VersionedValue,
+    ) -> Result<()> {
         self.buffer.clear();
         Self::encode(value, &mut self.buffer)?;

         let write_options = self.write_options();
         let cf_handle = self.kv_cf_handle();
-        self.db
-            .put_cf_opt(&cf_handle, key, self.buffer.as_ref(), &write_options)?;
-
-        Ok(())
+        let mut wb = WriteBatch::default();
+        wb.put_cf(&cf_handle, key, self.buffer.as_ref());
+        Ok(self
+            .rocksdb
+            .write_batch(Priority::High, IoMode::default(), write_options, wb)
+            .await?)
     }

     fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<()> {
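With this change the metadata store stops calling `put_cf_opt` directly and instead funnels a single-entry `WriteBatch` through `RocksDb::write_batch`, so the write participates in priority scheduling and stall detection. The equivalent plain-rocksdb write, as a sketch:

```rust
use rocksdb::{WriteBatch, WriteOptions, DB};

// Hypothetical standalone version of the put path: one-entry batch, synced write.
fn put_durably(db: &DB, key: &[u8], value: &[u8]) -> Result<(), rocksdb::Error> {
    let mut wb = WriteBatch::default();
    wb.put(key, value);
    let mut opts = WriteOptions::default();
    opts.set_sync(true); // ack only after the WAL is synced, as a metadata store should
    db.write_opt(wb, &opts)
}
```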
diff --git a/crates/node/src/network_server/handler/mod.rs b/crates/node/src/network_server/handler/mod.rs
index 6582a8773..ce3086a2d 100644
--- a/crates/node/src/network_server/handler/mod.rs
+++ b/crates/node/src/network_server/handler/mod.rs
@@ -136,6 +136,7 @@ const ROCKSDB_DB_PROPERTIES: &[(&str, MetricUnit)] = &[
     ("rocksdb.block-cache-capacity", MetricUnit::Bytes),
     ("rocksdb.block-cache-usage", MetricUnit::Bytes),
     ("rocksdb.block-cache-pinned-usage", MetricUnit::Bytes),
+    ("rocksdb.num-running-flushes", MetricUnit::Count),
 ];

 // Per column-family properties
@@ -161,7 +162,6 @@ const ROCKSDB_CF_PROPERTIES: &[(&str, MetricUnit)] = &[
     (
         "rocksdb.estimate-pending-compaction-bytes",
         MetricUnit::Bytes,
     ),
-    ("rocksdb.num-running-flushes", MetricUnit::Count),
     ("rocksdb.num-running-compactions", MetricUnit::Count),
     ("rocksdb.actual-delayed-write-rate", MetricUnit::Count),
     ("rocksdb.num-files-at-level0", MetricUnit::Count),
@@ -203,13 +203,10 @@ pub async fn render_metrics(State(state): State) -> String
     );

     for db in &all_dbs {
-        let labels = vec![
-            format!("db=\"{}\"", formatting::sanitize_label_value(&db.name)),
-            format!(
-                "owner=\"{}\"",
-                formatting::sanitize_label_value(db.owner.into())
-            ),
-        ];
+        let labels = vec![format!(
+            "db=\"{}\"",
+            formatting::sanitize_label_value(&db.name)
+        )];
         // Tickers (Counters)
         for ticker in ROCKSDB_TICKERS {
             format_rocksdb_stat_ticker_for_prometheus(&mut out, db, &labels, *ticker);
diff --git a/crates/node/src/network_server/metrics.rs b/crates/node/src/network_server/metrics.rs
index f774e93a9..41d510bb6 100644
--- a/crates/node/src/network_server/metrics.rs
+++ b/crates/node/src/network_server/metrics.rs
@@ -18,7 +18,7 @@ use restate_types::config::CommonOptions;
 /// Be mindful when adding new labels: the number of time series is directly proportional
 /// to the cardinality of the chosen labels. Avoid labels with potentially high cardinality
 /// as much as possible (e.g. `restate.invocation.id`)
-static ALLOWED_LABELS: &[&str] = &["rpc.method", "rpc.service", "command", "service"];
+static ALLOWED_LABELS: &[&str] = &["rpc.method", "rpc.service", "command", "service", "db"];
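Allow-listing `db` works because the set of databases is small and fixed; the guidance in the comment above is what keeps the Prometheus time-series count bounded. A sketch of attaching a low-cardinality label with the `metrics` crate (metric name taken from this patch, the helper is illustrative):

```rust
use metrics::counter;

// "db" takes one of a handful of values (e.g. "local-loglet"), so it is a safe
// label; an unbounded value such as an invocation id would explode cardinality.
fn record_io_op(db_name: &'static str) {
    counter!("restate.rocksdb_manager.io_operation.total", "db" => db_name).increment(1);
}
```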
diff --git a/crates/partition-store/src/partition_store.rs b/crates/partition-store/src/partition_store.rs
index 3d9d523cf..03c7f3e1a 100644
--- a/crates/partition-store/src/partition_store.rs
+++ b/crates/partition-store/src/partition_store.rs
@@ -469,7 +469,6 @@ impl<'a> Transaction for RocksDBTransaction<'a> {
         // writes to RocksDB. However, it is safe to write the WriteBatch for a given partition,
         // because there can only be a single writer (the leading PartitionProcessor).
         let write_batch = self.txn.get_writebatch();
-        // todo: make async and use configuration to control use of WAL
         if write_batch.is_empty() {
             return Ok(());
         }
diff --git a/crates/partition-store/src/partition_store_manager.rs b/crates/partition-store/src/partition_store_manager.rs
index e09d117ce..bae3db961 100644
--- a/crates/partition-store/src/partition_store_manager.rs
+++ b/crates/partition-store/src/partition_store_manager.rs
@@ -17,7 +17,7 @@ use tracing::debug;

 use restate_core::ShutdownError;
 use restate_rocksdb::{
-    CfName, CfPrefixPattern, DbName, DbSpecBuilder, Owner, RocksDb, RocksDbManager, RocksError,
+    CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
 };
 use restate_types::arc_util::Updateable;
 use restate_types::config::RocksDbOptions;
@@ -59,15 +59,10 @@ impl PartitionStoreManager {
     ) -> std::result::Result<Self, RocksError> {
         let options = storage_opts.load();

-        let db_spec = DbSpecBuilder::new(
-            DbName::new(DB_NAME),
-            Owner::PartitionProcessor,
-            options.data_dir(),
-            db_options(),
-        )
-        .add_cf_pattern(CfPrefixPattern::new(PARTITION_CF_PREFIX), cf_options)
-        .ensure_column_families(partition_ids_to_cfs(initial_partition_set))
-        .build_as_optimistic_db();
+        let db_spec = DbSpecBuilder::new(DbName::new(DB_NAME), options.data_dir(), db_options())
+            .add_cf_pattern(CfPrefixPattern::new(PARTITION_CF_PREFIX), cf_options)
+            .ensure_column_families(partition_ids_to_cfs(initial_partition_set))
+            .build_as_optimistic_db();

         let manager = RocksDbManager::get();
         // todo remove this when open_db is async
@@ -75,9 +70,7 @@
             .await
             .map_err(|_| ShutdownError)??;

-        let rocksdb = manager
-            .get_db(Owner::PartitionProcessor, DbName::new(DB_NAME))
-            .unwrap();
+        let rocksdb = manager.get_db(DbName::new(DB_NAME)).unwrap();

         Ok(Self {
             raw_db,
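The partition store commits by extracting the accumulated `WriteBatch` from an optimistic transaction and writing it directly, which is sound only under the single-writer rule called out in the comment above. A sketch of that extraction with plain rust-rocksdb (keys are placeholders):

```rust
use rocksdb::{OptimisticTransactionDB, WriteOptions};

fn commit_as_raw_batch(db: &OptimisticTransactionDB) -> Result<(), rocksdb::Error> {
    let txn = db.transaction();
    txn.put(b"fsm-key", b"fsm-value")?;
    // Pull out the underlying batch instead of committing the transaction;
    // safe only while a single writer owns these keys.
    let wb = txn.get_writebatch();
    db.write_opt(wb, &WriteOptions::default())
}
```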
diff --git a/crates/rocksdb/src/background.rs b/crates/rocksdb/src/background.rs
index 745dc18b0..9a0366d61 100644
--- a/crates/rocksdb/src/background.rs
+++ b/crates/rocksdb/src/background.rs
@@ -11,12 +11,13 @@
 use std::time::Instant;

 use derive_builder::Builder;
-use metrics::histogram;
+use metrics::{gauge, histogram};
+use tokio::sync::oneshot;

 use crate::metric_definitions::{
     STORAGE_BG_TASK_RUN_DURATION, STORAGE_BG_TASK_TOTAL_DURATION, STORAGE_BG_TASK_WAIT_DURATION,
 };
-use crate::{DbName, Owner, Priority};
+use crate::{Priority, OP_TYPE, PRIORITY, STORAGE_BG_TASK_IN_FLIGHT};

 #[derive(Debug, Clone, Copy, PartialEq, Eq, strum_macros::IntoStaticStr)]
 #[strum(serialize_all = "kebab-case")]
@@ -24,26 +25,29 @@ pub enum StorageTaskKind {
     WriteBatch,
     OpenColumnFamily,
     FlushWal,
+    FlushMemtables,
     Shutdown,
     OpenDb,
 }

+impl StorageTaskKind {
+    pub fn as_static_str(&self) -> &'static str {
+        self.into()
+    }
+}
+
 #[derive(Builder)]
 #[builder(pattern = "owned")]
 #[builder(name = "StorageTask")]
 pub struct ReadyStorageTask<OP> {
     op: OP,
-    #[builder(setter(into))]
-    db_name: DbName,
-    #[builder(default)]
-    owner: Owner,
     #[builder(default)]
     pub(crate) priority: Priority,
     /// required
     kind: StorageTaskKind,
     #[builder(setter(skip))]
     #[builder(default = "Instant::now()")]
-    enqueue_at: Instant,
+    created_at: Instant,
 }

 impl<OP, R> ReadyStorageTask<OP>
 where
     OP: FnOnce() -> R + Send + 'static,
     R: Send + 'static,
 {
-    pub fn run(self) -> R {
+    pub fn into_runner(self) -> impl FnOnce() -> R + Send + 'static {
+        gauge!(STORAGE_BG_TASK_IN_FLIGHT,
+            PRIORITY => self.priority.as_static_str(),
+            OP_TYPE => self.kind.as_static_str(),
+        )
+        .increment(1);
+
+        let span = tracing::Span::current().clone();
+
+        move || span.in_scope(|| self.run())
+    }
+
+    pub fn into_async_runner(self, tx: oneshot::Sender<R>) -> impl FnOnce() + Send + 'static {
+        gauge!(STORAGE_BG_TASK_IN_FLIGHT,
+            PRIORITY => self.priority.as_static_str(),
+            OP_TYPE => self.kind.as_static_str(),
+        )
+        .increment(1);
+        let span = tracing::Span::current().clone();
+
+        move || {
+            span.in_scope(|| {
+                let result = self.run();
+                let _ = tx.send(result);
+            })
+        }
+    }
+
+    fn run(self) -> R {
         let start = Instant::now();
-        let kind: &'static str = self.kind.into();
-        let owner: &'static str = self.owner.into();
-        let priority: &'static str = self.priority.into();

         histogram!(
             STORAGE_BG_TASK_WAIT_DURATION,
-            "kind" => kind,
-            "db" => self.db_name.to_string(),
-            "owner" => owner,
-            "priority" => priority,
+            PRIORITY => self.priority.as_static_str(),
+            OP_TYPE => self.kind.as_static_str(),
         )
-        .record(self.enqueue_at.elapsed());
+        .record(self.created_at.elapsed());

         let res = (self.op)();

         histogram!(STORAGE_BG_TASK_RUN_DURATION,
-            "kind" => kind,
-            "db" => self.db_name.to_string(),
-            "owner" => owner,
-            "priority" => priority,
+            PRIORITY => self.priority.as_static_str(),
+            OP_TYPE => self.kind.as_static_str(),
         )
         .record(start.elapsed());

         histogram!(STORAGE_BG_TASK_TOTAL_DURATION,
-            "kind" => kind,
-            "db" => self.db_name.to_string(),
-            "owner" => owner,
-            "priority" => priority,
+            PRIORITY => self.priority.as_static_str(),
+            OP_TYPE => self.kind.as_static_str(),
+        )
+        .record(self.created_at.elapsed());
+        gauge!(STORAGE_BG_TASK_IN_FLIGHT,
+            PRIORITY => self.priority.as_static_str(),
+            OP_TYPE => self.kind.as_static_str(),
         )
-        .record(self.enqueue_at.elapsed());
+        .decrement(1);
         res
     }
 }
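`into_runner`/`into_async_runner` bump the in-flight gauge when the runner is created, and `run` decrements it once the closure finishes. The same accounting can be expressed as a `Drop` guard, which also stays balanced on early return or panic — a sketch using the metric name defined in this patch:

```rust
use metrics::gauge;

const STORAGE_BG_TASK_IN_FLIGHT: &str = "restate.rocksdb_manager.bg_task_in_flight.total";

// RAII variant of the in-flight accounting done in into_runner()/run():
// increment on construction, decrement on drop.
struct InFlightGuard(&'static str);

impl InFlightGuard {
    fn new(op_type: &'static str) -> Self {
        gauge!(STORAGE_BG_TASK_IN_FLIGHT, "operation" => op_type).increment(1.0);
        InFlightGuard(op_type)
    }
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        gauge!(STORAGE_BG_TASK_IN_FLIGHT, "operation" => self.0).decrement(1.0);
    }
}
```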
diff --git a/crates/rocksdb/src/db_manager.rs b/crates/rocksdb/src/db_manager.rs
index 4eeb8df07..a0664c8e4 100644
--- a/crates/rocksdb/src/db_manager.rs
+++ b/crates/rocksdb/src/db_manager.rs
@@ -10,7 +10,7 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, AtomicUsize};
 use std::sync::{Arc, OnceLock};
 use std::time::Instant;

@@ -25,9 +25,7 @@ use restate_types::arc_util::Updateable;
 use restate_types::config::{CommonOptions, Configuration, RocksDbOptions, StatisticsLevel};

 use crate::background::ReadyStorageTask;
-use crate::{
-    metric_definitions, DbName, DbSpec, Owner, Priority, RocksAccess, RocksDb, RocksError,
-};
+use crate::{metric_definitions, DbName, DbSpec, Priority, RocksAccess, RocksDb, RocksError};

 static DB_MANAGER: OnceLock<RocksDbManager> = OnceLock::new();

@@ -47,7 +45,8 @@ pub struct RocksDbManager {
     cache: Cache,
     // auto updates to changes in common.rocksdb_memory_limit and common.rocksdb_memtable_total_size_limit
     write_buffer_manager: WriteBufferManager,
-    dbs: RwLock<HashMap<(Owner, DbName), Arc<RocksDb>>>,
+    stall_detection_millis: AtomicUsize,
+    dbs: RwLock<HashMap<DbName, Arc<RocksDb>>>,
     watchdog_tx: mpsc::UnboundedSender<WatchdogCommand>,
     shutting_down: AtomicBool,
     high_pri_pool: rayon::ThreadPool,
@@ -79,11 +78,16 @@ impl RocksDbManager {
         let opts = base_opts.load();
         let cache = Cache::new_lru_cache(opts.rocksdb_total_memory_size.get());
         let write_buffer_manager = WriteBufferManager::new_write_buffer_manager_with_cache(
-            opts.rocksdb_total_memtables_size,
+            opts.rocksdb_total_memtables_size(),
             true,
             cache.clone(),
         );
-
+        // There is no atomic u128 (and it's a ridiculously long duration anyway), so we trim
+        // the value to usize and hope for the best.
+        let stall_detection_millis = AtomicUsize::new(
+            usize::try_from(opts.rocksdb_write_stall_threshold.as_millis())
+                .expect("threshold fits usize"),
+        );
         // Setup the shared rocksdb environment
         let mut env = rocksdb::Env::new().expect("rocksdb env is created");
         env.set_low_priority_background_threads(opts.rocksdb_bg_threads().get() as i32);
@@ -116,6 +120,7 @@
             shutting_down: AtomicBool::new(false),
             high_pri_pool,
             low_pri_pool,
+            stall_detection_millis,
         };

         DB_MANAGER.set(manager).expect("DBManager initialized once");
@@ -132,8 +137,8 @@
         Self::get()
     }

-    pub fn get_db(&self, owner: Owner, name: DbName) -> Option<Arc<RocksDb>> {
-        self.dbs.read().get(&(owner, name)).cloned()
+    pub fn get_db(&self, name: DbName) -> Option<Arc<RocksDb>> {
+        self.dbs.read().get(&name).cloned()
     }

     // todo: move this to async after allowing bifrost to async-create providers.
@@ -152,7 +157,6 @@
         // get latest options
         let options = updateable_opts.load().clone();
         let name = db_spec.name.clone();
-        let owner = db_spec.owner;

         // use the spec default options as base then apply the config from the updateable.
         self.amend_db_options(&mut db_spec.db_options, &options);
@@ -164,12 +168,11 @@
         let path = db_spec.path.clone();

         let wrapper = Arc::new(RocksDb::new(self, db_spec, db.clone()));
-        self.dbs.write().insert((owner, name.clone()), wrapper);
+        self.dbs.write().insert(name.clone(), wrapper);

         if let Err(e) = self
             .watchdog_tx
             .send(WatchdogCommand::Register(ConfigSubscription {
-                owner,
                 name: name.clone(),
                 updateable_rocksdb_opts: Box::new(updateable_opts),
                 last_applied_opts: options,
@@ -177,7 +180,6 @@
         {
             warn!(
                 db = %name,
-                owner = %owner,
                 path = %path.display(),
                 "Failed to register database with watchdog: {}, this database will \
                 not receive config updates but the system will continue to run as normal",
@@ -186,7 +188,6 @@
         }
         debug!(
             db = %name,
-            owner = %owner,
             path = %path.display(),
             "Opened rocksdb database"
         );
@@ -215,7 +216,7 @@
     /// Returns aggregated memory usage for all databases if filter is empty
     pub fn get_memory_usage_stats(
         &self,
-        filter: &[(Owner, DbName)],
+        filter: &[DbName],
     ) -> Result<MemoryUsage, RocksError> {
         let mut builder = rocksdb::perf::MemoryUsageBuilder::new()?;
         builder.add_cache(&self.cache);
@@ -243,19 +244,18 @@
         // Ask all databases to shutdown cleanly.
         let start = Instant::now();
         let mut tasks = tokio::task::JoinSet::new();
-        for ((owner, name), db) in self.dbs.write().drain() {
+        for (name, db) in self.dbs.write().drain() {
             tasks.spawn(async move {
                 db.shutdown().await;
-                (name.clone(), owner)
+                name.clone()
             });
         }
         // wait for all tasks to complete
         while let Some(res) = tasks.join_next().await {
             match res {
-                Ok((name, owner)) => {
+                Ok(name) => {
                     info!(
                         db = %name,
-                        owner = %owner,
                         "Rocksdb database shutdown completed, {} remaining",
                         tasks.len());
                 }
                 Err(e) => {
@@ -308,6 +308,13 @@ impl RocksDbManager {
         db_options.set_use_direct_io_for_flush_and_compaction(true);
     }

+    pub(crate) fn stall_detection_duration(&self) -> std::time::Duration {
+        std::time::Duration::from_millis(
+            self.stall_detection_millis
+                .load(std::sync::atomic::Ordering::Relaxed) as u64,
+        )
+    }
+
     pub(crate) fn default_cf_options(&self, opts: &RocksDbOptions) -> rocksdb::Options {
         let mut cf_options = rocksdb::Options::default();
         // write buffer
@@ -358,14 +365,9 @@ impl RocksDbManager {
     {
         let (tx, rx) = tokio::sync::oneshot::channel();
         let priority = task.priority;
-        let action = move || {
-            let result = task.run();
-            // ignoring the error since receiver might not be interested in the response
-            let _ = tx.send(result);
-        };
         match priority {
-            Priority::High => self.high_pri_pool.spawn(action),
-            Priority::Low => self.low_pri_pool.spawn(action),
+            Priority::High => self.high_pri_pool.spawn(task.into_async_runner(tx)),
+            Priority::Low => self.low_pri_pool.spawn(task.into_async_runner(tx)),
         }
         rx.await.map_err(|_| ShutdownError)
     }
@@ -390,15 +392,14 @@ impl RocksDbManager {
         OP: FnOnce() + Send + 'static,
     {
         match task.priority {
-            Priority::High => self.high_pri_pool.spawn(|| task.run()),
-            Priority::Low => self.low_pri_pool.spawn(|| task.run()),
+            Priority::High => self.high_pri_pool.spawn(task.into_runner()),
+            Priority::Low => self.low_pri_pool.spawn(task.into_runner()),
         }
     }
 }

 #[allow(dead_code)]
 struct ConfigSubscription {
-    owner: Owner,
     name: DbName,
     updateable_rocksdb_opts: Box<dyn Updateable<RocksDbOptions> + Send + 'static>,
     last_applied_opts: RocksDbOptions,
@@ -487,8 +488,28 @@ impl DbWatchdog {
             info!("Ignoring config update as we are shutting down");
             return;
         }
-        // Memory budget changed?
         let new_common_opts = self.updateable_common_opts.load();
+
+        // Stall detection threshold changed?
+        let current_stall_detection_millis =
+            self.manager
+                .stall_detection_millis
+                .load(std::sync::atomic::Ordering::Relaxed) as u64;
+        let new_stall_detection_millis =
+            new_common_opts.rocksdb_write_stall_threshold.as_millis() as u64;
+        if current_stall_detection_millis != new_stall_detection_millis {
+            info!(
+                old = current_stall_detection_millis,
+                new = new_stall_detection_millis,
+                "[config update] Stall detection threshold is updated",
+            );
+            self.manager.stall_detection_millis.store(
+                new_stall_detection_millis as usize,
+                std::sync::atomic::Ordering::Relaxed,
+            );
+        }
+
+        // Memory budget changed?
         if new_common_opts.rocksdb_total_memory_size
             != self.current_common_opts.rocksdb_total_memory_size
         {
@@ -500,21 +521,24 @@
             );
             self.cache
                 .set_capacity(new_common_opts.rocksdb_total_memory_size.get());
+            self.manager
+                .write_buffer_manager
+                .set_buffer_size(new_common_opts.rocksdb_total_memtables_size());
         }

         // update memtable total memory
-        if new_common_opts.rocksdb_total_memtables_size
-            != self.current_common_opts.rocksdb_total_memtables_size
+        if new_common_opts.rocksdb_total_memtables_size()
+            != self.current_common_opts.rocksdb_total_memtables_size()
         {
             info!(
-                old = self.current_common_opts.rocksdb_total_memtables_size,
-                new = new_common_opts.rocksdb_total_memtables_size,
+                old = self.current_common_opts.rocksdb_total_memtables_size(),
+                new = new_common_opts.rocksdb_total_memtables_size(),
                 "[config update] Setting rocksdb total memtables size limit to {}",
-                ByteCount::from(new_common_opts.rocksdb_total_memtables_size)
+                ByteCount::from(new_common_opts.rocksdb_total_memtables_size())
             );
             self.manager
                 .write_buffer_manager
-                .set_buffer_size(new_common_opts.rocksdb_total_memtables_size);
+                .set_buffer_size(new_common_opts.rocksdb_total_memtables_size());
         }

         // todo: Apply other changes to the databases.
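Storing the threshold as plain milliseconds in an atomic lets the watchdog publish config changes without locks while write paths read it on every operation. A standalone sketch of the same idea (using `AtomicU64` instead of the patch's `AtomicUsize`):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

struct ReloadableThreshold(AtomicU64);

impl ReloadableThreshold {
    fn get(&self) -> Duration {
        Duration::from_millis(self.0.load(Ordering::Relaxed))
    }

    fn set(&self, d: Duration) {
        // Relaxed suffices: readers only need to observe *some* recent value;
        // there is no ordering dependency on other data.
        self.0.store(d.as_millis() as u64, Ordering::Relaxed);
    }
}
```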
diff --git a/crates/rocksdb/src/db_spec.rs b/crates/rocksdb/src/db_spec.rs
index 62da04cb1..0d6a9532d 100644
--- a/crates/rocksdb/src/db_spec.rs
+++ b/crates/rocksdb/src/db_spec.rs
@@ -17,28 +17,6 @@ use crate::{BoxedCfMatcher, BoxedCfOptionUpdater};

 type SmartString = smartstring::SmartString<smartstring::LazyCompact>;

-#[derive(
-    Debug,
-    derive_more::Display,
-    strum_macros::VariantArray,
-    strum_macros::IntoStaticStr,
-    Clone,
-    Copy,
-    Eq,
-    PartialEq,
-    Hash,
-    Default,
-)]
-#[strum(serialize_all = "kebab-case")]
-pub enum Owner {
-    #[default]
-    // A system-wide database, or represents an operation that has no specific owner.
-    None,
-    PartitionProcessor,
-    Bifrost,
-    MetadataStore,
-}
-
 #[derive(
     Debug,
     derive_more::Deref,
@@ -141,7 +119,6 @@ impl CfNameMatch for CfExactPattern {
 #[builder(pattern = "owned", build_fn(name = "build"))]
 pub struct DbSpec {
     pub(crate) name: DbName,
-    pub(crate) owner: Owner,
     pub(crate) path: PathBuf,
     /// All column families that should be flushed on shutdown, no flush will be performed if empty
     /// which should be the default for most cases.
@@ -171,15 +148,9 @@ pub struct DbSpec {
 }

 impl DbSpecBuilder {
-    pub fn new(
-        name: DbName,
-        owner: Owner,
-        path: PathBuf,
-        db_options: rocksdb::Options,
-    ) -> DbSpecBuilder {
+    pub fn new(name: DbName, path: PathBuf, db_options: rocksdb::Options) -> DbSpecBuilder {
         Self {
             name: Some(name),
-            owner: Some(owner),
             path: Some(path),
             db_options: Some(db_options),
             ..Self::default()
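With `Owner` gone, a database is identified by its `DbName` alone and the builder takes three arguments. Usage now looks like the call sites elsewhere in this patch; a sketch with placeholder names and a pass-through cf option updater:

```rust
use restate_rocksdb::{CfName, CfPrefixPattern, DbName, DbSpecBuilder};

fn example_spec(data_dir: std::path::PathBuf) {
    // Placeholder cf option updater: hand the options back unchanged.
    let cf_options = |opts: rocksdb::Options| opts;

    let db_spec = DbSpecBuilder::new(DbName::new("my-db"), data_dir, rocksdb::Options::default())
        .add_cf_pattern(CfPrefixPattern::new("my-prefix"), cf_options)
        .ensure_column_families(vec![CfName::new("my-cf")])
        .build_as_db();
    let _ = db_spec;
}
```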
diff --git a/crates/rocksdb/src/lib.rs b/crates/rocksdb/src/lib.rs
index 3fde7e72d..d2d7eeb79 100644
--- a/crates/rocksdb/src/lib.rs
+++ b/crates/rocksdb/src/lib.rs
@@ -13,9 +13,13 @@ mod db_manager;
 mod db_spec;
 mod error;
 mod metric_definitions;
+mod perf;
 mod rock_access;

 use metrics::counter;
+use metrics::gauge;
+use metrics::histogram;
+use restate_core::ShutdownError;
 use restate_types::config::RocksDbOptions;
 use tracing::debug;
 use tracing::error;
@@ -25,11 +29,13 @@ use tracing::warn;
 use std::fmt;
 use std::path::PathBuf;
 use std::sync::Arc;
+use std::time::Instant;

 use rocksdb::statistics::Histogram;
 use rocksdb::statistics::HistogramData;
 use rocksdb::statistics::Ticker;

+use self::background::ReadyStorageTask;
 // re-exports
 pub use self::db_manager::RocksDbManager;
 pub use self::db_spec::*;
@@ -45,12 +51,19 @@
 type BoxedCfOptionUpdater = Box<dyn Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync>;

 /// Denotes whether an operation is considered latency sensitive or not
 #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, strum_macros::IntoStaticStr)]
+#[strum(serialize_all = "kebab-case")]
 pub enum Priority {
     High,
     #[default]
     Low,
 }

+impl Priority {
+    pub fn as_static_str(&self) -> &'static str {
+        self.into()
+    }
+}
+
 /// Defines how to perform a potentially blocking rocksdb IO operation.
 #[derive(Clone, Copy, Default, Debug, Eq, PartialEq)]
 pub enum IoMode {
@@ -65,10 +78,9 @@
 }

 #[derive(derive_more::Display, Clone)]
-#[display(fmt = "{}::{}", owner, name)]
+#[display(fmt = "{}", name)]
 pub struct RocksDb {
     manager: &'static RocksDbManager,
-    pub owner: Owner,
     pub name: DbName,
     pub path: PathBuf,
     pub db_options: rocksdb::Options,
@@ -81,13 +93,7 @@ static_assertions::assert_impl_all!(RocksDb: Send, Sync);

 impl fmt::Debug for RocksDb {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(
-            f,
-            "RocksDb({}::{} at {})",
-            self.owner,
-            self.name,
-            self.path.display()
-        )
+        write!(f, "RocksDb({} at {})", self.name, self.path.display())
     }
 }

@@ -98,7 +104,6 @@ impl RocksDb {
     {
         Self {
             manager,
-            owner: spec.owner,
             name: spec.name,
             path: spec.path,
             cf_patterns: spec.cf_patterns.into(),
@@ -110,6 +115,8 @@

     /// Returns the raw rocksdb handle, this should only be used for server operations that
     /// require direct access to rocksdb.
+    ///
+    /// todo: remove this once all access is migrated to this abstraction
     pub fn inner(&self) -> &Arc {
         &self.db
     }
@@ -118,6 +125,7 @@
         self.db.cfs()
     }

+    #[tracing::instrument(skip_all, fields(db = %self.name))]
     pub async fn write_batch(
         &self,
         priority: Priority,
@@ -128,18 +136,25 @@
         // depending on the IoMode, we decide how to do the write.
         match io_mode {
             IoMode::AllowBlockingIO => {
+                debug!("Blocking IO is allowed for write_batch; stall detection will not be used for this operation!");
                 write_options.set_no_slowdown(false);
                 self.db.write_batch(&write_batch, &write_options)?;
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_MAYBE_BLOCKING, OP_TYPE =>
-                    OP_WRITE)
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_MAYBE_BLOCKING,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
                 .increment(1);
                 return Ok(());
             }
             IoMode::OnlyIfNonBlocking => {
                 write_options.set_no_slowdown(true);
                 self.db.write_batch(&write_batch, &write_options)?;
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_NON_BLOCKING, OP_TYPE =>
-                    OP_WRITE)
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_NON_BLOCKING,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
                 .increment(1);
                 return Ok(());
             }
@@ -152,45 +167,48 @@
         let result = self.db.write_batch(&write_batch, &write_options);
         match result {
             Ok(_) => {
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_NON_BLOCKING, OP_TYPE => OP_WRITE).increment(1);
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_NON_BLOCKING,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
+                .increment(1);
                 Ok(())
             }
             Err(e) if is_retryable_error(e.kind()) => {
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_MOVED_TO_BG, OP_TYPE =>
-                    OP_WRITE)
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_MOVED_TO_BG,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
                 .increment(1);
-                let start = std::time::Instant::now();
                 // Operation will block, dispatch to background.
                 let db = self.db.clone();
                 // In the background thread pool we can block on IO
                 write_options.set_no_slowdown(false);
                 let task = StorageTask::default()
-                    .db_name(self.name.clone())
-                    .owner(self.owner)
                     .priority(priority)
                     .kind(StorageTaskKind::WriteBatch)
-                    .op(move || {
-                        db.write_batch(&write_batch, &write_options)?;
-                        info!(
-                            "background write completed, completed in {:?}",
-                            start.elapsed()
-                        );
-                        Ok(())
-                    })
+                    .op(move || db.write_batch(&write_batch, &write_options))
                     .build()
                     .unwrap();

-                self.manager.async_spawn(task).await?
+                Ok(race_against_stall_detector(self.manager, task).await??)
             }
             Err(e) => {
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_FAILED, OP_TYPE => OP_WRITE)
-                    .increment(1);
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_FAILED,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
+                .increment(1);
                 Err(e.into())
             }
         }
     }

     // unfortunate side effect of trait objects not supporting generics
+    #[tracing::instrument(skip_all, fields(db = %self.name))]
     pub async fn write_tx_batch(
         &self,
         priority: Priority,
@@ -201,18 +219,25 @@
         // depending on the IoMode, we decide how to do the write.
         match io_mode {
             IoMode::AllowBlockingIO => {
+                debug!("Blocking IO is allowed for write_tx_batch; stall detection will not be used for this operation!");
                 write_options.set_no_slowdown(false);
                 self.db.write_tx_batch(&write_batch, &write_options)?;
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_MAYBE_BLOCKING, OP_TYPE =>
-                    OP_WRITE)
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_MAYBE_BLOCKING,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
                 .increment(1);
                 return Ok(());
             }
             IoMode::OnlyIfNonBlocking => {
                 write_options.set_no_slowdown(true);
                 self.db.write_tx_batch(&write_batch, &write_options)?;
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_NON_BLOCKING, OP_TYPE =>
-                    OP_WRITE)
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_NON_BLOCKING,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
                 .increment(1);
                 return Ok(());
             }
@@ -225,53 +250,66 @@
         let result = self.db.write_tx_batch(&write_batch, &write_options);
         match result {
             Ok(_) => {
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_NON_BLOCKING, OP_TYPE => OP_WRITE).increment(1);
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_NON_BLOCKING,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
+                .increment(1);
                 Ok(())
             }
             Err(e) if is_retryable_error(e.kind()) => {
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_MOVED_TO_BG, OP_TYPE =>
-                    OP_WRITE)
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_MOVED_TO_BG,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
                 .increment(1);
-                let start = std::time::Instant::now();
                 // Operation will block, dispatch to background.
                 let db = self.db.clone();
                 // In the background thread pool we can block on IO
                 write_options.set_no_slowdown(false);
                 let task = StorageTask::default()
-                    .db_name(self.name.clone())
-                    .owner(self.owner)
                     .priority(priority)
                     .kind(StorageTaskKind::WriteBatch)
-                    .op(move || {
-                        db.write_tx_batch(&write_batch, &write_options)?;
-                        info!(
-                            "background write completed, completed in {:?}",
-                            start.elapsed()
-                        );
-                        Ok(())
-                    })
+                    .op(move || db.write_tx_batch(&write_batch, &write_options))
                     .build()
                     .unwrap();

-                self.manager.async_spawn(task).await?
+                Ok(race_against_stall_detector(self.manager, task).await??)
             }
             Err(e) => {
-                counter!(STORAGE_IO_OP, DISPOSITION => DISPOSITION_FAILED, OP_TYPE => OP_WRITE)
-                    .increment(1);
+                counter!(STORAGE_IO_OP,
+                    DISPOSITION => DISPOSITION_FAILED,
+                    OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
+                    PRIORITY => priority.as_static_str(),
+                )
+                .increment(1);
                 Err(e.into())
             }
         }
     }

+    #[tracing::instrument(skip_all, fields(db = %self.name))]
+    pub async fn flush_wal(&self, sync: bool) -> Result<(), RocksError> {
+        let db = self.db.clone();
+        let task = StorageTask::default()
+            .kind(StorageTaskKind::FlushWal)
+            .op(move || db.flush_wal(sync))
+            .build()
+            .unwrap();
+
+        self.manager.async_spawn(task).await?
+    }
+
+    #[tracing::instrument(skip_all, fields(db = %self.name))]
     pub fn run_bg_wal_sync(&self) {
         let db = self.db.clone();
         let task = StorageTask::default()
-            .db_name(self.name.clone())
-            .owner(self.owner)
             .kind(StorageTaskKind::FlushWal)
             .op(move || {
                 if let Err(e) = db.flush_wal(true) {
-                    error!("Failed to flush rocksdb WAL in local loglet : {}", e);
+                    error!("Failed to flush rocksdb WAL: {}", e);
                 }
             })
             .build()
@@ -292,13 +330,12 @@
         self.db_options.get_statistics()
     }

+    #[tracing::instrument(skip_all, fields(db = %self.name))]
     pub async fn open_cf(&self, name: CfName, opts: &RocksDbOptions) -> Result<(), RocksError> {
         let default_cf_options = self.manager.default_cf_options(opts);
         let db = self.db.clone();
         let cf_patterns = self.cf_patterns.clone();
         let task = StorageTask::default()
-            .db_name(self.name.clone())
-            .owner(self.owner)
             .kind(StorageTaskKind::OpenColumnFamily)
             .op(move || db.open_cf(name, default_cf_options, cf_patterns))
             .build()
@@ -307,15 +344,13 @@
         self.manager.async_spawn(task).await?
     }

+    #[tracing::instrument(skip_all, fields(db = %self.name))]
     pub async fn shutdown(self: Arc<Self>) {
-        let db_name = self.name.clone();
-        let owner = self.owner;
         let manager = self.manager;
         let op = move || {
             if let Err(e) = self.db.flush_wal(true) {
                 warn!(
                     db = %self.name,
-                    owner = %self.owner,
                     "Failed to flush local loglet rocksdb WAL: {}",
                     e
                 );
@@ -333,7 +368,6 @@
             if cfs_to_flush.is_empty() {
                 debug!(
                     db = %self.name,
-                    owner = %self.owner,
                     "No column families to flush for db on shutdown"
                 );
                 return;
@@ -341,12 +375,10 @@

             debug!(
                 db = %self.name,
-                owner = %self.owner,
                 "Number of column families to flush on shutdown: {}", cfs_to_flush.len());

             if let Err(e) = self.db.flush_memtables(cfs_to_flush.as_slice(), true) {
                 warn!(
                     db = %self.name,
-                    owner = %self.owner,
                     "Failed to flush memtables: {}",
                     e
                 );
@@ -355,8 +387,6 @@
         };
         // intentionally ignore scheduling error
         let task = StorageTask::default()
-            .db_name(db_name)
-            .owner(owner)
             .kind(StorageTaskKind::Shutdown)
             .op(op)
             .build()
@@ -371,3 +401,37 @@ fn is_retryable_error(error_kind: rocksdb::ErrorKind) -> bool {
         rocksdb::ErrorKind::Incomplete | rocksdb::ErrorKind::TryAgain | rocksdb::ErrorKind::Busy
     )
 }
+
+async fn race_against_stall_detector<OP, R>(
+    manager: &RocksDbManager,
+    task: ReadyStorageTask<OP>,
+) -> Result<R, ShutdownError>
+where
+    OP: FnOnce() -> R + Send + 'static,
+    R: Send + 'static,
+{
+    let mut task = std::pin::pin!(manager.async_spawn(task));
+    let mut stalled = false;
+    let mut stalled_since = Instant::now();
+    loop {
+        tokio::select! {
+            result = &mut task => {
+                if stalled {
+                    // reset the flare gauge
+                    gauge!(ROCKSDB_STALL_FLARE).decrement(1);
+                    let elapsed = stalled_since.elapsed();
+                    histogram!(ROCKSDB_STALL_DURATION).record(elapsed);
+                    info!("[Stall Detector] Rocksdb write operation completed after a stall time of {:?}!", elapsed);
+                }
+                return result;
+            }
+            _ = tokio::time::sleep(manager.stall_detection_duration()), if !stalled => {
+                stalled = true;
+                stalled_since = Instant::now();
+                gauge!(ROCKSDB_STALL_FLARE).increment(1);
+                warn!("[Stall Detector] Rocksdb write operation exceeded rocksdb-write-stall-threshold, will continue waiting");
+            }
+        }
+    }
+}
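`race_against_stall_detector` is a select loop that keeps polling the spawned task while arming a one-shot timer; the `, if !stalled` guard disarms the timer branch after it fires once, so the warning and gauge bump happen exactly once per stall. The same shape, reduced to a generic helper (a sketch, with `eprintln!` standing in for the metrics and tracing calls):

```rust
use std::time::{Duration, Instant};

async fn with_stall_warning<F, T>(op: F, threshold: Duration) -> T
where
    F: std::future::Future<Output = T>,
{
    let mut op = std::pin::pin!(op);
    let mut stalled = false;
    let started = Instant::now();
    loop {
        tokio::select! {
            out = &mut op => {
                if stalled {
                    eprintln!("unstalled after {:?}", started.elapsed());
                }
                return out;
            }
            // Disabled after the first firing, so we warn exactly once.
            _ = tokio::time::sleep(threshold), if !stalled => {
                stalled = true;
                eprintln!("still running after {:?}, continuing to wait", threshold);
            }
        }
    }
}
```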
diff --git a/crates/rocksdb/src/metric_definitions.rs b/crates/rocksdb/src/metric_definitions.rs
index 271ca46b7..574531ae5 100644
--- a/crates/rocksdb/src/metric_definitions.rs
+++ b/crates/rocksdb/src/metric_definitions.rs
@@ -8,9 +8,9 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.

-use metrics::{describe_counter, describe_histogram, Unit};
+use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};

-pub const STORAGE_BG_TASK_SPAWNED: &str = "restate.rocksdb_manager.bg_task_spawned.total";
+pub const STORAGE_BG_TASK_IN_FLIGHT: &str = "restate.rocksdb_manager.bg_task_in_flight.total";
 pub const STORAGE_IO_OP: &str = "restate.rocksdb_manager.io_operation.total";
 pub const STORAGE_BG_TASK_WAIT_DURATION: &str =
     "restate.rocksdb_manager.bg_task_wait_duration.seconds";
@@ -21,8 +21,20 @@ pub const STORAGE_BG_TASK_RUN_DURATION: &str =
 pub const STORAGE_BG_TASK_TOTAL_DURATION: &str =
     "restate.rocksdb_manager.bg_task_total_duration.seconds";

+pub const BLOCK_READ_COUNT: &str = "restate.rocksdb.perf.num_block_read.total";
+pub const BLOCK_READ_BYTES: &str = "restate.rocksdb.perf.block_read_bytes.total";
+pub const WRITE_WAL_DURATION: &str = "restate.rocksdb.perf.write_wal_duration.seconds";
+pub const WRITE_MEMTABLE_DURATION: &str = "restate.rocksdb.perf.write_memtable_duration.seconds";
+pub const WRITE_PRE_AND_POST_DURATION: &str =
+    "restate.rocksdb.perf.write_pre_and_post_duration.seconds";
+pub const WRITE_ARTIFICIAL_DELAY_DURATION: &str =
+    "restate.rocksdb.perf.write_artificial_delay_duration.seconds";
+
+pub const ROCKSDB_STALL_FLARE: &str = "restate.rocksdb_stall_flare";
+pub const ROCKSDB_STALL_DURATION: &str = "restate.rocksdb_stall_duration.seconds";
+
 pub const OP_TYPE: &str = "operation";
-pub const OP_WRITE: &str = "write-batch";
+pub const PRIORITY: &str = "priority";

 pub const DISPOSITION: &str = "disposition";

@@ -32,17 +44,40 @@ pub const DISPOSITION_MOVED_TO_BG: &str = "moved-to-bg";
 pub const DISPOSITION_FAILED: &str = "failed";

 pub fn describe_metrics() {
-    describe_counter!(
-        STORAGE_BG_TASK_SPAWNED,
+    describe_gauge!(
+        ROCKSDB_STALL_FLARE,
         Unit::Count,
-        "Number of background storage tasks spawned"
+        "Number of in-flight operations that are considered stalled by the stall detector"
+    );
+
+    describe_gauge!(
+        STORAGE_BG_TASK_IN_FLIGHT,
+        Unit::Count,
+        "Number of background storage tasks in-flight"
     );

     describe_counter!(
         STORAGE_IO_OP,
         Unit::Count,
-        "Number of forground rocksdb operations, label 'disposition' defines how IO was actually handled.
-Options are 'maybe-blocking', 'non-blocking', 'moved-to-bg'"
+        "Number of foreground rocksdb operations; the label 'disposition' defines how IO was actually handled.
+        Options are 'maybe-blocking', 'non-blocking', 'moved-to-bg'"
     );
+
+    describe_counter!(
+        BLOCK_READ_COUNT,
+        Unit::Count,
+        "Number of rocksdb blocks read from disk"
+    );
+
+    describe_counter!(
+        BLOCK_READ_BYTES,
+        Unit::Bytes,
+        "Total number of bytes read from disk during this operation"
+    );
+
+    describe_histogram!(
+        ROCKSDB_STALL_DURATION,
+        Unit::Seconds,
+        "Time a write spent in a stalled state as seen by the stall detector; note that this is only recorded once the write unstalls"
+    );

     describe_histogram!(
@@ -62,4 +97,28 @@
         Unit::Seconds,
         "Total time to queue+run a storage task, with 'priority' label"
     );
+
+    describe_histogram!(
+        WRITE_WAL_DURATION,
+        Unit::Seconds,
+        "Time spent writing to WAL"
+    );
+
+    describe_histogram!(
+        WRITE_MEMTABLE_DURATION,
+        Unit::Seconds,
+        "Time spent writing to memtable"
+    );
+
+    describe_histogram!(
+        WRITE_PRE_AND_POST_DURATION,
+        Unit::Seconds,
+        "Time spent in pre/post write operations by rocksdb"
+    );
+
+    describe_histogram!(
+        WRITE_ARTIFICIAL_DELAY_DURATION,
+        Unit::Seconds,
+        "Extra write delay introduced by rocksdb to meet target write rates"
+    );
 }
diff --git a/crates/rocksdb/src/perf.rs b/crates/rocksdb/src/perf.rs
new file mode 100644
index 000000000..f7df7e7af
--- /dev/null
+++ b/crates/rocksdb/src/perf.rs
@@ -0,0 +1,113 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::cell::RefCell;
+
+use metrics::{counter, histogram};
+use rocksdb::{PerfContext, PerfMetric, PerfStatsLevel};
+
+use crate::background::StorageTaskKind;
+use crate::{
+    BLOCK_READ_BYTES, BLOCK_READ_COUNT, OP_TYPE, WRITE_ARTIFICIAL_DELAY_DURATION,
+    WRITE_MEMTABLE_DURATION, WRITE_PRE_AND_POST_DURATION, WRITE_WAL_DURATION,
+};
+
+thread_local! {
+    static ROCKSDB_PERF_CONTEXT: RefCell<PerfContext> = RefCell::new(PerfContext::default());
+}
+
+/// This guard must be created and dropped on the same thread; never hold the same
+/// guard across .await points. It should strictly be used within the bounds of the sync
+/// RocksAccess layer.
+pub struct RocksDbPerfGuard {
+    kind: StorageTaskKind,
+}
+
+impl RocksDbPerfGuard {
+    /// IMPORTANT NOTE: you MUST bind this value to a named variable to ensure
+    /// that the guard is not insta-dropped. Binding to something like `_x = ...` works, just don't
+    /// use the special `_` variable and you'll be fine. Unfortunately, rust/clippy doesn't have a
+    /// mechanism to enforce this at the moment.
+    #[must_use]
+    pub fn new(kind: StorageTaskKind) -> Self {
+        rocksdb::perf::set_perf_stats(PerfStatsLevel::EnableTimeExceptForMutex);
+        ROCKSDB_PERF_CONTEXT.with(|context| {
+            context.borrow_mut().reset();
+        });
+        RocksDbPerfGuard { kind }
+    }
+}
+
+impl Drop for RocksDbPerfGuard {
+    fn drop(&mut self) {
+        rocksdb::perf::set_perf_stats(PerfStatsLevel::Disable);
+        // report collected metrics
+        ROCKSDB_PERF_CONTEXT.with(|context| {
+            // Note to future visitors of this code. RocksDb reports times in nanoseconds in this
+            // API compared to microseconds in Statistics/Properties. Use n_to_s() to convert to
+            // the standard prometheus unit (second).
+            let context = context.borrow();
+            let v = context.metric(PerfMetric::BlockReadCount);
+            if v != 0 {
+                counter!(BLOCK_READ_COUNT,
+                    OP_TYPE => self.kind.as_static_str(),
+                )
+                .increment(v);
+            }
+            let v = context.metric(PerfMetric::BlockReadByte);
+            if v != 0 {
+                counter!(BLOCK_READ_BYTES,
+                    OP_TYPE => self.kind.as_static_str(),
+                )
+                .increment(v)
+            };
+
+            let v = context.metric(PerfMetric::WriteWalTime);
+            if v != 0 {
+                histogram!(WRITE_WAL_DURATION,
+                    OP_TYPE => self.kind.as_static_str(),
+                )
+                .record(n_to_s(v));
+            }
+            let v = context.metric(PerfMetric::WriteMemtableTime);
+            if v != 0 {
+                histogram!(WRITE_MEMTABLE_DURATION,
+                    OP_TYPE => self.kind.as_static_str(),
+                )
+                .record(n_to_s(v));
+            }
+
+            let v = context.metric(PerfMetric::WritePreAndPostProcessTime);
+            if v != 0 {
+                histogram!(WRITE_PRE_AND_POST_DURATION,
+                    OP_TYPE => self.kind.as_static_str(),
+                )
+                .record(n_to_s(v));
+            }
+
+            let v = context.metric(PerfMetric::WriteDelayTime);
+            if v != 0 {
+                histogram!(WRITE_ARTIFICIAL_DELAY_DURATION,
+                    OP_TYPE => self.kind.as_static_str(),
+                )
+                .record(n_to_s(v));
+            }
+        });
+    }
+}
+
+#[inline]
+/// nanos to seconds
+fn n_to_s(v: u64) -> f64 {
+    // Prometheus recommends base units, so we convert nanos to seconds
+    // (fractions) when convenient.
+    // See https://prometheus.io/docs/practices/naming/
+    v as f64 / 1_000_000_000.0
+}
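Because reporting happens in `Drop`, the guard only needs to outlive the rocksdb call it wraps; the rock_access.rs hunks below follow the naming advice from the doc comment (`_x`, never `_`). Usage shape, as a sketch (`RocksDbPerfGuard` and `StorageTaskKind` are the crate-internal types introduced in this patch):

```rust
// Sketch of the pattern used in rock_access.rs below.
fn write_with_perf_metrics(
    db: &rocksdb::DB,
    batch: rocksdb::WriteBatch,
) -> Result<(), rocksdb::Error> {
    let _x = RocksDbPerfGuard::new(StorageTaskKind::WriteBatch); // named binding, not `_`
    db.write(batch)
    // `_x` drops here: perf counters are read and published as metrics.
}
```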
diff --git a/crates/rocksdb/src/rock_access.rs b/crates/rocksdb/src/rock_access.rs
index 295d1bdc9..90cd25b0e 100644
--- a/crates/rocksdb/src/rock_access.rs
+++ b/crates/rocksdb/src/rock_access.rs
@@ -16,6 +16,8 @@ use rocksdb::ColumnFamilyDescriptor;
 use rocksdb::MultiThreaded;
 use tracing::trace;

+use crate::background::StorageTaskKind;
+use crate::perf::RocksDbPerfGuard;
 use crate::BoxedCfMatcher;
 use crate::BoxedCfOptionUpdater;
 use crate::CfName;
@@ -161,6 +163,7 @@ impl RocksAccess for rocksdb::DB {
     }

     fn flush_memtables(&self, cfs: &[CfName], wait: bool) -> Result<(), RocksError> {
+        let _x = RocksDbPerfGuard::new(StorageTaskKind::FlushMemtables);
         let mut flushopts = rocksdb::FlushOptions::default();
         flushopts.set_wait(wait);
         let cfs = cfs
@@ -173,6 +176,7 @@
     }

     fn flush_wal(&self, sync: bool) -> Result<(), RocksError> {
+        let _x = RocksDbPerfGuard::new(StorageTaskKind::FlushWal);
         Ok(self.flush_wal(sync)?)
     }

@@ -207,6 +211,7 @@
         batch: &rocksdb::WriteBatch,
         write_options: &rocksdb::WriteOptions,
     ) -> Result<(), rocksdb::Error> {
+        let _x = RocksDbPerfGuard::new(StorageTaskKind::WriteBatch);
         self.write_opt(batch, write_options)
     }

@@ -277,6 +282,7 @@ impl RocksAccess for rocksdb::OptimisticTransactionDB {
     }

     fn flush_memtables(&self, cfs: &[CfName], wait: bool) -> Result<(), RocksError> {
+        let _x = RocksDbPerfGuard::new(StorageTaskKind::FlushMemtables);
         let mut flushopts = rocksdb::FlushOptions::default();
         flushopts.set_wait(wait);
         let cfs = cfs
@@ -289,6 +295,7 @@
     }

     fn flush_wal(&self, sync: bool) -> Result<(), RocksError> {
+        let _x = RocksDbPerfGuard::new(StorageTaskKind::FlushWal);
         Ok(self.flush_wal(sync)?)
     }

@@ -331,6 +338,7 @@
         batch: &rocksdb::WriteBatchWithTransaction<true>,
         write_options: &rocksdb::WriteOptions,
     ) -> Result<(), rocksdb::Error> {
+        let _x = RocksDbPerfGuard::new(StorageTaskKind::WriteBatch);
         self.write_opt(batch, write_options)
     }
 }
diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs
index 6d2a4e24b..998302bb2 100644
--- a/crates/types/src/config/common.rs
+++ b/crates/types/src/config/common.rs
@@ -18,7 +18,7 @@ use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;

-use restate_serde_util::{ByteCount, NonZeroByteCount};
+use restate_serde_util::NonZeroByteCount;

 use crate::net::{AdvertisedAddress, BindAddress};
 use crate::nodes_config::Role;
@@ -170,15 +170,13 @@ pub struct CommonOptions {
     #[cfg_attr(feature = "schemars", schemars(with = "NonZeroByteCount"))]
     pub rocksdb_total_memory_size: NonZeroUsize,

-    /// # Rocksdb total memtable size limit
+    /// # Rocksdb total memtable size ratio
     ///
-    /// The memory size used across all memtables. This limits how much memory
-    /// memtables can eat up from the value in rocksdb_total_memory_limit. When
-    /// set to 0, memtables can take all available memory up to the value specified
-    /// in rocksdb_total_memory_limit.
-    #[serde_as(as = "ByteCount")]
-    #[cfg_attr(feature = "schemars", schemars(with = "ByteCount"))]
-    pub rocksdb_total_memtables_size: usize,
+    /// The fraction of rocksdb-total-memory-size that all memtables together may
+    /// use (a ratio between 0 and 1.0). When set to 0, memtables can take all
+    /// available memory up to the value specified in rocksdb-total-memory-size.
+    /// Out-of-range values are clamped into [0, 1.0].
+    rocksdb_total_memtables_ratio: f32,

     /// # Rocksdb Background Threads
     ///
@@ -192,6 +190,16 @@ pub struct CommonOptions {
     /// The number of threads to reserve to high priority Rocksdb background tasks.
     pub rocksdb_high_priority_bg_threads: NonZeroU32,

+    /// # Rocksdb stall detection threshold
+    ///
+    /// The duration after which a write is considered to be in a "stall" state. For
+    /// every write that crosses this threshold, the system increments the
+    /// `restate.rocksdb_stall_flare` gauge; once the write unstalls, the gauge is
+    /// decremented accordingly.
+    #[serde(with = "serde_with::As::<serde_with::DisplayFromStr>")]
+    #[cfg_attr(feature = "schemars", schemars(with = "String"))]
+    pub rocksdb_write_stall_threshold: humantime::Duration,
+
     /// RocksDb base settings and memory limits that get applied on every database
     #[serde(flatten)]
     pub rocksdb: RocksDbOptions,
@@ -225,6 +233,12 @@ impl CommonOptions {
         })
     }

+    pub fn rocksdb_total_memtables_size(&self) -> usize {
+        let sanitized = self.rocksdb_total_memtables_ratio.clamp(0.0, 1.0) as f64;
+        let total_mem = self.rocksdb_total_memory_size.get() as f64;
+        (total_mem * sanitized) as usize
+    }
+
     pub fn storage_high_priority_bg_threads(&self) -> NonZeroUsize {
         self.storage_high_priority_bg_threads.unwrap_or(
             std::thread::available_parallelism()
@@ -281,10 +295,11 @@ impl Default for CommonOptions {
             default_thread_pool_size: None,
             storage_high_priority_bg_threads: None,
             storage_low_priority_bg_threads: None,
-            rocksdb_total_memtables_size: 2_000_000_000, // 2GB (50% of total memory)
+            rocksdb_total_memtables_ratio: 0.5, // (50% of rocksdb-total-memory-size)
             rocksdb_total_memory_size: NonZeroUsize::new(4_000_000_000).unwrap(), // 4GB
             rocksdb_bg_threads: None,
             rocksdb_high_priority_bg_threads: NonZeroU32::new(2).unwrap(),
+            rocksdb_write_stall_threshold: std::time::Duration::from_secs(3).into(),
             rocksdb: Default::default(),
         }
     }
diff --git a/crates/types/src/config/rocksdb.rs b/crates/types/src/config/rocksdb.rs
index 98bf8d2f6..094c792b3 100644
--- a/crates/types/src/config/rocksdb.rs
+++ b/crates/types/src/config/rocksdb.rs
@@ -15,6 +15,8 @@ use serde_with::serde_as;

 use restate_serde_util::NonZeroByteCount;

+use super::{CommonOptions, WorkerOptions};
+
 #[serde_as]
 #[derive(Debug, Clone, Default, Serialize, Deserialize, derive_builder::Builder)]
 #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
@@ -108,8 +110,21 @@ impl RocksDbOptions {
     }

     pub fn rocksdb_write_buffer_size(&self) -> NonZeroUsize {
-        self.rocksdb_write_buffer_size
-            .unwrap_or(NonZeroUsize::new(50_000_000).unwrap()) // 50MB
+        // Default value is calculated from other defaults of the system
+        self.rocksdb_write_buffer_size.unwrap_or_else(|| {
+            // NOTE: This is a guess based on the default values of the system; it doesn't reflect
+            // the actual configuration because the number of partitions can change over time. The
+            // goal here is to provide a reasonable default value for the _default_ system
+            // configuration.
+            let common_opts = CommonOptions::default();
+            let all_memtables = common_opts.rocksdb_total_memtables_size();
+            let num_partitions = WorkerOptions::default().bootstrap_num_partitions();
+            // Assuming 1 active and 2 immutable memtables per partition.
+            // Assuming 256MB for bifrost's data cf (2 memtables * 128MB default write buffer size)
+            // and another 128MB reserved for bifrost's metadata cf.
+            let buffer_size = (all_memtables - 384_000_000) / (num_partitions * 3) as usize;
+            NonZeroUsize::new(buffer_size).unwrap()
+        })
     }

     pub fn rocksdb_max_total_wal_size(&self) -> NonZeroUsize {
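The derived write-buffer default can be sanity-checked by plugging in the defaults above; the partition count below is an assumption for illustration only (the real value comes from `WorkerOptions::default().bootstrap_num_partitions()`):

```rust
fn main() {
    let total_memory = 4_000_000_000_f64; // rocksdb-total-memory-size default (4GB)
    let ratio = 0.5_f64; // rocksdb-total-memtables-ratio default
    let all_memtables = (total_memory * ratio) as usize; // 2_000_000_000

    let num_partitions: usize = 24; // assumed partition count, for illustration
    let reserved_for_bifrost = 384_000_000; // 256MB data cf + 128MB metadata cf

    let write_buffer_size = (all_memtables - reserved_for_bifrost) / (num_partitions * 3);
    // 1_616_000_000 / 72 ≈ 22.4MB per memtable
    println!("derived rocksdb-write-buffer-size: {write_buffer_size}");
}
```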