Skip to content

Commit

Permalink
feat(mito): Checks whether a region should flush periodically (#3459)
Browse files Browse the repository at this point in the history
* feat: handle flush periodically

* chore: call periodical method in loop

* feat: check periodical tasks on channel timeout

* refactor: use time provider to get time

Mock a time provider to test auto flush

* chore: fix typos

* refactor: rename mock time provider

* style: fix clippy

* chore: address comment
  • Loading branch information
evenyag authored Mar 15, 2024
1 parent a52aede commit 74862f8
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pin-project.workspace = true
prometheus.workspace = true
prost.workspace = true
puffin.workspace = true
rand.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
Expand All @@ -75,7 +76,6 @@ common-procedure-test.workspace = true
common-test-util.workspace = true
criterion = "0.4"
log-store.workspace = true
rand.workspace = true
toml.workspace = true

[[bench]]
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl MitoEngine {
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
time_provider: crate::time_provider::TimeProviderRef,
) -> Result<MitoEngine> {
config.sanitize(data_home)?;

Expand All @@ -385,6 +386,7 @@ impl MitoEngine {
object_store_manager,
write_buffer_manager,
listener,
time_provider,
)
.await?,
config,
Expand Down
103 changes: 103 additions & 0 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

//! Flush tests for mito engine.

use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use api::v1::Rows;
use common_recordbatch::RecordBatches;
use common_time::util::current_time_millis;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
Expand All @@ -28,6 +31,8 @@ use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema,
CreateRequestBuilder, MockWriteBufferManager, TestEnv,
};
use crate::time_provider::TimeProvider;
use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;

#[tokio::test]
async fn test_manual_flush() {
Expand Down Expand Up @@ -272,3 +277,101 @@ async fn test_flush_reopen_region() {
assert_eq!(2, version_data.last_entry_id);
assert_eq!(5, version_data.committed_sequence);
}

/// Test double for a time source whose reported values are set by the test.
///
/// Both values live in atomics so they can be changed through a shared
/// reference after the provider has been handed to the engine.
#[derive(Debug)]
struct MockTimeProvider {
    /// Value reported as the current time, in milliseconds.
    now: AtomicI64,
    /// Value reported as the elapsed time, in milliseconds.
    elapsed: AtomicI64,
}

impl TimeProvider for MockTimeProvider {
fn current_time_millis(&self) -> i64 {
self.now.load(Ordering::Relaxed)
}

fn elapsed_since(&self, _current_millis: i64) -> i64 {
self.elapsed.load(Ordering::Relaxed)
}

fn wait_duration(&self, _duration: Duration) -> Duration {
Duration::from_millis(20)
}
}

impl MockTimeProvider {
    /// Builds a provider that reports `now` as the current time and zero as
    /// the elapsed time.
    fn new(now: i64) -> Self {
        let elapsed = AtomicI64::new(0);
        let now = AtomicI64::new(now);
        Self { now, elapsed }
    }

    /// Replaces the value reported as the current time.
    fn set_now(&self, now: i64) {
        AtomicI64::store(&self.now, now, Ordering::Relaxed);
    }

    /// Replaces the value reported as the elapsed time.
    fn set_elapsed(&self, elapsed: i64) {
        AtomicI64::store(&self.elapsed, elapsed, Ordering::Relaxed);
    }
}

/// Checks that a region holding unflushed rows gets flushed without an
/// explicit flush request once the mocked clock has moved past the configured
/// auto flush interval.
///
/// The test writes two rows, advances the mocked time, waits for the flush
/// listener to fire and then verifies that the data moved from the memtable
/// into exactly one file and is still readable.
#[tokio::test]
async fn test_auto_flush_engine() {
    // Single definition of the interval so the engine config and the mocked
    // time jump below cannot drift apart.
    const AUTO_FLUSH_INTERVAL_SECS: u64 = 60 * 5;

    let mut env = TestEnv::new();
    let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
    let listener = Arc::new(FlushListener::default());
    let now = current_time_millis();
    let time_provider = Arc::new(MockTimeProvider::new(now));
    let engine = env
        .create_engine_with_time(
            MitoConfig {
                auto_flush_interval: Duration::from_secs(AUTO_FLUSH_INTERVAL_SECS),
                ..Default::default()
            },
            Some(write_buffer_manager.clone()),
            Some(listener.clone()),
            time_provider.clone(),
        )
        .await;

    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new().build();

    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // Prepares rows for flush.
    let rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows_for_key("a", 0, 2, 0),
    };
    put_rows(&engine, region_id, rows).await;

    // Sets current time to now + auto_flush_interval * 2.
    // The interval is a small constant (300 s), so the cast cannot truncate.
    time_provider.set_now(now + (AUTO_FLUSH_INTERVAL_SECS as i64 * 2) * 1000);
    // Sets elapsed time to MAX_INITIAL_CHECK_DELAY_SECS + 1.
    time_provider.set_elapsed((MAX_INITIAL_CHECK_DELAY_SECS as i64 + 1) * 1000);

    // Wait until flush is finished.
    tokio::time::timeout(Duration::from_secs(3), listener.wait())
        .await
        .expect("auto flush did not finish within 3 seconds");

    let request = ScanRequest::default();
    let scanner = engine.scanner(region_id, request).unwrap();
    assert_eq!(0, scanner.num_memtables());
    assert_eq!(1, scanner.num_files());
    let stream = scanner.scan().await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    // Every cell is padded to its column width, so each row is exactly as
    // wide as the border lines. The table lines must stay at column 0 because
    // any leading whitespace would become part of the string.
    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| a     | 0.0     | 1970-01-01T00:00:00 |
| a     | 1.0     | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}
13 changes: 8 additions & 5 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ pub enum FlushReason {
Manual,
/// Flush to alter table.
Alter,
/// Flush periodically.
Periodically,
}

impl FlushReason {
Expand Down Expand Up @@ -432,18 +434,19 @@ impl FlushScheduler {
) -> Result<()> {
debug_assert_eq!(region_id, task.region_id);

FLUSH_REQUESTS_TOTAL
.with_label_values(&[task.reason.as_str()])
.inc();

let version = version_control.current().version;
if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() {
if version.memtables.is_empty() {
debug_assert!(!self.region_status.contains_key(&region_id));
// The region has nothing to flush.
task.on_success();
return Ok(());
}

// Don't increase the counter if a region has nothing to flush.
FLUSH_REQUESTS_TOTAL
.with_label_values(&[task.reason.as_str()])
.inc();

// Add this region to status map.
let flush_status = self
.region_status
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod request;
pub mod row_converter;
pub(crate) mod schedule;
pub mod sst;
mod time_provider;
pub mod wal;
mod worker;

Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, RwLock};

use common_telemetry::info;
use common_time::util::current_time_millis;
use common_wal::options::WalOptions;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
Expand All @@ -37,6 +36,7 @@ use crate::memtable::MemtableId;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::OnFailure;
use crate::sst::file_purger::FilePurgerRef;
use crate::time_provider::TimeProviderRef;

/// This is the approximate factor to estimate the size of wal.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
Expand Down Expand Up @@ -83,6 +83,9 @@ pub(crate) struct MitoRegion {
last_flush_millis: AtomicI64,
/// Whether the region is writable.
writable: AtomicBool,

/// Provider to get current time.
time_provider: TimeProviderRef,
}

pub(crate) type MitoRegionRef = Arc<MitoRegion>;
Expand Down Expand Up @@ -119,7 +122,7 @@ impl MitoRegion {

/// Update flush time to current time.
pub(crate) fn update_flush_millis(&self) {
let now = current_time_millis();
let now = self.time_provider.current_time_millis();
self.last_flush_millis.store(now, Ordering::Relaxed);
}

Expand Down
17 changes: 14 additions & 3 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::Arc;

use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use common_wal::options::WalOptions;
use futures::StreamExt;
use object_store::manager::ObjectStoreManagerRef;
Expand Down Expand Up @@ -47,6 +46,7 @@ use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::{EntryId, Wal};

/// Builder to create a new [MitoRegion] or open an existing one.
Expand All @@ -61,6 +61,7 @@ pub(crate) struct RegionOpener {
cache_manager: Option<CacheManagerRef>,
skip_wal_replay: bool,
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
}

impl RegionOpener {
Expand All @@ -84,6 +85,7 @@ impl RegionOpener {
cache_manager: None,
skip_wal_replay: false,
intermediate_manager,
time_provider: None,
}
}

Expand Down Expand Up @@ -189,6 +191,9 @@ impl RegionOpener {
object_store,
self.intermediate_manager,
));
let time_provider = self
.time_provider
.unwrap_or_else(|| Arc::new(StdTimeProvider));

Ok(MitoRegion {
region_id,
Expand All @@ -201,9 +206,10 @@ impl RegionOpener {
self.cache_manager,
)),
wal_options,
last_flush_millis: AtomicI64::new(current_time_millis()),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
// Region is writable after it is created.
writable: AtomicBool::new(true),
time_provider,
})
}

Expand Down Expand Up @@ -307,6 +313,10 @@ impl RegionOpener {
} else {
info!("Skip the WAL replay for region: {}", region_id);
}
let time_provider = self
.time_provider
.clone()
.unwrap_or_else(|| Arc::new(StdTimeProvider));

let region = MitoRegion {
region_id: self.region_id,
Expand All @@ -315,9 +325,10 @@ impl RegionOpener {
manifest_manager,
file_purger,
wal_options,
last_flush_millis: AtomicI64::new(current_time_millis()),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
// Region is always opened in read only mode.
writable: AtomicBool::new(false),
time_provider,
};
Ok(Some(region))
}
Expand Down
33 changes: 33 additions & 0 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
use crate::sst::index::intermediate::IntermediateManager;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::worker::WorkerGroup;

#[derive(Debug)]
Expand Down Expand Up @@ -179,6 +180,7 @@ impl TestEnv {
object_store_manager,
manager,
listener,
Arc::new(StdTimeProvider),
)
.await
.unwrap()
Expand Down Expand Up @@ -219,6 +221,37 @@ impl TestEnv {
object_store_manager,
manager,
listener,
Arc::new(StdTimeProvider),
)
.await
.unwrap()
}

/// Creates a new engine with specific config and manager/listener/time provider under this env.
pub async fn create_engine_with_time(
&mut self,
config: MitoConfig,
manager: Option<WriteBufferManagerRef>,
listener: Option<EventListenerRef>,
time_provider: TimeProviderRef,
) -> MitoEngine {
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;

let logstore = Arc::new(log_store);
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());

let data_home = self.data_home().display().to_string();

MitoEngine::new_for_test(
&data_home,
config,
logstore,
object_store_manager,
manager,
listener,
time_provider.clone(),
)
.await
.unwrap()
Expand Down
Loading

0 comments on commit 74862f8

Please sign in to comment.