Skip to content

Commit

Permalink
feat: remove all the manifests in drop_region. (#1834)
Browse files Browse the repository at this point in the history
* feat: drop_region delete manifest file

* chore: remove redundant code

* chore: fmt

* chore: clippy

* chore: clippy

* feat: support delete_all in manifest.

* chore:CR

* test: test_drop_basic, test_drop_reopen

* chore: cr

* fix: typo

* chore: cr
  • Loading branch information
DevilExileSu committed Jun 30, 2023
1 parent 25cb667 commit 563f6e0
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 32 deletions.
27 changes: 10 additions & 17 deletions src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,28 +583,21 @@ impl<S: StorageEngine> MitoEngineInner<S> {
// Remove the table from the engine to avoid further access from users.
let _lock = self.table_mutex.lock(request.table_id).await;
let removed_table = self.tables.remove(&request.table_id);

// Close the table to close all regions. Closing a region is idempotent.
if let Some((_, table)) = &removed_table {
let regions = table.region_ids();
let table_id = table.table_info().ident.table_id;

table
.drop_regions(&regions)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let mut regions = table.remove_regions(&table.region_ids()).await?;

let ctx = StorageEngineContext::default();

let opts = CloseOptions::default();
// Releases regions in storage engine
for region_number in regions {
self.storage_engine
.close_region(&ctx, &region_name(table_id, region_number), &opts)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
let _ = futures::future::try_join_all(
regions
.drain()
.map(|(_, region)| self.storage_engine.drop_region(&ctx, region)),
)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;

Ok(true)
} else {
Expand Down
10 changes: 0 additions & 10 deletions src/mito/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,16 +534,6 @@ impl<R: Region> MitoTable<R> {
Ok(removed)
}

pub async fn drop_regions(&self, region_number: &[RegionNumber]) -> TableResult<()> {
let regions = self.remove_regions(region_number).await?;

let _ = futures::future::try_join_all(regions.values().map(|region| region.drop_region()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(())
}

pub fn is_releasable(&self) -> bool {
let regions = self.regions.load();

Expand Down
2 changes: 1 addition & 1 deletion src/mito/src/table/test_util/mock_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ impl StorageEngine for MockEngine {
}

async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> {
unimplemented!()
Ok(())
}

fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result<Option<MockRegion>> {
Expand Down
7 changes: 7 additions & 0 deletions src/storage/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,14 @@ mod tests {
.unwrap();

engine.drop_region(&ctx, region).await.unwrap();

assert!(engine.get_region(&ctx, region_name).unwrap().is_none());
assert!(!engine
.inner
.object_store
.is_exist(dir_path.join("manifest").to_str().unwrap())
.await
.unwrap());
}

// Wait for gc
Expand Down
43 changes: 43 additions & 0 deletions src/storage/src/manifest/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,49 @@ impl ManifestLogStorage for ManifestObjectStore {
Ok(ret)
}

async fn delete_all(&self, remove_action_manifest: ManifestVersion) -> Result<()> {
let entries: Vec<Entry> = self.get_paths(Some).await?;

// Filter out the latest delta file.
let paths: Vec<_> = entries
.iter()
.filter(|e| {
let name = e.name();
if is_delta_file(name) && file_version(name) == remove_action_manifest {
return false;
}
true
})
.map(|e| e.path().to_string())
.collect();

logging::info!(
"Deleting {} from manifest storage path {} paths: {:?}",
paths.len(),
self.path,
paths,
);

// Delete all files except the latest delta file.
self.object_store
.remove(paths)
.await
.with_context(|_| DeleteObjectSnafu {
path: self.path.clone(),
})?;

// Delete the latest delta file and the manifest directory.
self.object_store
.remove_all(&self.path)
.await
.with_context(|_| DeleteObjectSnafu {
path: self.path.clone(),
})?;
logging::info!("Deleted manifest storage path {}", self.path);

Ok(())
}

async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.delta_file_path(version);
logging::debug!("Save log to manifest storage, version: {}", version);
Expand Down
21 changes: 20 additions & 1 deletion src/storage/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use common_time::util;
use metrics::{decrement_gauge, increment_gauge};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::manifest::{
self, Manifest, ManifestLogStorage, ManifestVersion, MetaActionIterator,
};
use store_api::storage::{
AlterRequest, CloseContext, FlushContext, FlushReason, OpenOptions, ReadContext, Region,
RegionId, SequenceNumber, WriteContext, WriteResponse,
Expand Down Expand Up @@ -127,6 +129,7 @@ impl<S: LogStore> Region for RegionImpl<S> {
}

async fn drop_region(&self) -> Result<()> {
decrement_gauge!(crate::metrics::REGION_COUNT, 1.0);
self.inner.drop_region().await
}

Expand Down Expand Up @@ -481,6 +484,22 @@ impl<S: LogStore> RegionImpl<S> {
.insert(c.committed_sequence, (manifest_version, c.metadata));
version = Some(v);
}
(RegionMetaAction::Remove(r), Some(v)) => {
manifest.stop().await?;

let files = v.ssts().mark_all_files_deleted();
logging::info!(
"Try to remove all SSTs, region: {}, files: {:?}",
r.region_id,
files
);

manifest
.manifest_store()
.delete_all(v.manifest_version())
.await?;
return Ok((None, recovered_metadata));
}
(action, None) => {
actions.push((manifest_version, action));
version = None;
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/region/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ mod alter;
mod basic;
mod close;
mod compact;
mod drop;
mod flush;
mod projection;

/// Create metadata of a region with schema: (timestamp, v0).
pub fn new_metadata(region_name: &str) -> RegionMetadata {
let desc = RegionDescBuilder::new(region_name)
.id(123)
.push_field_column(("v0", LogicalTypeId::String, true))
.build();
desc.try_into().unwrap()
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/region/tests/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct CloseTester {
base: Option<FileTesterBase>,
}

/// Create a new region for flush test
/// Create a new region for close test
async fn create_region_for_close(
store_dir: &str,
flush_strategy: FlushStrategyRef,
Expand Down
192 changes: 192 additions & 0 deletions src/storage/src/region/tests/drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Region drop tests.

use std::path::Path;
use std::sync::Arc;

use common_telemetry::info;
use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::manifest::{Manifest, MetaAction};
use store_api::storage::{FlushContext, OpenOptions, Region};

use crate::config::EngineConfig;
use crate::engine;
use crate::flush::FlushStrategyRef;
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionRemove};
use crate::region::tests::{self, FileTesterBase};
use crate::region::RegionImpl;
use crate::test_util::config_util;
use crate::test_util::flush_switch::{has_parquet_file, FlushSwitch};

const REGION_NAME: &str = "region-drop-0";

/// Create a new region for drop tests.
async fn create_region_for_drop(
store_dir: &str,
flush_strategy: FlushStrategyRef,
) -> RegionImpl<RaftEngineLogStore> {
let metadata = tests::new_metadata(REGION_NAME);

let mut store_config =
config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await;
store_config.flush_strategy = flush_strategy;

RegionImpl::create(metadata, store_config).await.unwrap()
}

/// Tester for drop tests.
struct DropTester {
base: Option<FileTesterBase>,
}

impl DropTester {
async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> DropTester {
let region = create_region_for_drop(store_dir, flush_strategy).await;
DropTester {
base: Some(FileTesterBase::with_region(region)),
}
}

#[inline]
fn base(&self) -> &FileTesterBase {
self.base.as_ref().unwrap()
}

async fn put(&self, data: &[(i64, Option<i64>)]) {
let data = data
.iter()
.map(|(ts, v0)| (*ts, v0.map(|v| v.to_string())))
.collect::<Vec<_>>();
let _ = self.base().put(&data).await;
}

async fn flush(&self) {
let ctx = FlushContext::default();
self.base().region.flush(&ctx).await.unwrap();
}

async fn close(&mut self) {
if let Some(base) = self.base.take() {
base.close().await;
}
}
}

fn get_all_files(path: &str) -> Vec<String> {
let mut files = Vec::new();
for entry in std::fs::read_dir(path).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if path.is_file() {
files.push(path.to_str().unwrap().to_string());
} else if path.is_dir() {
files.extend(get_all_files(path.to_str().unwrap()));
}
}
files
}

#[tokio::test]
async fn test_drop_basic() {
let dir = create_temp_dir("drop-basic");
common_telemetry::init_default_ut_logging();
let store_dir = dir.path().to_str().unwrap();

let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
let manifest_dir = format!(
"{}/{}",
store_dir,
engine::region_manifest_dir("", REGION_NAME)
);
let flush_switch = Arc::new(FlushSwitch::default());
let mut tester = DropTester::new(store_dir, flush_switch.clone()).await;

let data = [(1000, Some(100))];

// Put one element so we have content to flush.
tester.put(&data).await;

// Manually trigger flush.
tester.flush().await;

assert!(has_parquet_file(&sst_dir));

tester.base().checkpoint_manifest().await;
let manifest_files = get_all_files(&manifest_dir);
info!("manifest_files: {:?}", manifest_files);

tester.base().region.drop_region().await.unwrap();
tester.close().await;

assert!(!Path::new(&manifest_dir).exists());
}

#[tokio::test]
async fn test_drop_reopen() {
let dir = create_temp_dir("drop-basic");
common_telemetry::init_default_ut_logging();
let store_dir = dir.path().to_str().unwrap();

let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
let manifest_dir = format!(
"{}/{}",
store_dir,
engine::region_manifest_dir("", REGION_NAME)
);
let flush_switch = Arc::new(FlushSwitch::default());
let mut tester = DropTester::new(store_dir, flush_switch.clone()).await;

let data = [(1000, Some(100))];

// Put one element so we have content to flush.
tester.put(&data).await;
// Manually trigger flush.
tester.flush().await;

assert!(has_parquet_file(&sst_dir));

tester.base().checkpoint_manifest().await;
let version_control = tester.base().region.version_control();

let mut action_list =
RegionMetaActionList::with_action(RegionMetaAction::Remove(RegionRemove {
region_id: tester.base().region.id(),
}));
let prev_version = version_control.current_manifest_version();
action_list.set_prev_version(prev_version);
let manifest = &tester.base().region.inner.manifest;
let _ = manifest.update(action_list).await.unwrap();
tester.close().await;

// Reopen the region.
let store_config = config_util::new_store_config(
REGION_NAME,
store_dir,
EngineConfig {
max_files_in_l0: usize::MAX,
..Default::default()
},
)
.await;

let opts = OpenOptions::default();
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
.await
.unwrap();
assert!(region.is_none());
assert!(!Path::new(&manifest_dir).exists());
}
Loading

0 comments on commit 563f6e0

Please sign in to comment.