From 3b6f70cde3d7c1b71880d4821a23a99d26e2c34b Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 4 Jul 2023 16:42:27 +0800 Subject: [PATCH] feat: initial twcs impl (#1851) * feat: initial twcs impl * chore: rename SimplePicker to LeveledPicker * rename some structs * Remove Compaction strategy * make compaction picker a trait object * make compaction picker configurable for every region * chore: add some test for ttl * add some tests * fix: some style issues in cr * feat: enable twcs when creating tables * feat: allow config time window when creating tables * fix: some cr comments --- src/common/time/src/timestamp_millis.rs | 49 ++- src/datanode/src/instance.rs | 5 +- src/mito/src/engine.rs | 8 +- src/mito/src/engine/procedure/create.rs | 7 +- src/storage/src/compaction.rs | 169 +++++++++- src/storage/src/compaction/picker.rs | 362 ++++++++++++++++++--- src/storage/src/compaction/scheduler.rs | 39 ++- src/storage/src/compaction/strategy.rs | 327 ------------------- src/storage/src/compaction/task.rs | 26 +- src/storage/src/compaction/twcs.rs | 398 +++++++++++++++++++++++ src/storage/src/compaction/writer.rs | 72 ++-- src/storage/src/engine.rs | 8 +- src/storage/src/flush/scheduler.rs | 4 +- src/storage/src/region.rs | 19 +- src/storage/src/region/tests.rs | 1 + src/storage/src/region/tests/compact.rs | 16 +- src/storage/src/region/writer.rs | 5 +- src/storage/src/test_util/config_util.rs | 1 + src/store-api/src/lib.rs | 1 + src/store-api/src/storage.rs | 5 +- src/store-api/src/storage/engine.rs | 79 +++++ 21 files changed, 1149 insertions(+), 452 deletions(-) delete mode 100644 src/storage/src/compaction/strategy.rs create mode 100644 src/storage/src/compaction/twcs.rs diff --git a/src/common/time/src/timestamp_millis.rs b/src/common/time/src/timestamp_millis.rs index c5f55ffbf4c7..f640e148827a 100644 --- a/src/common/time/src/timestamp_millis.rs +++ b/src/common/time/src/timestamp_millis.rs @@ -14,6 +14,7 @@ use std::cmp::Ordering; +use crate::util::div_ceil; use crate::Timestamp; /// Unix timestamp in millisecond resolution. @@ -80,11 +81,17 @@ impl PartialOrd for i64 { } pub trait BucketAligned: Sized { - /// Returns the timestamp aligned by `bucket_duration` or `None` if underflow occurred. + /// Aligns the value by `bucket_duration` or `None` if underflow occurred. /// /// # Panics /// Panics if `bucket_duration <= 0`. fn align_by_bucket(self, bucket_duration: i64) -> Option; + + /// Aligns the value by `bucket_duration` to ceil or `None` if overflow occurred. + /// + /// # Panics + /// Panics if `bucket_duration <= 0`. 
+ fn align_to_ceil_by_bucket(self, bucket_duration: i64) -> Option; } impl BucketAligned for i64 { @@ -93,6 +100,11 @@ impl BucketAligned for i64 { self.checked_div_euclid(bucket_duration) .and_then(|val| val.checked_mul(bucket_duration)) } + + fn align_to_ceil_by_bucket(self, bucket_duration: i64) -> Option { + assert!(bucket_duration > 0, "{}", bucket_duration); + div_ceil(self, bucket_duration).checked_mul(bucket_duration) + } } impl BucketAligned for Timestamp { @@ -103,6 +115,14 @@ impl BucketAligned for Timestamp { .align_by_bucket(bucket_duration) .map(|val| Timestamp::new(val, unit)) } + + fn align_to_ceil_by_bucket(self, bucket_duration: i64) -> Option { + assert!(bucket_duration > 0, "{}", bucket_duration); + let unit = self.unit(); + self.value() + .align_to_ceil_by_bucket(bucket_duration) + .map(|val| Timestamp::new(val, unit)) + } } #[cfg(test)] @@ -180,4 +200,31 @@ mod tests { Timestamp::new_millisecond(i64::MIN).align_by_bucket(bucket) ); } + + #[test] + fn test_align_to_ceil() { + assert_eq!(None, i64::MAX.align_to_ceil_by_bucket(10)); + assert_eq!( + Some(i64::MAX - (i64::MAX % 10)), + (i64::MAX - (i64::MAX % 10)).align_to_ceil_by_bucket(10) + ); + assert_eq!(Some(i64::MAX), i64::MAX.align_to_ceil_by_bucket(1)); + assert_eq!(Some(i64::MAX), i64::MAX.align_to_ceil_by_bucket(1)); + assert_eq!(Some(i64::MAX), i64::MAX.align_to_ceil_by_bucket(i64::MAX)); + + assert_eq!( + Some(i64::MIN - (i64::MIN % 10)), + i64::MIN.align_to_ceil_by_bucket(10) + ); + assert_eq!(Some(i64::MIN), i64::MIN.align_to_ceil_by_bucket(1)); + + assert_eq!(Some(3), 1i64.align_to_ceil_by_bucket(3)); + assert_eq!(Some(3), 3i64.align_to_ceil_by_bucket(3)); + assert_eq!(Some(6), 4i64.align_to_ceil_by_bucket(3)); + assert_eq!(Some(0), 0i64.align_to_ceil_by_bucket(3)); + assert_eq!(Some(0), (-1i64).align_to_ceil_by_bucket(3)); + assert_eq!(Some(0), (-2i64).align_to_ceil_by_bucket(3)); + assert_eq!(Some(-3), (-3i64).align_to_ceil_by_bucket(3)); + assert_eq!(Some(-3), (-4i64).align_to_ceil_by_bucket(3)); + } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 148f4fb802f5..78b452cd07eb 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -44,7 +44,7 @@ use query::query_engine::{QueryEngineFactory, QueryEngineRef}; use servers::Mode; use session::context::QueryContext; use snafu::prelude::*; -use storage::compaction::{CompactionHandler, CompactionSchedulerRef, SimplePicker}; +use storage::compaction::{CompactionHandler, CompactionSchedulerRef}; use storage::config::EngineConfig as StorageEngineConfig; use storage::scheduler::{LocalScheduler, SchedulerConfig}; use storage::EngineImpl; @@ -395,9 +395,8 @@ impl Instance { } fn create_compaction_scheduler(opts: &DatanodeOptions) -> CompactionSchedulerRef { - let picker = SimplePicker::default(); let config = SchedulerConfig::from(opts); - let handler = CompactionHandler { picker }; + let handler = CompactionHandler::default(); let scheduler = LocalScheduler::new(config, handler); Arc::new(scheduler) } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index dd7949692dbb..519848615919 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -33,8 +33,8 @@ use snafu::{ensure, OptionExt, ResultExt}; use storage::manifest::manifest_compress_type; use store_api::storage::{ CloseOptions, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, - ColumnId, EngineContext as StorageEngineContext, OpenOptions, RegionNumber, RowKeyDescriptor, - RowKeyDescriptorBuilder, 
StorageEngine, + ColumnId, CompactionStrategy, EngineContext as StorageEngineContext, OpenOptions, RegionNumber, + RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine, }; use table::engine::{ region_name, table_dir, CloseTableResult, EngineContext, TableEngine, TableEngineProcedure, @@ -417,6 +417,7 @@ impl MitoEngineInner { .await.map_err(BoxedError::new) .context(table_error::TableOperationSnafu)? else { return Ok(None) }; + let compaction_strategy = CompactionStrategy::from(&table_info.meta.options.extra_options); let opts = OpenOptions { parent_dir: table_dir.to_string(), write_buffer_size: table_info @@ -425,6 +426,7 @@ impl MitoEngineInner { .write_buffer_size .map(|s| s.0 as usize), ttl: table_info.meta.options.ttl, + compaction_strategy, }; debug!( @@ -501,6 +503,7 @@ impl MitoEngineInner { table: name, }; + let compaction_strategy = CompactionStrategy::from(&table_info.meta.options.extra_options); let opts = OpenOptions { parent_dir: table_dir.to_string(), write_buffer_size: table_info @@ -509,6 +512,7 @@ impl MitoEngineInner { .write_buffer_size .map(|s| s.0 as usize), ttl: table_info.meta.options.ttl, + compaction_strategy, }; // TODO(weny): Returns an error earlier if the target region does not exist in the meta. diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index bafd0a763136..1b3e959c2870 100644 --- a/src/mito/src/engine/procedure/create.rs +++ b/src/mito/src/engine/procedure/create.rs @@ -24,8 +24,8 @@ use datatypes::schema::{Schema, SchemaRef}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::storage::{ - ColumnId, CreateOptions, EngineContext, OpenOptions, RegionDescriptorBuilder, RegionNumber, - StorageEngine, + ColumnId, CompactionStrategy, CreateOptions, EngineContext, OpenOptions, + RegionDescriptorBuilder, RegionNumber, StorageEngine, }; use table::engine::{region_id, table_dir}; use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType}; @@ -232,15 +232,18 @@ impl TableCreator { let table_options = &self.data.request.table_options; let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize); let ttl = table_options.ttl; + let compaction_strategy = CompactionStrategy::from(&table_options.extra_options); let open_opts = OpenOptions { parent_dir: table_dir.to_string(), write_buffer_size, ttl, + compaction_strategy: compaction_strategy.clone(), }; let create_opts = CreateOptions { parent_dir: table_dir.to_string(), write_buffer_size, ttl, + compaction_strategy, }; let primary_key_indices = &self.data.request.primary_key_indices; diff --git a/src/storage/src/compaction.rs b/src/storage/src/compaction.rs index b8046e49d354..72152dfde256 100644 --- a/src/storage/src/compaction.rs +++ b/src/storage/src/compaction.rs @@ -15,17 +15,182 @@ pub mod noop; mod picker; mod scheduler; -mod strategy; mod task; +mod twcs; mod writer; use std::sync::Arc; -pub use picker::{Picker, PickerContext, SimplePicker}; +use common_telemetry::tracing::log::warn; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; +pub use picker::{LeveledTimeWindowPicker, Picker, PickerContext}; pub use scheduler::{CompactionHandler, CompactionRequestImpl}; +use store_api::logstore::LogStore; +use store_api::storage::CompactionStrategy; pub use task::{CompactionTask, CompactionTaskImpl}; +pub use twcs::TwcsPicker; use crate::scheduler::Scheduler; +use crate::sst::FileHandle; + +pub type CompactionPickerRef = + Arc, Task = CompactionTaskImpl> + Send + Sync>; pub type 
CompactionSchedulerRef = Arc> + Send + Sync>; + +/// Infers the suitable time bucket duration. +/// Now it simply find the max and min timestamp across all SSTs in level and fit the time span +/// into time bucket. +pub(crate) fn infer_time_bucket<'a>(files: impl Iterator) -> i64 { + let mut max_ts = Timestamp::new(i64::MIN, TimeUnit::Second); + let mut min_ts = Timestamp::new(i64::MAX, TimeUnit::Second); + + for f in files { + if let Some((start, end)) = f.time_range() { + min_ts = min_ts.min(*start); + max_ts = max_ts.max(*end); + } else { + // we don't expect an SST file without time range, + // it's either a bug or data corruption. + warn!("Found SST file without time range metadata: {f:?}"); + } + } + + // safety: Convert whatever timestamp into seconds will not cause overflow. + let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value(); + let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value(); + + max_sec + .checked_sub(min_sec) + .map(|span| TIME_BUCKETS.fit_time_bucket(span)) // return the max bucket on subtraction overflow. + .unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty. +} + +pub(crate) struct TimeBuckets([i64; 7]); + +impl TimeBuckets { + /// Fits a given time span into time bucket by find the minimum bucket that can cover the span. + /// Returns the max bucket if no such bucket can be found. + fn fit_time_bucket(&self, span_sec: i64) -> i64 { + assert!(span_sec >= 0); + match self.0.binary_search(&span_sec) { + Ok(idx) => self.0[idx], + Err(idx) => { + if idx < self.0.len() { + self.0[idx] + } else { + self.0.last().copied().unwrap() + } + } + } + } + + #[cfg(test)] + fn get(&self, idx: usize) -> i64 { + self.0[idx] + } + + fn max(&self) -> i64 { + self.0.last().copied().unwrap() + } +} + +/// A set of predefined time buckets. +pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([ + 60 * 60, // one hour + 2 * 60 * 60, // two hours + 12 * 60 * 60, // twelve hours + 24 * 60 * 60, // one day + 7 * 24 * 60 * 60, // one week + 365 * 24 * 60 * 60, // one year + 10 * 365 * 24 * 60 * 60, // ten years +]); + +pub fn compaction_strategy_to_picker( + strategy: &CompactionStrategy, +) -> CompactionPickerRef { + match strategy { + CompactionStrategy::LeveledTimeWindow => { + Arc::new(LeveledTimeWindowPicker::default()) as Arc<_> + } + CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( + twcs_opts.max_active_window_files, + twcs_opts.max_inactive_window_files, + twcs_opts.time_window_seconds, + )) as Arc<_>, + } +} + +#[cfg(test)] +mod tests { + use common_time::Timestamp; + + use super::*; + use crate::file_purger::noop::new_noop_file_purger; + use crate::sst::{FileHandle, FileId, FileMeta, Level}; + + /// Test util to create file handles. 
+ pub fn new_file_handle( + file_id: FileId, + start_ts_millis: i64, + end_ts_millis: i64, + level: Level, + ) -> FileHandle { + let file_purger = new_noop_file_purger(); + let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}); + FileHandle::new( + FileMeta { + region_id: 0, + file_id, + time_range: Some(( + Timestamp::new_millisecond(start_ts_millis), + Timestamp::new_millisecond(end_ts_millis), + )), + level, + file_size: 0, + }, + layer, + file_purger, + ) + } + + #[test] + fn test_time_bucket() { + assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(1)); + assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(60 * 60)); + assert_eq!( + TIME_BUCKETS.get(1), + TIME_BUCKETS.fit_time_bucket(60 * 60 + 1) + ); + + assert_eq!( + TIME_BUCKETS.get(2), + TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2) - 1) + ); + assert_eq!( + TIME_BUCKETS.get(2), + TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2)) + ); + assert_eq!( + TIME_BUCKETS.get(3), + TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1) + ); + assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX)); + } + + #[test] + fn test_infer_time_buckets() { + assert_eq!( + TIME_BUCKETS.get(0), + infer_time_bucket( + [ + new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0), + new_file_handle(FileId::random(), 1, 10_000, 0) + ] + .iter() + ) + ); + } +} diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs index 06d15ab38c45..957d012473e3 100644 --- a/src/storage/src/compaction/picker.rs +++ b/src/storage/src/compaction/picker.rs @@ -12,30 +12,49 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; -use std::sync::Arc; use std::time::Duration; +use common_telemetry::tracing::log::warn; use common_telemetry::{debug, error, info}; +use common_time::timestamp::TimeUnit; +use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; use snafu::ResultExt; use store_api::logstore::LogStore; +use crate::compaction::infer_time_bucket; use crate::compaction::scheduler::CompactionRequestImpl; -use crate::compaction::strategy::{SimpleTimeWindowStrategy, StrategyRef}; -use crate::compaction::task::{CompactionTask, CompactionTaskImpl}; -use crate::error::TtlCalculationSnafu; +use crate::compaction::task::{CompactionOutput, CompactionTask, CompactionTaskImpl}; +use crate::error::{Result, TtlCalculationSnafu}; use crate::scheduler::Request; -use crate::sst::{FileHandle, Level}; -use crate::version::LevelMetasRef; +use crate::sst::{FileHandle, LevelMeta}; /// Picker picks input SST files and builds the compaction task. /// Different compaction strategy may implement different pickers. 
-pub trait Picker: Send + 'static { +pub trait Picker: Debug + Send + 'static { type Request: Request; type Task: CompactionTask; - fn pick(&self, req: &Self::Request) -> crate::error::Result>; + fn pick(&self, req: &Self::Request) -> Result>; +} + +pub(crate) fn get_expired_ssts( + levels: &[LevelMeta], + ttl: Option, + now: Timestamp, +) -> Result> { + let Some(ttl) = ttl else { return Ok(vec![]); }; + + let expire_time = now.sub_duration(ttl).context(TtlCalculationSnafu)?; + + let expired_ssts = levels + .iter() + .flat_map(|l| l.get_expired_files(&expire_time).into_iter()) + .collect(); + Ok(expired_ssts) } pub struct PickerContext { @@ -54,56 +73,40 @@ impl PickerContext { } } -/// L0 -> L1 compaction based on time windows. -pub struct SimplePicker { - strategy: StrategyRef, +/// `LeveledTimeWindowPicker` only handles level 0 to level 1 compaction in a time-window tiered +/// manner. It picks all SSTs in level 0 and writes rows in these SSTs to a new file partitioned +/// by a inferred time bucket in level 1. +pub struct LeveledTimeWindowPicker { _phantom_data: PhantomData, } -impl Default for SimplePicker { +impl Debug for LeveledTimeWindowPicker { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "LeveledTimeWindowPicker{{..}}") + } +} + +impl Default for LeveledTimeWindowPicker { fn default() -> Self { - Self::new(Arc::new(SimpleTimeWindowStrategy {})) + Self::new() } } -impl SimplePicker { - pub fn new(strategy: StrategyRef) -> Self { +impl LeveledTimeWindowPicker { + pub fn new() -> Self { Self { - strategy, _phantom_data: Default::default(), } } - - fn get_expired_ssts( - &self, - levels: &LevelMetasRef, - ttl: Option, - ) -> crate::error::Result> { - let Some(ttl) = ttl else { return Ok(vec![]); }; - - let expire_time = Timestamp::current_millis() - .sub_duration(ttl) - .context(TtlCalculationSnafu)?; - - let mut expired_ssts = vec![]; - for level in 0..levels.level_num() { - expired_ssts.extend(levels.level(level as Level).get_expired_files(&expire_time)); - } - Ok(expired_ssts) - } } -impl Picker for SimplePicker { +impl Picker for LeveledTimeWindowPicker { type Request = CompactionRequestImpl; type Task = CompactionTaskImpl; - fn pick( - &self, - req: &CompactionRequestImpl, - ) -> crate::error::Result>> { + fn pick(&self, req: &CompactionRequestImpl) -> Result>> { let levels = &req.levels(); - let expired_ssts = self - .get_expired_ssts(levels, req.ttl) + let expired_ssts = get_expired_ssts(levels.levels(), req.ttl, Timestamp::current_millis()) .map_err(|e| { error!(e;"Failed to get region expired SST files, region: {}, ttl: {:?}", req.region_id, req.ttl); e @@ -121,12 +124,16 @@ impl Picker for SimplePicker { let ctx = &PickerContext::with(req.compaction_time_window); + let mut outputs = vec![]; for level_num in 0..levels.level_num() { let level = levels.level(level_num as u8); - let (compaction_time_window, outputs) = self.strategy.pick(ctx, level); + let compaction_time_window = Self::pick_level(ctx, level, &mut outputs); if outputs.is_empty() { - debug!("No SST file can be compacted at level {}", level_num); + debug!( + "No SST file can be compacted at level {}, path: {:?}", + level_num, req.sst_layer + ); continue; } @@ -151,3 +158,272 @@ impl Picker for SimplePicker { Ok(None) } } + +impl LeveledTimeWindowPicker { + fn pick_level( + ctx: &PickerContext, + level: &LevelMeta, + results: &mut Vec, + ) -> Option { + // SimpleTimeWindowStrategy only handles level 0 to level 1 compaction. 
+ if level.level() != 0 { + return None; + } + let files = find_compactable_files(level); + debug!("Compactable files found: {:?}", files); + if files.is_empty() { + return None; + } + let time_window = ctx.compaction_time_window().unwrap_or_else(|| { + let inferred = infer_time_bucket(files.iter()); + debug!( + "Compaction window is not present, inferring from files: {:?}", + inferred + ); + inferred + }); + let buckets = calculate_time_buckets(time_window, &files); + debug!("File bucket:{}, file groups: {:?}", time_window, buckets); + + results.extend(buckets.into_iter().map(|(bound, files)| CompactionOutput { + output_level: 1, + time_window_bound: bound, + time_window_sec: time_window, + inputs: files, + // strict window is used in simple time window strategy in that rows in one file + // may get compacted to multiple destinations. + strict_window: true, + })); + Some(time_window) + } +} + +/// Finds files that can be compacted in given level. +/// Currently they're files that is not currently under compaction. +#[inline] +fn find_compactable_files(level: &LevelMeta) -> Vec { + level.files().filter(|f| !f.compacting()).cloned().collect() +} + +/// Calculates buckets for files. If file does not contain a time range in metadata, it will be +/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket) +/// so that all files without timestamp can be compacted together. +fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap> { + let mut buckets = HashMap::new(); + + for file in files { + if let Some((start, end)) = file.time_range() { + let bounds = file_time_bucket_span( + start.convert_to(TimeUnit::Second).unwrap().value(), + end.convert_to(TimeUnit::Second).unwrap().value(), + bucket_sec, + ); + for bound in bounds { + buckets + .entry(bound) + .or_insert_with(Vec::new) + .push(file.clone()); + } + } else { + warn!("Found corrupted SST without timestamp bounds: {:?}", file); + } + } + buckets +} + +/// Calculates timestamp span between start and end timestamp. +fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec { + assert!(start_sec <= end_sec); + + // if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot + // be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow. 
+ let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN); + let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN); + + let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize); + while start_aligned < end_aligned { + res.push(start_aligned); + start_aligned += bucket_sec; + } + res.push(end_aligned); + res +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::sync::Arc; + + use super::*; + use crate::compaction::tests::new_file_handle; + use crate::compaction::TIME_BUCKETS; + use crate::file_purger::noop::new_noop_file_purger; + use crate::sst::{FileId, Level, LevelMetas}; + + #[test] + fn test_time_bucket_span() { + assert_eq!(vec![0], file_time_bucket_span(1, 9, 10)); + + assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10)); + + assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10)); + + assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10)); + } + + #[test] + fn test_time_bucket_span_large() { + assert_eq!( + vec![ + (i64::MAX - 10).align_by_bucket(10).unwrap(), + i64::MAX.align_by_bucket(10).unwrap(), + ], + file_time_bucket_span(i64::MAX - 10, i64::MAX, 10) + ); + + // magic hmmm? + for bucket in 1..100 { + assert_eq!( + vec![ + i64::MIN, + (i64::MIN + bucket).align_by_bucket(bucket).unwrap() + ], + file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket) + ); + } + } + + fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec { + input + .iter() + .map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end, 0)) + .collect() + } + + fn check_bucket_calculation( + bucket_sec: i64, + files: Vec, + expected: &[(i64, &[FileId])], + ) { + let res = calculate_time_buckets(bucket_sec, &files); + + let expected = expected + .iter() + .map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::>())) + .collect::>(); + + for (bucket, file_ids) in expected { + let actual = res + .get(&bucket) + .unwrap() + .iter() + .map(|f| f.file_id()) + .collect(); + assert_eq!( + file_ids, actual, + "bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}", + ); + } + } + + #[test] + fn test_calculate_time_buckets() { + let file_id_a = FileId::random(); + let file_id_b = FileId::random(); + // simple case, files with disjoint + check_bucket_calculation( + 10, + new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]), + &[(0, &[file_id_a]), (10, &[file_id_b])], + ); + + // files across buckets + check_bucket_calculation( + 10, + new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]), + &[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])], + ); + check_bucket_calculation( + 10, + new_file_handles(&[(file_id_a, 0, 10000)]), + &[(0, &[file_id_a]), (10, &[file_id_a])], + ); + + // file with an large time range + let file_id_array = &[file_id_a]; + let expected = (0..(TIME_BUCKETS.get(4) / TIME_BUCKETS.get(0))) + .map(|b| (b * TIME_BUCKETS.get(0), file_id_array as _)) + .collect::>(); + check_bucket_calculation( + TIME_BUCKETS.get(0), + new_file_handles(&[(file_id_a, 0, TIME_BUCKETS.get(4) * 1000)]), + &expected, + ); + } + + struct TtlTester { + files: Vec<(FileId, i64, i64, Level)>, + ttl: Option, + expired: Vec, + now: Timestamp, + } + + impl TtlTester { + fn check(&self) { + let expected_expired = self + .expired + .iter() + .map(|idx| self.files[*idx].0) + .collect::>(); + let file_purger = new_noop_file_purger(); + let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}); + let file_handles = self + .files + 
.iter() + .map(|(file_id, start_ts, end_ts, level)| { + new_file_handle(*file_id, *start_ts, *end_ts, *level).meta() + }) + .collect::>(); + let levels = LevelMetas::new(layer, file_purger).merge( + file_handles.into_iter(), + vec![].into_iter(), + None, + ); + let expired = get_expired_ssts(levels.levels(), self.ttl, self.now) + .unwrap() + .into_iter() + .map(|f| f.file_id()) + .collect::>(); + assert_eq!(expected_expired, expired); + } + } + + #[test] + fn test_find_expired_ssts() { + TtlTester { + files: vec![ + (FileId::random(), 8000, 9000, 0), + (FileId::random(), 10000, 11000, 0), + (FileId::random(), 8000, 11000, 1), + (FileId::random(), 2000, 3000, 1), + ], + ttl: Some(Duration::from_secs(1)), + expired: vec![3], + now: Timestamp::new_second(10), + } + .check(); + + TtlTester { + files: vec![ + (FileId::random(), 8000, 8999, 0), + (FileId::random(), 10000, 11000, 0), + (FileId::random(), 8000, 11000, 1), + (FileId::random(), 2000, 3000, 1), + ], + ttl: Some(Duration::from_secs(1)), + expired: vec![0, 3], + now: Timestamp::new_second(10), + } + .check(); + } +} diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs index 35fc6cff964f..1a37bdbcea07 100644 --- a/src/storage/src/compaction/scheduler.rs +++ b/src/storage/src/compaction/scheduler.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; @@ -22,8 +23,8 @@ use store_api::storage::RegionId; use tokio::sync::oneshot::Sender; use tokio::sync::Notify; -use crate::compaction::picker::Picker; use crate::compaction::task::CompactionTask; +use crate::compaction::CompactionPickerRef; use crate::error::Result; use crate::manifest::region::RegionManifest; use crate::region::{RegionWriterRef, SharedDataRef}; @@ -63,7 +64,7 @@ pub struct CompactionRequestImpl { pub compaction_time_window: Option, /// Compaction result sender. pub sender: Option>>, - + pub picker: CompactionPickerRef, pub sst_write_buffer_size: ReadableSize, } @@ -79,18 +80,40 @@ impl CompactionRequestImpl { } } -pub struct CompactionHandler
<P: Picker>
{ - pub picker: P, +pub struct CompactionHandler { + _phantom_data: PhantomData, #[cfg(test)] pub pending_tasks: Arc>>>, } +impl Default for CompactionHandler { + fn default() -> Self { + Self { + _phantom_data: Default::default(), + #[cfg(test)] + pending_tasks: Arc::new(Default::default()), + } + } +} + +impl CompactionHandler { + #[cfg(test)] + pub fn new_with_pending_tasks( + tasks: Arc>>>, + ) -> Self { + Self { + _phantom_data: Default::default(), + pending_tasks: tasks, + } + } +} + #[async_trait::async_trait] -impl
<P>
Handler for CompactionHandler<P>
+impl Handler for CompactionHandler where - P: Picker + Send + Sync, + S: LogStore, { - type Request = P::Request; + type Request = CompactionRequestImpl; async fn handle_request( &self, @@ -99,7 +122,7 @@ where finish_notifier: Arc, ) -> Result<()> { let region_id = req.key(); - let Some(task) = self.picker.pick(&req)? else { + let Some(task) = req.picker.pick(&req)? else { info!("No file needs compaction in region: {:?}", region_id); req.complete(Ok(())); return Ok(()); diff --git a/src/storage/src/compaction/strategy.rs b/src/storage/src/compaction/strategy.rs deleted file mode 100644 index 0c95051307b2..000000000000 --- a/src/storage/src/compaction/strategy.rs +++ /dev/null @@ -1,327 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::sync::Arc; - -use common_telemetry::{debug, warn}; -use common_time::timestamp::TimeUnit; -use common_time::timestamp_millis::BucketAligned; -use common_time::Timestamp; - -use crate::compaction::picker::PickerContext; -use crate::compaction::task::CompactionOutput; -use crate::sst::{FileHandle, LevelMeta}; - -/// Compaction strategy that defines which SSTs need to be compacted at given level. -pub trait Strategy { - fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option, Vec); -} - -pub type StrategyRef = Arc; - -/// SimpleTimeWindowStrategy only handles level 0 to level 1 compaction in a time-window tiered -/// manner. It picks all SSTs in level 0 and writes rows in these SSTs to a new file partitioned -/// by a inferred time bucket in level 1. -pub struct SimpleTimeWindowStrategy {} - -impl Strategy for SimpleTimeWindowStrategy { - fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option, Vec) { - // SimpleTimeWindowStrategy only handles level 0 to level 1 compaction. - if level.level() != 0 { - return (None, vec![]); - } - let files = find_compactable_files(level); - debug!("Compactable files found: {:?}", files); - if files.is_empty() { - return (None, vec![]); - } - let time_window = ctx.compaction_time_window().unwrap_or_else(|| { - let inferred = infer_time_bucket(&files); - debug!( - "Compaction window is not present, inferring from files: {:?}", - inferred - ); - inferred - }); - let buckets = calculate_time_buckets(time_window, &files); - debug!("File bucket:{}, file groups: {:?}", time_window, buckets); - ( - Some(time_window), - buckets - .into_iter() - .map(|(bound, files)| CompactionOutput { - output_level: 1, - bucket_bound: bound, - bucket: time_window, - inputs: files, - }) - .collect(), - ) - } -} - -/// Finds files that can be compacted in given level. -/// Currently they're files that is not currently under compaction. -#[inline] -fn find_compactable_files(level: &LevelMeta) -> Vec { - level.files().filter(|f| !f.compacting()).cloned().collect() -} - -/// Calculates buckets for files. 
If file does not contain a time range in metadata, it will be -/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket) -/// so that all files without timestamp can be compacted together. -fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap> { - let mut buckets = HashMap::new(); - - for file in files { - if let Some((start, end)) = file.time_range() { - let bounds = file_time_bucket_span( - start.convert_to(TimeUnit::Second).unwrap().value(), - end.convert_to(TimeUnit::Second).unwrap().value(), - bucket_sec, - ); - for bound in bounds { - buckets - .entry(bound) - .or_insert_with(Vec::new) - .push(file.clone()); - } - } else { - warn!("Found corrupted SST without timestamp bounds: {:?}", file); - } - } - buckets -} - -/// Calculates timestamp span between start and end timestamp. -fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec { - assert!(start_sec <= end_sec); - - // if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot - // be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow. - let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN); - let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN); - - let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize); - while start_aligned < end_aligned { - res.push(start_aligned); - start_aligned += bucket_sec; - } - res.push(end_aligned); - res -} - -/// Infers the suitable time bucket duration. -/// Now it simply find the max and min timestamp across all SSTs in level and fit the time span -/// into time bucket. -fn infer_time_bucket(files: &[FileHandle]) -> i64 { - let mut max_ts = &Timestamp::new(i64::MIN, TimeUnit::Second); - let mut min_ts = &Timestamp::new(i64::MAX, TimeUnit::Second); - - for f in files { - if let Some((start, end)) = f.time_range() { - min_ts = min_ts.min(start); - max_ts = max_ts.max(end); - } else { - // we don't expect an SST file without time range, - // it's either a bug or data corruption. - warn!("Found SST file without time range metadata: {f:?}"); - } - } - - // safety: Convert whatever timestamp into seconds will not cause overflow. - let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value(); - let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value(); - - max_sec - .checked_sub(min_sec) - .map(fit_time_bucket) // return the max bucket on subtraction overflow. - .unwrap_or_else(|| *TIME_BUCKETS.last().unwrap()) // safety: TIME_BUCKETS cannot be empty. -} - -/// A set of predefined time buckets. -const TIME_BUCKETS: [i64; 7] = [ - 60 * 60, // one hour - 2 * 60 * 60, // two hours - 12 * 60 * 60, // twelve hours - 24 * 60 * 60, // one day - 7 * 24 * 60 * 60, // one week - 365 * 24 * 60 * 60, // one year - 10 * 365 * 24 * 60 * 60, // ten years -]; - -/// Fits a given time span into time bucket by find the minimum bucket that can cover the span. -/// Returns the max bucket if no such bucket can be found. 
-fn fit_time_bucket(span_sec: i64) -> i64 { - assert!(span_sec >= 0); - for b in TIME_BUCKETS { - if b >= span_sec { - return b; - } - } - *TIME_BUCKETS.last().unwrap() -} - -#[cfg(test)] -mod tests { - use std::collections::{HashMap, HashSet}; - - use super::*; - use crate::file_purger::noop::new_noop_file_purger; - use crate::sst::{FileId, FileMeta}; - - #[test] - fn test_time_bucket_span() { - assert_eq!(vec![0], file_time_bucket_span(1, 9, 10)); - - assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10)); - - assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10)); - - assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10)); - } - - #[test] - fn test_time_bucket_span_large() { - assert_eq!( - vec![ - (i64::MAX - 10).align_by_bucket(10).unwrap(), - i64::MAX.align_by_bucket(10).unwrap(), - ], - file_time_bucket_span(i64::MAX - 10, i64::MAX, 10) - ); - - // magic hmmm? - for bucket in 1..100 { - assert_eq!( - vec![ - i64::MIN, - (i64::MIN + bucket).align_by_bucket(bucket).unwrap() - ], - file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket) - ); - } - } - - #[test] - fn test_time_bucket() { - assert_eq!(TIME_BUCKETS[0], fit_time_bucket(1)); - assert_eq!(TIME_BUCKETS[0], fit_time_bucket(60 * 60)); - assert_eq!(TIME_BUCKETS[1], fit_time_bucket(60 * 60 + 1)); - - assert_eq!(TIME_BUCKETS[2], fit_time_bucket(TIME_BUCKETS[2] - 1)); - assert_eq!(TIME_BUCKETS[2], fit_time_bucket(TIME_BUCKETS[2])); - assert_eq!(TIME_BUCKETS[3], fit_time_bucket(TIME_BUCKETS[3] - 1)); - assert_eq!(TIME_BUCKETS[6], fit_time_bucket(i64::MAX)); - } - - #[test] - fn test_infer_time_buckets() { - assert_eq!( - TIME_BUCKETS[0], - infer_time_bucket(&[ - new_file_handle(FileId::random(), 0, TIME_BUCKETS[0] * 1000 - 1), - new_file_handle(FileId::random(), 1, 10_000) - ]) - ); - } - - fn new_file_handle(file_id: FileId, start_ts_millis: i64, end_ts_millis: i64) -> FileHandle { - let file_purger = new_noop_file_purger(); - let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {}); - FileHandle::new( - FileMeta { - region_id: 0, - file_id, - time_range: Some(( - Timestamp::new_millisecond(start_ts_millis), - Timestamp::new_millisecond(end_ts_millis), - )), - level: 0, - file_size: 0, - }, - layer, - file_purger, - ) - } - - fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec { - input - .iter() - .map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end)) - .collect() - } - - fn check_bucket_calculation( - bucket_sec: i64, - files: Vec, - expected: &[(i64, &[FileId])], - ) { - let res = calculate_time_buckets(bucket_sec, &files); - - let expected = expected - .iter() - .map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::>())) - .collect::>(); - - for (bucket, file_ids) in expected { - let actual = res - .get(&bucket) - .unwrap() - .iter() - .map(|f| f.file_id()) - .collect(); - assert_eq!( - file_ids, actual, - "bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}", - ); - } - } - - #[test] - fn test_calculate_time_buckets() { - let file_id_a = FileId::random(); - let file_id_b = FileId::random(); - // simple case, files with disjoint - check_bucket_calculation( - 10, - new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]), - &[(0, &[file_id_a]), (10, &[file_id_b])], - ); - - // files across buckets - check_bucket_calculation( - 10, - new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]), - &[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])], - ); - check_bucket_calculation( - 10, - new_file_handles(&[(file_id_a, 0, 
10000)]), - &[(0, &[file_id_a]), (10, &[file_id_a])], - ); - - // file with an large time range - let file_id_array = &[file_id_a]; - let expected = (0..(TIME_BUCKETS[4] / TIME_BUCKETS[0])) - .map(|b| (b * TIME_BUCKETS[0], file_id_array as _)) - .collect::>(); - check_bucket_calculation( - TIME_BUCKETS[0], - new_file_handles(&[(file_id_a, 0, TIME_BUCKETS[4] * 1000)]), - &expected, - ); - } -} diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index 392d70388616..b7cec2a36df0 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -169,13 +169,15 @@ impl CompactionTask for CompactionTaskImpl { #[derive(Debug)] pub struct CompactionOutput { /// Compaction output file level. - pub(crate) output_level: Level, - /// The left bound of time bucket. - pub(crate) bucket_bound: i64, - /// Bucket duration in seconds. - pub(crate) bucket: i64, + pub output_level: Level, + /// The left bound of time window. + pub time_window_bound: i64, + /// Time window size in seconds. + pub time_window_sec: i64, /// Compaction input files. - pub(crate) inputs: Vec, + pub inputs: Vec, + /// If the compaction output is strictly windowed. + pub strict_window: bool, } impl CompactionOutput { @@ -186,13 +188,21 @@ impl CompactionOutput { sst_layer: AccessLayerRef, sst_write_buffer_size: ReadableSize, ) -> Result> { + let time_range = if self.strict_window { + ( + Some(self.time_window_bound), + Some(self.time_window_bound + self.time_window_sec), + ) + } else { + (None, None) + }; + let reader = build_sst_reader( region_id, schema, sst_layer.clone(), &self.inputs, - self.bucket_bound, - self.bucket_bound + self.bucket, + time_range, ) .await?; diff --git a/src/storage/src/compaction/twcs.rs b/src/storage/src/compaction/twcs.rs new file mode 100644 index 000000000000..1d327d3dca2e --- /dev/null +++ b/src/storage/src/compaction/twcs.rs @@ -0,0 +1,398 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Time-window compaction strategy + +use std::collections::BTreeMap; +use std::fmt::{Debug, Formatter}; +use std::marker::PhantomData; + +use common_telemetry::tracing::warn; +use common_telemetry::{debug, info}; +use common_time::timestamp::TimeUnit; +use common_time::timestamp_millis::BucketAligned; +use common_time::Timestamp; +use store_api::logstore::LogStore; + +use crate::compaction::picker::get_expired_ssts; +use crate::compaction::task::CompactionOutput; +use crate::compaction::{infer_time_bucket, CompactionRequestImpl, CompactionTaskImpl, Picker}; +use crate::sst::{FileHandle, LevelMeta}; + +/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction +/// candidates. 
+pub struct TwcsPicker { + max_active_window_files: usize, + max_inactive_window_files: usize, + time_window_seconds: Option, + _phantom_data: PhantomData, +} + +impl Debug for TwcsPicker { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TwcsPicker") + .field("max_active_window_files", &self.max_active_window_files) + .field("max_inactive_window_files", &self.max_inactive_window_files) + .finish() + } +} + +impl TwcsPicker { + pub fn new( + max_active_window_files: usize, + max_inactive_window_files: usize, + time_window_seconds: Option, + ) -> Self { + Self { + max_inactive_window_files, + max_active_window_files, + _phantom_data: Default::default(), + time_window_seconds, + } + } + + /// Builds compaction output from files. + /// For active writing window, we allow for at most `max_active_window_files` files to alleviate + /// fragmentation. For other windows, we allow at most 1 file at each window. + fn build_output( + &self, + time_windows: &BTreeMap>, + active_window: Option, + window_size: i64, + ) -> Vec { + let mut output = vec![]; + for (window, files) in time_windows { + if let Some(active_window) = active_window && *window == active_window { + if files.len() > self.max_active_window_files { + output.push(CompactionOutput { + output_level: 1, // we only have two levels and always compact to l1 + time_window_bound: *window, + time_window_sec: window_size, + inputs: files.clone(), + // Strict window is not needed since we always compact many files to one + // single file in TWCS. + strict_window: false, + }); + } else { + debug!("Active window not present or no enough files in active window {:?}", active_window); + } + } else { + // not active writing window + if files.len() > self.max_inactive_window_files { + output.push(CompactionOutput { + output_level: 1, + time_window_bound: *window, + time_window_sec: window_size, + inputs: files.clone(), + strict_window: false, + }); + } + } + } + output + } +} + +impl Picker for TwcsPicker { + type Request = CompactionRequestImpl; + type Task = CompactionTaskImpl; + + fn pick(&self, req: &Self::Request) -> crate::error::Result> { + let levels = req.levels(); + let expired_ssts = get_expired_ssts(levels.levels(), req.ttl, Timestamp::current_millis())?; + if !expired_ssts.is_empty() { + info!( + "Expired SSTs in region {}: {:?}", + req.region_id, expired_ssts + ); + // here we mark expired SSTs as compacting to avoid them being picked. + expired_ssts.iter().for_each(|f| f.mark_compacting(true)); + } + + let time_window_size = req + .compaction_time_window + .or(self.time_window_seconds) + .unwrap_or_else(|| { + let inferred = infer_time_bucket(req.levels().level(0).files()); + info!( + "Compaction window for region {} is not present, inferring from files: {:?}", + req.region_id, inferred + ); + inferred + }); + + // Find active window from files in level 0. 
+ let active_window = + find_latest_window_in_seconds(levels.level(0).files(), time_window_size); + + let windows = assign_to_windows( + levels.levels().iter().flat_map(LevelMeta::files), + time_window_size, + ); + + let outputs = self.build_output(&windows, active_window, time_window_size); + let task = CompactionTaskImpl { + schema: req.schema(), + sst_layer: req.sst_layer.clone(), + outputs, + writer: req.writer.clone(), + shared_data: req.shared.clone(), + wal: req.wal.clone(), + manifest: req.manifest.clone(), + expired_ssts, + sst_write_buffer_size: req.sst_write_buffer_size, + compaction_time_window: Some(time_window_size), + }; + Ok(Some(task)) + } +} + +/// Assigns files to windows with predefined window size (in seconds) by their max timestamps. +fn assign_to_windows<'a>( + files: impl Iterator, + time_window_size: i64, +) -> BTreeMap> { + let mut windows: BTreeMap> = BTreeMap::new(); + // Iterates all files and assign to time windows according to max timestamp + for file in files { + if let Some((_, end)) = file.time_range() { + let time_window = end + .convert_to(TimeUnit::Second) + .unwrap() + .value() + .align_to_ceil_by_bucket(time_window_size) + .unwrap_or(i64::MIN); + windows.entry(time_window).or_default().push(file.clone()); + } else { + warn!("Unexpected file w/o timestamp: {:?}", file.file_id()); + } + } + windows +} + +/// Finds the latest active writing window among all files. +/// Returns `None` when there are no files or all files are corrupted. +fn find_latest_window_in_seconds<'a>( + files: impl Iterator, + time_window_size: i64, +) -> Option { + let mut latest_timestamp = None; + for f in files { + if let Some((_, end)) = f.time_range() { + if let Some(latest) = latest_timestamp && end > latest { + latest_timestamp = Some(end); + } else { + latest_timestamp = Some(end); + } + } else { + warn!("Cannot find timestamp range of file: {}", f.file_id()); + } + } + latest_timestamp + .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second)) + .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size)) +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use log_store::NoopLogStore; + + use super::*; + use crate::compaction::tests::new_file_handle; + use crate::sst::{FileId, Level}; + + #[test] + fn test_get_latest_window_in_seconds() { + assert_eq!( + Some(1), + find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1) + ); + assert_eq!( + Some(1), + find_latest_window_in_seconds( + [new_file_handle(FileId::random(), 0, 1000, 0)].iter(), + 1 + ) + ); + + assert_eq!( + Some(-9223372036854000), + find_latest_window_in_seconds( + [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(), + 3600, + ) + ); + + assert_eq!( + (i64::MAX / 10000000 + 1) * 10000, + find_latest_window_in_seconds( + [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(), + 10000, + ) + .unwrap() + ); + } + + #[test] + fn test_assign_to_windows() { + let windows = assign_to_windows( + [ + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + ] + .iter(), + 3, + ); + assert_eq!(5, windows.get(&0).unwrap().len()); + + let files = [FileId::random(); 3]; + let windows = assign_to_windows( + [ + new_file_handle(files[0], -2000, -3, 0), + new_file_handle(files[1], 0, 2999, 0), + new_file_handle(files[2], 50, 10001, 0), + ] + .iter(), + 3, + ); + 
assert_eq!(files[0], windows.get(&0).unwrap().get(0).unwrap().file_id()); + assert_eq!(files[1], windows.get(&3).unwrap().get(0).unwrap().file_id()); + assert_eq!( + files[2], + windows.get(&12).unwrap().get(0).unwrap().file_id() + ); + } + + struct CompactionPickerTestCase { + window_size: i64, + input_files: Vec, + expected_outputs: Vec, + } + + impl CompactionPickerTestCase { + fn check(&self) { + let windows = assign_to_windows(self.input_files.iter(), self.window_size); + let active_window = + find_latest_window_in_seconds(self.input_files.iter(), self.window_size); + let output = TwcsPicker::::new(4, 1, None).build_output( + &windows, + active_window, + self.window_size, + ); + + let output = output + .iter() + .map(|o| { + let input_file_ids = + o.inputs.iter().map(|f| f.file_id()).collect::>(); + ( + input_file_ids, + o.output_level, + o.time_window_sec, + o.time_window_bound, + o.strict_window, + ) + }) + .collect::>(); + + let expected = self + .expected_outputs + .iter() + .map(|o| { + let input_file_ids = o + .input_files + .iter() + .map(|idx| self.input_files[*idx].file_id()) + .collect::>(); + ( + input_file_ids, + o.output_level, + o.time_window_sec, + o.time_window_bound, + o.strict_window, + ) + }) + .collect::>(); + assert_eq!(expected, output); + } + } + + struct ExpectedOutput { + input_files: Vec, + output_level: Level, + time_window_sec: i64, + time_window_bound: i64, + strict_window: bool, + } + + #[test] + fn test_build_twcs_output() { + let file_ids = (0..4).map(|_| FileId::random()).collect::>(); + + CompactionPickerTestCase { + window_size: 3, + input_files: [ + new_file_handle(file_ids[0], -2000, -3, 0), + new_file_handle(file_ids[1], -3000, -100, 0), + new_file_handle(file_ids[2], 0, 2999, 0), //active windows + new_file_handle(file_ids[3], 50, 2998, 0), //active windows + ] + .to_vec(), + expected_outputs: vec![ExpectedOutput { + input_files: vec![0, 1], + output_level: 1, + time_window_sec: 3, + time_window_bound: 0, + strict_window: false, + }], + } + .check(); + + let file_ids = (0..6).map(|_| FileId::random()).collect::>(); + CompactionPickerTestCase { + window_size: 3, + input_files: [ + new_file_handle(file_ids[0], -2000, -3, 0), + new_file_handle(file_ids[1], -3000, -100, 0), + new_file_handle(file_ids[2], 0, 2999, 0), + new_file_handle(file_ids[3], 50, 2998, 0), + new_file_handle(file_ids[4], 11, 2990, 0), + new_file_handle(file_ids[5], 50, 4998, 0), + ] + .to_vec(), + expected_outputs: vec![ + ExpectedOutput { + input_files: vec![0, 1], + output_level: 1, + time_window_sec: 3, + time_window_bound: 0, + strict_window: false, + }, + ExpectedOutput { + input_files: vec![2, 3, 4], + output_level: 1, + time_window_sec: 3, + time_window_bound: 3, + strict_window: false, + }, + ], + } + .check(); + } +} diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index 0d62cc621246..8c73eeacce8e 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -29,8 +29,7 @@ pub(crate) async fn build_sst_reader( schema: RegionSchemaRef, sst_layer: AccessLayerRef, files: &[FileHandle], - lower_sec_inclusive: i64, - upper_sec_exclusive: i64, + time_range: (Option, Option), ) -> error::Result { // TODO(hl): Schemas in different SSTs may differ, thus we should infer // timestamp column name from Parquet metadata. 
@@ -43,14 +42,9 @@ pub(crate) async fn build_sst_reader( ChunkReaderBuilder::new(region_id, schema, sst_layer) .pick_ssts(files) .filters( - build_time_range_filter( - lower_sec_inclusive, - upper_sec_exclusive, - &ts_col_name, - ts_col_unit, - ) - .into_iter() - .collect(), + build_time_range_filter(time_range, &ts_col_name, ts_col_unit) + .into_iter() + .collect(), ) .build() .await @@ -59,21 +53,22 @@ pub(crate) async fn build_sst_reader( /// Build time range filter expr from lower (inclusive) and upper bound(exclusive). /// Returns `None` if time range overflows. fn build_time_range_filter( - low_sec: i64, - high_sec: i64, + time_range: (Option, Option), ts_col_name: &str, ts_col_unit: TimeUnit, ) -> Option { - debug_assert!(low_sec <= high_sec); + let (low_ts_inclusive, high_ts_exclusive) = time_range; let ts_col = DfExpr::Column(datafusion_common::Column::from_name(ts_col_name)); // Converting seconds to whatever unit won't lose precision. // Here only handles overflow. - let low_ts = common_time::Timestamp::new_second(low_sec) - .convert_to(ts_col_unit) + let low_ts = low_ts_inclusive + .map(common_time::Timestamp::new_second) + .and_then(|ts| ts.convert_to(ts_col_unit)) .map(|ts| ts.value()); - let high_ts = common_time::Timestamp::new_second(high_sec) - .convert_to(ts_col_unit) + let high_ts = high_ts_exclusive + .map(common_time::Timestamp::new_second) + .and_then(|ts| ts.convert_to(ts_col_unit)) .map(|ts| ts.value()); let expr = match (low_ts, high_ts) { @@ -296,8 +291,7 @@ mod tests { schema, sst_layer, files, - lower_sec_inclusive, - upper_sec_exclusive, + (Some(lower_sec_inclusive), Some(upper_sec_exclusive)), ) .await .unwrap(); @@ -378,9 +372,15 @@ mod tests { sst_layer: AccessLayerRef, ) -> Vec { let mut timestamps = vec![]; - let mut reader = build_sst_reader(REGION_ID, schema, sst_layer, files, i64::MIN, i64::MAX) - .await - .unwrap(); + let mut reader = build_sst_reader( + REGION_ID, + schema, + sst_layer, + files, + (Some(i64::MIN), Some(i64::MAX)), + ) + .await + .unwrap(); while let Some(chunk) = reader.next_chunk().await.unwrap() { let ts = chunk.columns[0] .as_any() @@ -447,8 +447,7 @@ mod tests { schema.clone(), sst_layer.clone(), &input_files, - 0, - 3, + (Some(0), Some(3)), ) .await .unwrap(); @@ -457,8 +456,7 @@ mod tests { schema.clone(), sst_layer.clone(), &input_files, - 3, - 6, + (Some(3), Some(6)), ) .await .unwrap(); @@ -467,8 +465,7 @@ mod tests { schema.clone(), sst_layer.clone(), &input_files, - 6, - 10, + (Some(6), Some(10)), ) .await .unwrap(); @@ -554,7 +551,12 @@ mod tests { #[test] fn test_build_time_range_filter() { - assert!(build_time_range_filter(i64::MIN, i64::MAX, "ts", TimeUnit::Nanosecond).is_none()); + assert!(build_time_range_filter( + (Some(i64::MIN), Some(i64::MAX)), + "ts", + TimeUnit::Nanosecond + ) + .is_none()); assert_eq!( Expr::from(datafusion_expr::binary_expr( @@ -562,10 +564,10 @@ mod tests { Operator::Lt, datafusion_expr::lit(timestamp_to_scalar_value( TimeUnit::Nanosecond, - Some(TimeUnit::Second.factor() as i64 / TimeUnit::Nanosecond.factor() as i64) - )) + Some(TimeUnit::Second.factor() as i64 / TimeUnit::Nanosecond.factor() as i64), + )), )), - build_time_range_filter(i64::MIN, 1, "ts", TimeUnit::Nanosecond).unwrap() + build_time_range_filter((Some(i64::MIN), Some(1)), "ts", TimeUnit::Nanosecond).unwrap() ); assert_eq!( @@ -576,10 +578,10 @@ mod tests { TimeUnit::Nanosecond, Some( 2 * TimeUnit::Second.factor() as i64 / TimeUnit::Nanosecond.factor() as i64 - ) - )) + ), + )), )), - build_time_range_filter(2, i64::MAX, 
"ts", TimeUnit::Nanosecond).unwrap() + build_time_range_filter((Some(2), Some(i64::MAX)), "ts", TimeUnit::Nanosecond).unwrap() ); } } diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 13a261d32e0c..19fe282bdd37 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -23,8 +23,8 @@ use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::manifest::Manifest; use store_api::storage::{ - CloseContext, CloseOptions, CreateOptions, EngineContext, OpenOptions, Region, - RegionDescriptor, StorageEngine, + CloseContext, CloseOptions, CompactionStrategy, CreateOptions, EngineContext, OpenOptions, + Region, RegionDescriptor, StorageEngine, }; use crate::compaction::CompactionSchedulerRef; @@ -395,6 +395,7 @@ impl EngineInner { name, &self.config, opts.ttl, + opts.compaction_strategy.clone(), ) .await?; @@ -440,6 +441,7 @@ impl EngineInner { ®ion_name, &self.config, opts.ttl, + opts.compaction_strategy.clone(), ) .await?; @@ -471,6 +473,7 @@ impl EngineInner { region_name: &str, config: &EngineConfig, region_ttl: Option, + compaction_strategy: CompactionStrategy, ) -> Result> { let parent_dir = util::normalize_dir(parent_dir); @@ -503,6 +506,7 @@ impl EngineInner { ttl, write_buffer_size: write_buffer_size .unwrap_or(self.config.region_write_buffer_size.as_bytes() as usize), + compaction_strategy, }) } diff --git a/src/storage/src/flush/scheduler.rs b/src/storage/src/flush/scheduler.rs index fa75be05990c..b319b6747799 100644 --- a/src/storage/src/flush/scheduler.rs +++ b/src/storage/src/flush/scheduler.rs @@ -25,7 +25,7 @@ use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::{oneshot, Notify}; -use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef}; +use crate::compaction::{CompactionPickerRef, CompactionRequestImpl, CompactionSchedulerRef}; use crate::config::EngineConfig; use crate::engine::RegionMap; use crate::error::{ @@ -109,6 +109,7 @@ pub struct FlushRegionRequest { pub ttl: Option, /// Time window for compaction. 
pub compaction_time_window: Option, + pub compaction_picker: CompactionPickerRef, } impl FlushRegionRequest { @@ -146,6 +147,7 @@ impl From<&FlushRegionRequest> for CompactionRequestImpl { ttl: req.ttl, compaction_time_window: req.compaction_time_window, sender: None, + picker: req.compaction_picker.clone(), sst_write_buffer_size: req.engine_config.sst_write_buffer_size, } } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 5a78e82a6558..de75b8e7b9f4 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -32,11 +32,13 @@ use store_api::manifest::{ self, Manifest, ManifestLogStorage, ManifestVersion, MetaActionIterator, }; use store_api::storage::{ - AlterRequest, CloseContext, FlushContext, FlushReason, OpenOptions, ReadContext, Region, - RegionId, SequenceNumber, WriteContext, WriteResponse, + AlterRequest, CloseContext, CompactionStrategy, FlushContext, FlushReason, OpenOptions, + ReadContext, Region, RegionId, SequenceNumber, WriteContext, WriteResponse, }; -use crate::compaction::CompactionSchedulerRef; +use crate::compaction::{ + compaction_strategy_to_picker, CompactionPickerRef, CompactionSchedulerRef, +}; use crate::config::EngineConfig; use crate::error::{self, Error, Result}; use crate::file_purger::FilePurgerRef; @@ -164,6 +166,7 @@ pub struct StoreConfig { pub file_purger: FilePurgerRef, pub ttl: Option, pub write_buffer_size: usize, + pub compaction_strategy: CompactionStrategy, } pub type RecoveredMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata)); @@ -252,6 +255,7 @@ impl RegionImpl { flush_strategy: store_config.flush_strategy, flush_scheduler: store_config.flush_scheduler, compaction_scheduler: store_config.compaction_scheduler, + compaction_picker: compaction_strategy_to_picker(&store_config.compaction_strategy), sst_layer: store_config.sst_layer, manifest: store_config.manifest, }); @@ -336,6 +340,8 @@ impl RegionImpl { store_config.ttl, store_config.write_buffer_size, )); + + let compaction_picker = compaction_strategy_to_picker(&store_config.compaction_strategy); let writer_ctx = WriterContext { shared: &shared, flush_strategy: &store_config.flush_strategy, @@ -345,6 +351,7 @@ impl RegionImpl { wal: &wal, writer: &writer, manifest: &store_config.manifest, + compaction_picker: compaction_picker.clone(), }; // Replay all unflushed data. writer @@ -364,6 +371,7 @@ impl RegionImpl { flush_strategy: store_config.flush_strategy, flush_scheduler: store_config.flush_scheduler, compaction_scheduler: store_config.compaction_scheduler, + compaction_picker, sst_layer: store_config.sst_layer, manifest: store_config.manifest, }); @@ -586,6 +594,7 @@ impl RegionImpl { wal: &inner.wal, writer: &inner.writer, manifest: &inner.manifest, + compaction_picker: inner.compaction_picker.clone(), }; inner.writer.replay(recovered_metadata, writer_ctx).await @@ -642,6 +651,7 @@ struct RegionInner { flush_strategy: FlushStrategyRef, flush_scheduler: FlushSchedulerRef, compaction_scheduler: CompactionSchedulerRef, + compaction_picker: CompactionPickerRef, sst_layer: AccessLayerRef, manifest: RegionManifest, } @@ -685,6 +695,7 @@ impl RegionInner { wal: &self.wal, writer: &self.writer, manifest: &self.manifest, + compaction_picker: self.compaction_picker.clone(), }; // The writer would also try to compat the schema of write batch if it finds out the // schema version of request is less than current schema version. 
@@ -746,6 +757,7 @@ impl RegionInner {
             wal: &self.wal,
             writer: &self.writer,
             manifest: &self.manifest,
+            compaction_picker: self.compaction_picker.clone(),
         };
         self.writer.flush(writer_ctx, ctx).await
     }
@@ -761,6 +773,7 @@ impl RegionInner {
             wal: &self.wal,
             writer: &self.writer,
             manifest: &self.manifest,
+            compaction_picker: self.compaction_picker.clone(),
         };
         self.writer.compact(writer_ctx, ctx).await
     }
diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs
index f31e13339b31..9eeae2f72981 100644
--- a/src/storage/src/region/tests.rs
+++ b/src/storage/src/region/tests.rs
@@ -559,6 +559,7 @@ async fn create_store_config(region_name: &str, root: &str) -> StoreConfig {
     pub wal: &'a Wal<S>,
     pub writer: &'a RegionWriterRef,
     pub manifest: &'a RegionManifest,
+    pub compaction_picker: CompactionPickerRef,
 }

 impl<'a, S: LogStore> WriterContext<'a, S> {
@@ -779,6 +780,7 @@ impl WriterInner {
             engine_config: self.engine_config.clone(),
             ttl: self.ttl,
             compaction_time_window: current_version.ssts().compaction_time_window(),
+            compaction_picker: ctx.compaction_picker.clone(),
         };

         let flush_handle = ctx
@@ -816,6 +818,7 @@ impl WriterInner {
             ttl: self.ttl,
             compaction_time_window,
             sender: None,
+            picker: writer_ctx.compaction_picker.clone(),
             sst_write_buffer_size,
         };

diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs
index c0ec72fc8772..ac94a19bd153 100644
--- a/src/storage/src/test_util/config_util.rs
+++ b/src/storage/src/test_util/config_util.rs
@@ -125,6 +125,7 @@ pub async fn new_store_config_with_object_store(
             file_purger,
             ttl: None,
             write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize,
+            compaction_strategy: Default::default(),
         },
         regions,
     )
diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs
index 133ded64a799..627248defee5 100644
--- a/src/store-api/src/lib.rs
+++ b/src/store-api/src/lib.rs
@@ -1,3 +1,4 @@
+#![feature(let_chains)]
 // Copyright 2023 Greptime Team
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs
index 4284358834e5..3e7acf4d39cd 100644
--- a/src/store-api/src/storage.rs
+++ b/src/store-api/src/storage.rs
@@ -32,7 +32,10 @@ pub use datatypes::schema::{

 pub use self::chunk::{Chunk, ChunkReader};
 pub use self::descriptors::*;
-pub use self::engine::{CloseOptions, CreateOptions, EngineContext, OpenOptions, StorageEngine};
+pub use self::engine::{
+    CloseOptions, CompactionStrategy, CreateOptions, EngineContext, OpenOptions, StorageEngine,
+    TwcsOptions,
+};
 pub use self::metadata::RegionMeta;
 pub use self::region::{CloseContext, FlushContext, FlushReason, Region, RegionStat, WriteContext};
 pub use self::requests::{
diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs
index 99a99421a1c0..cada401c13ef 100644
--- a/src/store-api/src/storage/engine.rs
+++ b/src/store-api/src/storage/engine.rs
@@ -18,6 +18,7 @@
 //! a [`StorageEngine`] instance manages a bunch of storage unit called [`Region`], which holds
 //! chunks of rows, support operations like PUT/DELETE/SCAN.
+use std::collections::HashMap;
 use std::time::Duration;

 use async_trait::async_trait;
@@ -26,6 +27,13 @@ use common_error::ext::ErrorExt;
 use crate::storage::descriptors::RegionDescriptor;
 use crate::storage::region::Region;

+const COMPACTION_STRATEGY_KEY: &str = "compaction";
+const COMPACTION_STRATEGY_LEVELED_TIME_WINDOW_VALUE: &str = "LTW";
+const COMPACTION_STRATEGY_TWCS_VALUE: &str = "TWCS";
+const TWCS_MAX_ACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_active_window_files";
+const TWCS_TIME_WINDOW_SECONDS_KEY: &str = "compaction.twcs.time_window_seconds";
+const TWCS_MAX_INACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_inactive_window_files";
+
 /// Storage engine provides primitive operations to store and access data.
 #[async_trait]
 pub trait StorageEngine: Send + Sync + Clone + 'static {
@@ -92,6 +100,8 @@ pub struct CreateOptions {
     pub write_buffer_size: Option<usize>,
     /// Region SST files TTL
     pub ttl: Option<Duration>,
+    /// Compaction strategy
+    pub compaction_strategy: CompactionStrategy,
 }

 /// Options to open a region.
@@ -103,6 +113,8 @@ pub struct OpenOptions {
     pub write_buffer_size: Option<usize>,
     /// Region SST files TTL
     pub ttl: Option<Duration>,
+    /// Compaction strategy
+    pub compaction_strategy: CompactionStrategy,
 }

 /// Options to close a region.
@@ -111,3 +123,70 @@ pub struct CloseOptions {
     /// Flush region
     pub flush: bool,
 }
+
+/// Options for compactions
+#[derive(Debug, Clone, Default)]
+pub enum CompactionStrategy {
+    /// Leveled time window compaction strategy
+    #[default]
+    LeveledTimeWindow,
+    /// TWCS
+    Twcs(TwcsOptions),
+}
+
+/// TWCS compaction options.
+#[derive(Debug, Clone)]
+pub struct TwcsOptions {
+    /// Max num of files that can be kept in active writing time window.
+    pub max_active_window_files: usize,
+    /// Max num of files that can be kept in inactive time window.
+    pub max_inactive_window_files: usize,
+    /// Compaction time window defined when creating tables.
+    pub time_window_seconds: Option<i64>,
+}
+
+impl Default for TwcsOptions {
+    fn default() -> Self {
+        Self {
+            max_active_window_files: 4,
+            max_inactive_window_files: 1,
+            time_window_seconds: None,
+        }
+    }
+}
+
+impl From<&HashMap<String, String>> for CompactionStrategy {
+    fn from(opts: &HashMap<String, String>) -> Self {
+        let Some(strategy_name) = opts.get(COMPACTION_STRATEGY_KEY) else { return CompactionStrategy::default() };
+        if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_LEVELED_TIME_WINDOW_VALUE) {
+            CompactionStrategy::LeveledTimeWindow
+        } else if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_TWCS_VALUE) {
+            let mut twcs_opts = TwcsOptions::default();
+            if let Some(max_active_window_files) = opts
+                .get(TWCS_MAX_ACTIVE_WINDOW_FILES_KEY)
+                .and_then(|num| num.parse::<usize>().ok())
+            {
+                twcs_opts.max_active_window_files = max_active_window_files;
+            }
+
+            if let Some(max_inactive_window_files) = opts
+                .get(TWCS_MAX_INACTIVE_WINDOW_FILES_KEY)
+                .and_then(|num| num.parse::<usize>().ok())
+            {
+                twcs_opts.max_inactive_window_files = max_inactive_window_files;
+            }
+
+            if let Some(time_window) = opts
+                .get(TWCS_TIME_WINDOW_SECONDS_KEY)
+                .and_then(|num| num.parse::<i64>().ok()) && time_window > 0
+            {
+                twcs_opts.time_window_seconds = Some(time_window);
+            }

+            CompactionStrategy::Twcs(twcs_opts)
+        } else {
+            // unrecognized compaction strategy
+            CompactionStrategy::default()
+        }
+    }
+}
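
The snippet below is not part of the patch; it is a minimal sketch of how the `From<&HashMap<String, String>>` conversion added above maps a table-option map onto a `CompactionStrategy`, assuming `CompactionStrategy` and `TwcsOptions` are imported from `store_api::storage`. Keys that are absent or fail to parse fall back to the `TwcsOptions` defaults, and an unrecognized strategy name falls back to `LeveledTimeWindow`.

use std::collections::HashMap;

use store_api::storage::CompactionStrategy;

fn main() {
    // Option map keyed by the constants declared in engine.rs above; how these
    // values reach the engine from CREATE TABLE is handled elsewhere in the PR.
    let mut opts = HashMap::new();
    opts.insert("compaction".to_string(), "TWCS".to_string());
    opts.insert(
        "compaction.twcs.max_active_window_files".to_string(),
        "8".to_string(),
    );
    opts.insert(
        "compaction.twcs.time_window_seconds".to_string(),
        "3600".to_string(),
    );

    match CompactionStrategy::from(&opts) {
        CompactionStrategy::Twcs(twcs) => {
            assert_eq!(8, twcs.max_active_window_files);
            // Not present in the map, so the default (1) is kept.
            assert_eq!(1, twcs.max_inactive_window_files);
            assert_eq!(Some(3600), twcs.time_window_seconds);
        }
        CompactionStrategy::LeveledTimeWindow => unreachable!("strategy was set to TWCS"),
    }
}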