Skip to content

Commit

Permalink
chore: add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 25, 2024
1 parent fa57975 commit 608751f
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 10 deletions.
7 changes: 6 additions & 1 deletion src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,17 @@ impl RegionFlushTask {
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
// We will leak files if the manifest update fails, but we ignore them for simplicity. We can
// add a cleanup job to remove them later.
self.manifest_ctx
let version = self
.manifest_ctx
.update_manifest(
smallvec![RegionLeaderState::Writable, RegionLeaderState::Downgrading],
action_list,
)
.await?;
info!(
"Successfully update manifest version to {version}, region: {}",
self.region_id
);

Ok(edit)
}
Expand Down
15 changes: 12 additions & 3 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crossbeam_utils::atomic::AtomicCell;
use smallvec::{smallvec, SmallVec};
use snafu::{ensure, OptionExt};
use store_api::logstore::provider::Provider;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionRole, SettableRegionRoleState};
use store_api::storage::RegionId;
Expand Down Expand Up @@ -373,6 +374,14 @@ impl ManifestContext {
}
}

pub(crate) async fn manifest_version(&self) -> ManifestVersion {
self.manifest_manager
.read()
.await
.manifest()
.manifest_version
}

pub(crate) async fn has_update(&self) -> Result<bool> {
self.manifest_manager.read().await.has_update().await
}
Expand All @@ -382,7 +391,7 @@ impl ManifestContext {
&self,
expect_states: SmallVec<[RegionLeaderState; 2]>,
action_list: RegionMetaActionList,
) -> Result<()> {
) -> Result<ManifestVersion> {
// Acquires the write lock of the manifest manager.
let mut manager = self.manifest_manager.write().await;
// Gets current manifest.
Expand Down Expand Up @@ -438,7 +447,7 @@ impl ManifestContext {
}

// Now we can update the manifest.
manager.update(action_list).await.inspect_err(
let version = manager.update(action_list).await.inspect_err(
|e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
)?;

Expand All @@ -449,7 +458,7 @@ impl ManifestContext {
);
}

Ok(())
Ok(version)
}
}

Expand Down
10 changes: 7 additions & 3 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,10 @@ impl RegionOpener {
let version_control = Arc::new(VersionControl::new(version));
if !self.skip_wal_replay {
info!(
"Start replaying memtable at flushed_entry_id + 1 {} for region {}",
"Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}",
flushed_entry_id + 1,
region_id
region_id,
manifest.manifest_version
);
replay_memtable(
&provider,
Expand All @@ -380,7 +381,10 @@ impl RegionOpener {
)
.await?;
} else {
info!("Skip the WAL replay for region: {}", region_id);
info!(
"Skip the WAL replay for region: {}, manifest version: {}",
region_id, manifest.manifest_version
);
}
let now = self.time_provider.current_time_millis();

Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/worker/handle_catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {

// Utilizes the short circuit evaluation.
let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? {
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}");
let manifest_version = region.manifest_ctx.manifest_version().await;
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/worker/handle_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ impl<S> RegionWorkerLoop<S> {

let result = manifest_ctx
.update_manifest(smallvec![RegionLeaderState::Truncating], action_list)
.await;
.await
.map(|_| ());

// Sends the result back to the request sender.
let truncate_result = TruncateResult {
Expand Down Expand Up @@ -243,7 +244,8 @@ impl<S> RegionWorkerLoop<S> {
let result = region
.manifest_ctx
.update_manifest(smallvec![RegionLeaderState::Altering], action_list)
.await;
.await
.map(|_| ());
let notify = WorkerRequest::Background {
region_id: region.region_id,
notify: BackgroundNotify::RegionChange(RegionChangeResult {
Expand Down Expand Up @@ -341,4 +343,5 @@ async fn edit_region(
.manifest_ctx
.update_manifest(smallvec![RegionLeaderState::Editing], action_list)
.await
.map(|_| ())
}

0 comments on commit 608751f

Please sign in to comment.