feat: initial twcs impl (#1851)
* feat: initial twcs impl

* chore: rename SimplePicker to LeveledPicker

* rename some structs

* Remove Compaction strategy

* make compaction picker a trait object

* make compaction picker configurable for every region

* chore: add some test for ttl

* add some tests

* fix: some style issues in cr

* feat: enable twcs when creating tables

* feat: allow config time window when creating tables

* fix: some cr comments
v0y4g3r committed Jul 4, 2023
1 parent b8e9229 commit 3b6f70c
Showing 21 changed files with 1,149 additions and 452 deletions.
49 changes: 48 additions & 1 deletion src/common/time/src/timestamp_millis.rs
@@ -14,6 +14,7 @@

use std::cmp::Ordering;

use crate::util::div_ceil;
use crate::Timestamp;

/// Unix timestamp in millisecond resolution.
@@ -80,11 +81,17 @@ impl PartialOrd<TimestampMillis> for i64 {
}

pub trait BucketAligned: Sized {
/// Returns the timestamp aligned by `bucket_duration` or `None` if underflow occurred.
/// Aligns the value by `bucket_duration`, or returns `None` if underflow occurred.
///
/// # Panics
/// Panics if `bucket_duration <= 0`.
fn align_by_bucket(self, bucket_duration: i64) -> Option<Self>;

/// Aligns the value by `bucket_duration` toward the ceiling, or returns `None` if overflow occurred.
///
/// # Panics
/// Panics if `bucket_duration <= 0`.
fn align_to_ceil_by_bucket(self, bucket_duration: i64) -> Option<Self>;
}

impl BucketAligned for i64 {
@@ -93,6 +100,11 @@ impl BucketAligned for i64 {
self.checked_div_euclid(bucket_duration)
.and_then(|val| val.checked_mul(bucket_duration))
}

fn align_to_ceil_by_bucket(self, bucket_duration: i64) -> Option<Self> {
assert!(bucket_duration > 0, "{}", bucket_duration);
div_ceil(self, bucket_duration).checked_mul(bucket_duration)
}
}

impl BucketAligned for Timestamp {
@@ -103,6 +115,14 @@ impl BucketAligned for Timestamp {
.align_by_bucket(bucket_duration)
.map(|val| Timestamp::new(val, unit))
}

fn align_to_ceil_by_bucket(self, bucket_duration: i64) -> Option<Self> {
assert!(bucket_duration > 0, "{}", bucket_duration);
let unit = self.unit();
self.value()
.align_to_ceil_by_bucket(bucket_duration)
.map(|val| Timestamp::new(val, unit))
}
}
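
A minimal usage sketch of the two alignment directions, assuming `BucketAligned` and `Timestamp` are in scope as above:

// Floor alignment: 7 falls into the bucket that starts at 6.
assert_eq!(Some(6), 7i64.align_by_bucket(3));
// Ceiling alignment: 7 rounds up to the next bucket boundary, 9.
assert_eq!(Some(9), 7i64.align_to_ceil_by_bucket(3));
// Negative values floor toward negative infinity, matching div_euclid.
assert_eq!(Some(-9), (-7i64).align_by_bucket(3));
// Timestamps align on their raw value and keep their time unit.
let ts = Timestamp::new_millisecond(2500);
assert_eq!(Some(Timestamp::new_millisecond(2000)), ts.align_by_bucket(1000));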

#[cfg(test)]
@@ -180,4 +200,31 @@ mod tests {
Timestamp::new_millisecond(i64::MIN).align_by_bucket(bucket)
);
}

#[test]
fn test_align_to_ceil() {
assert_eq!(None, i64::MAX.align_to_ceil_by_bucket(10));
assert_eq!(
Some(i64::MAX - (i64::MAX % 10)),
(i64::MAX - (i64::MAX % 10)).align_to_ceil_by_bucket(10)
);
assert_eq!(Some(i64::MAX), i64::MAX.align_to_ceil_by_bucket(1));
assert_eq!(Some(i64::MAX), i64::MAX.align_to_ceil_by_bucket(i64::MAX));

assert_eq!(
Some(i64::MIN - (i64::MIN % 10)),
i64::MIN.align_to_ceil_by_bucket(10)
);
assert_eq!(Some(i64::MIN), i64::MIN.align_to_ceil_by_bucket(1));

assert_eq!(Some(3), 1i64.align_to_ceil_by_bucket(3));
assert_eq!(Some(3), 3i64.align_to_ceil_by_bucket(3));
assert_eq!(Some(6), 4i64.align_to_ceil_by_bucket(3));
assert_eq!(Some(0), 0i64.align_to_ceil_by_bucket(3));
assert_eq!(Some(0), (-1i64).align_to_ceil_by_bucket(3));
assert_eq!(Some(0), (-2i64).align_to_ceil_by_bucket(3));
assert_eq!(Some(-3), (-3i64).align_to_ceil_by_bucket(3));
assert_eq!(Some(-3), (-4i64).align_to_ceil_by_bucket(3));
}
}
5 changes: 2 additions & 3 deletions src/datanode/src/instance.rs
@@ -44,7 +44,7 @@ use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use session::context::QueryContext;
use snafu::prelude::*;
use storage::compaction::{CompactionHandler, CompactionSchedulerRef, SimplePicker};
use storage::compaction::{CompactionHandler, CompactionSchedulerRef};
use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::{LocalScheduler, SchedulerConfig};
use storage::EngineImpl;
@@ -395,9 +395,8 @@ impl Instance {
}

fn create_compaction_scheduler<S: LogStore>(opts: &DatanodeOptions) -> CompactionSchedulerRef<S> {
let picker = SimplePicker::default();
let config = SchedulerConfig::from(opts);
let handler = CompactionHandler { picker };
let handler = CompactionHandler::default();
let scheduler = LocalScheduler::new(config, handler);
Arc::new(scheduler)
}
8 changes: 6 additions & 2 deletions src/mito/src/engine.rs
@@ -33,8 +33,8 @@ use snafu::{ensure, OptionExt, ResultExt};
use storage::manifest::manifest_compress_type;
use store_api::storage::{
CloseOptions, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder,
ColumnId, EngineContext as StorageEngineContext, OpenOptions, RegionNumber, RowKeyDescriptor,
RowKeyDescriptorBuilder, StorageEngine,
ColumnId, CompactionStrategy, EngineContext as StorageEngineContext, OpenOptions, RegionNumber,
RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
};
use table::engine::{
region_name, table_dir, CloseTableResult, EngineContext, TableEngine, TableEngineProcedure,
@@ -417,6 +417,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.await.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)? else { return Ok(None) };

let compaction_strategy = CompactionStrategy::from(&table_info.meta.options.extra_options);
let opts = OpenOptions {
parent_dir: table_dir.to_string(),
write_buffer_size: table_info
@@ -425,6 +426,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.write_buffer_size
.map(|s| s.0 as usize),
ttl: table_info.meta.options.ttl,
compaction_strategy,
};

debug!(
@@ -501,6 +503,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
table: name,
};

let compaction_strategy = CompactionStrategy::from(&table_info.meta.options.extra_options);
let opts = OpenOptions {
parent_dir: table_dir.to_string(),
write_buffer_size: table_info
@@ -509,6 +512,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.write_buffer_size
.map(|s| s.0 as usize),
ttl: table_info.meta.options.ttl,
compaction_strategy,
};

// TODO(weny): Returns an error earlier if the target region does not exist in the meta.
7 changes: 5 additions & 2 deletions src/mito/src/engine/procedure/create.rs
@@ -24,8 +24,8 @@ use datatypes::schema::{Schema, SchemaRef};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::{
ColumnId, CreateOptions, EngineContext, OpenOptions, RegionDescriptorBuilder, RegionNumber,
StorageEngine,
ColumnId, CompactionStrategy, CreateOptions, EngineContext, OpenOptions,
RegionDescriptorBuilder, RegionNumber, StorageEngine,
};
use table::engine::{region_id, table_dir};
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
@@ -232,15 +232,18 @@ impl<S: StorageEngine> TableCreator<S> {
let table_options = &self.data.request.table_options;
let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize);
let ttl = table_options.ttl;
let compaction_strategy = CompactionStrategy::from(&table_options.extra_options);
let open_opts = OpenOptions {
parent_dir: table_dir.to_string(),
write_buffer_size,
ttl,
compaction_strategy: compaction_strategy.clone(),
};
let create_opts = CreateOptions {
parent_dir: table_dir.to_string(),
write_buffer_size,
ttl,
compaction_strategy,
};

let primary_key_indices = &self.data.request.primary_key_indices;
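Both the open and create paths above derive the compaction strategy from the table's `extra_options`. A hedged sketch of what such a conversion could look like; the option keys (`compaction.type`, `compaction.twcs.time_window`) and the local `Strategy` enum are illustrative assumptions, while the real parsing lives in store-api's `CompactionStrategy::from` impl:

use std::collections::HashMap;

// Stand-in for the real CompactionStrategy; the key names below are
// assumed for illustration, not confirmed by this diff.
enum Strategy {
    LeveledTimeWindow,
    Twcs { time_window_seconds: Option<i64> },
}

fn strategy_from(extra: &HashMap<String, String>) -> Strategy {
    match extra.get("compaction.type").map(String::as_str) {
        Some("twcs") => Strategy::Twcs {
            // An absent or unparsable window falls back to inference at compaction time.
            time_window_seconds: extra
                .get("compaction.twcs.time_window")
                .and_then(|v| v.parse().ok()),
        },
        // Any other value keeps the original leveled time-window behavior.
        _ => Strategy::LeveledTimeWindow,
    }
}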
169 changes: 167 additions & 2 deletions src/storage/src/compaction.rs
@@ -15,17 +15,182 @@
pub mod noop;
mod picker;
mod scheduler;
mod strategy;
mod task;
mod twcs;
mod writer;

use std::sync::Arc;

pub use picker::{Picker, PickerContext, SimplePicker};
use common_telemetry::tracing::log::warn;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
pub use picker::{LeveledTimeWindowPicker, Picker, PickerContext};
pub use scheduler::{CompactionHandler, CompactionRequestImpl};
use store_api::logstore::LogStore;
use store_api::storage::CompactionStrategy;
pub use task::{CompactionTask, CompactionTaskImpl};
pub use twcs::TwcsPicker;

use crate::scheduler::Scheduler;
use crate::sst::FileHandle;

pub type CompactionPickerRef<S> =
Arc<dyn Picker<Request = CompactionRequestImpl<S>, Task = CompactionTaskImpl<S>> + Send + Sync>;

pub type CompactionSchedulerRef<S> =
Arc<dyn Scheduler<Request = CompactionRequestImpl<S>> + Send + Sync>;

/// Infers a suitable time bucket duration.
/// Currently it simply finds the min and max timestamps across all SSTs in the level and fits
/// the resulting time span into a predefined time bucket.
pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>) -> i64 {
let mut max_ts = Timestamp::new(i64::MIN, TimeUnit::Second);
let mut min_ts = Timestamp::new(i64::MAX, TimeUnit::Second);

for f in files {
if let Some((start, end)) = f.time_range() {
min_ts = min_ts.min(*start);
max_ts = max_ts.max(*end);
} else {
// We don't expect an SST file without a time range;
// it's either a bug or data corruption.
warn!("Found SST file without time range metadata: {f:?}");
}
}

// Safety: converting any timestamp to seconds cannot overflow.
let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value();
let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value();

max_sec
.checked_sub(min_sec)
.map(|span| TIME_BUCKETS.fit_time_bucket(span))
.unwrap_or_else(|| TIME_BUCKETS.max()) // on subtraction overflow, fall back to the max bucket; TIME_BUCKETS is never empty.
}

pub(crate) struct TimeBuckets([i64; 7]);

impl TimeBuckets {
/// Fits a given time span into a time bucket by finding the minimum bucket that can cover it.
/// Returns the max bucket if no such bucket can be found.
fn fit_time_bucket(&self, span_sec: i64) -> i64 {
assert!(span_sec >= 0);
match self.0.binary_search(&span_sec) {
Ok(idx) => self.0[idx],
Err(idx) => {
if idx < self.0.len() {
self.0[idx]
} else {
self.0.last().copied().unwrap()
}
}
}
}

#[cfg(test)]
fn get(&self, idx: usize) -> i64 {
self.0[idx]
}

fn max(&self) -> i64 {
self.0.last().copied().unwrap()
}
}

/// A set of predefined time buckets.
pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([
60 * 60, // one hour
2 * 60 * 60, // two hours
12 * 60 * 60, // twelve hours
24 * 60 * 60, // one day
7 * 24 * 60 * 60, // one week
365 * 24 * 60 * 60, // one year
10 * 365 * 24 * 60 * 60, // ten years
]);

pub fn compaction_strategy_to_picker<S: LogStore>(
strategy: &CompactionStrategy,
) -> CompactionPickerRef<S> {
match strategy {
CompactionStrategy::LeveledTimeWindow => {
Arc::new(LeveledTimeWindowPicker::default()) as Arc<_>
}
CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds,
)) as Arc<_>,
}
}
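
A brief usage sketch, assuming some concrete `LogStore` implementation `S`: a region resolves its configured strategy into a picker once, and the scheduler stays agnostic of the concrete algorithm:

// Each region turns its configured strategy into a shared picker; both
// variants are erased behind the same Picker trait object.
fn picker_for_region<S: LogStore>(strategy: &CompactionStrategy) -> CompactionPickerRef<S> {
    compaction_strategy_to_picker::<S>(strategy)
}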

#[cfg(test)]
mod tests {
use common_time::Timestamp;

use super::*;
use crate::file_purger::noop::new_noop_file_purger;
use crate::sst::{FileHandle, FileId, FileMeta, Level};

/// Test util to create file handles.
pub fn new_file_handle(
file_id: FileId,
start_ts_millis: i64,
end_ts_millis: i64,
level: Level,
) -> FileHandle {
let file_purger = new_noop_file_purger();
let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
FileHandle::new(
FileMeta {
region_id: 0,
file_id,
time_range: Some((
Timestamp::new_millisecond(start_ts_millis),
Timestamp::new_millisecond(end_ts_millis),
)),
level,
file_size: 0,
},
layer,
file_purger,
)
}

#[test]
fn test_time_bucket() {
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(1));
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(60 * 60));
assert_eq!(
TIME_BUCKETS.get(1),
TIME_BUCKETS.fit_time_bucket(60 * 60 + 1)
);

assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2) - 1)
);
assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2))
);
assert_eq!(
TIME_BUCKETS.get(3),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1)
);
assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX));
}

#[test]
fn test_infer_time_buckets() {
assert_eq!(
TIME_BUCKETS.get(0),
infer_time_bucket(
[
new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0),
new_file_handle(FileId::random(), 1, 10_000, 0)
]
.iter()
)
);
}
}
