From 563f6e05e29a9f546adcf60de578915a4186a3a9 Mon Sep 17 00:00:00 2001 From: Vanish Date: Fri, 30 Jun 2023 17:42:11 +0800 Subject: [PATCH] feat: remove all the manifests in drop_region. (#1834) * feat: drop_region delete manifest file * chore: remove redundant code * chore: fmt * chore: clippy * chore: clippy * feat: support delete_all in manifest. * chore:CR * test: test_drop_basic, test_drop_reopen * chore: cr * fix: typo * chore: cr --- src/mito/src/engine.rs | 27 +-- src/mito/src/table.rs | 10 - src/mito/src/table/test_util/mock_engine.rs | 2 +- src/storage/src/engine.rs | 7 + src/storage/src/manifest/storage.rs | 43 +++++ src/storage/src/region.rs | 21 ++- src/storage/src/region/tests.rs | 2 + src/storage/src/region/tests/close.rs | 2 +- src/storage/src/region/tests/drop.rs | 192 ++++++++++++++++++++ src/storage/src/region/writer.rs | 10 +- src/store-api/src/manifest/storage.rs | 4 + 11 files changed, 288 insertions(+), 32 deletions(-) create mode 100644 src/storage/src/region/tests/drop.rs diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 1ef3a1cf5d31..dd7949692dbb 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -583,28 +583,21 @@ impl MitoEngineInner { // Remove the table from the engine to avoid further access from users. let _lock = self.table_mutex.lock(request.table_id).await; let removed_table = self.tables.remove(&request.table_id); + // Close the table to close all regions. Closing a region is idempotent. 
if let Some((_, table)) = &removed_table { - let regions = table.region_ids(); - let table_id = table.table_info().ident.table_id; - - table - .drop_regions(&regions) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; + let mut regions = table.remove_regions(&table.region_ids()).await?; let ctx = StorageEngineContext::default(); - let opts = CloseOptions::default(); - // Releases regions in storage engine - for region_number in regions { - self.storage_engine - .close_region(&ctx, &region_name(table_id, region_number), &opts) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; - } + let _ = futures::future::try_join_all( + regions + .drain() + .map(|(_, region)| self.storage_engine.drop_region(&ctx, region)), + ) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; Ok(true) } else { diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 12f5b437d009..bc5cc16ee737 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -534,16 +534,6 @@ impl MitoTable { Ok(removed) } - pub async fn drop_regions(&self, region_number: &[RegionNumber]) -> TableResult<()> { - let regions = self.remove_regions(region_number).await?; - - let _ = futures::future::try_join_all(regions.values().map(|region| region.drop_region())) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; - Ok(()) - } - pub fn is_releasable(&self) -> bool { let regions = self.regions.load(); diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs index 018c9e6816ea..1dfcf9783260 100644 --- a/src/mito/src/table/test_util/mock_engine.rs +++ b/src/mito/src/table/test_util/mock_engine.rs @@ -332,7 +332,7 @@ impl StorageEngine for MockEngine { } async fn drop_region(&self, _ctx: &EngineContext, _region: Self::Region) -> Result<()> { - unimplemented!() + Ok(()) } fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result> { diff 
--git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 1fa020a954ab..13a261d32e0c 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -680,7 +680,14 @@ mod tests { .unwrap(); engine.drop_region(&ctx, region).await.unwrap(); + assert!(engine.get_region(&ctx, region_name).unwrap().is_none()); + assert!(!engine + .inner + .object_store + .is_exist(dir_path.join("manifest").to_str().unwrap()) + .await + .unwrap()); } // Wait for gc diff --git a/src/storage/src/manifest/storage.rs b/src/storage/src/manifest/storage.rs index dcd37d9286b5..baf5a9efb8e9 100644 --- a/src/storage/src/manifest/storage.rs +++ b/src/storage/src/manifest/storage.rs @@ -321,6 +321,49 @@ impl ManifestLogStorage for ManifestObjectStore { Ok(ret) } + async fn delete_all(&self, remove_action_manifest: ManifestVersion) -> Result<()> { + let entries: Vec = self.get_paths(Some).await?; + + // Filter out the latest delta file. + let paths: Vec<_> = entries + .iter() + .filter(|e| { + let name = e.name(); + if is_delta_file(name) && file_version(name) == remove_action_manifest { + return false; + } + true + }) + .map(|e| e.path().to_string()) + .collect(); + + logging::info!( + "Deleting {} from manifest storage path {} paths: {:?}", + paths.len(), + self.path, + paths, + ); + + // Delete all files except the latest delta file. + self.object_store + .remove(paths) + .await + .with_context(|_| DeleteObjectSnafu { + path: self.path.clone(), + })?; + + // Delete the latest delta file and the manifest directory. 
+ self.object_store + .remove_all(&self.path) + .await + .with_context(|_| DeleteObjectSnafu { + path: self.path.clone(), + })?; + logging::info!("Deleted manifest storage path {}", self.path); + + Ok(()) + } + async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.delta_file_path(version); logging::debug!("Save log to manifest storage, version: {}", version); diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 2626ef0bfb67..5a78e82a6558 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -28,7 +28,9 @@ use common_time::util; use metrics::{decrement_gauge, increment_gauge}; use snafu::ResultExt; use store_api::logstore::LogStore; -use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; +use store_api::manifest::{ + self, Manifest, ManifestLogStorage, ManifestVersion, MetaActionIterator, +}; use store_api::storage::{ AlterRequest, CloseContext, FlushContext, FlushReason, OpenOptions, ReadContext, Region, RegionId, SequenceNumber, WriteContext, WriteResponse, @@ -127,6 +129,7 @@ impl Region for RegionImpl { } async fn drop_region(&self) -> Result<()> { + decrement_gauge!(crate::metrics::REGION_COUNT, 1.0); self.inner.drop_region().await } @@ -481,6 +484,22 @@ impl RegionImpl { .insert(c.committed_sequence, (manifest_version, c.metadata)); version = Some(v); } + (RegionMetaAction::Remove(r), Some(v)) => { + manifest.stop().await?; + + let files = v.ssts().mark_all_files_deleted(); + logging::info!( + "Try to remove all SSTs, region: {}, files: {:?}", + r.region_id, + files + ); + + manifest + .manifest_store() + .delete_all(v.manifest_version()) + .await?; + return Ok((None, recovered_metadata)); + } (action, None) => { actions.push((manifest_version, action)); version = None; diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 17a812473766..f31e13339b31 100644 --- a/src/storage/src/region/tests.rs +++ 
b/src/storage/src/region/tests.rs @@ -60,12 +60,14 @@ mod alter; mod basic; mod close; mod compact; +mod drop; mod flush; mod projection; /// Create metadata of a region with schema: (timestamp, v0). pub fn new_metadata(region_name: &str) -> RegionMetadata { let desc = RegionDescBuilder::new(region_name) + .id(123) .push_field_column(("v0", LogicalTypeId::String, true)) .build(); desc.try_into().unwrap() diff --git a/src/storage/src/region/tests/close.rs b/src/storage/src/region/tests/close.rs index 44d54ac5bd28..75f7ab032db4 100644 --- a/src/storage/src/region/tests/close.rs +++ b/src/storage/src/region/tests/close.rs @@ -38,7 +38,7 @@ struct CloseTester { base: Option, } -/// Create a new region for flush test +/// Create a new region for close test async fn create_region_for_close( store_dir: &str, flush_strategy: FlushStrategyRef, diff --git a/src/storage/src/region/tests/drop.rs b/src/storage/src/region/tests/drop.rs new file mode 100644 index 000000000000..8fc7b8550f49 --- /dev/null +++ b/src/storage/src/region/tests/drop.rs @@ -0,0 +1,192 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Region drop tests. 
+ +use std::path::Path; +use std::sync::Arc; + +use common_telemetry::info; +use common_test_util::temp_dir::create_temp_dir; +use log_store::raft_engine::log_store::RaftEngineLogStore; +use store_api::manifest::{Manifest, MetaAction}; +use store_api::storage::{FlushContext, OpenOptions, Region}; + +use crate::config::EngineConfig; +use crate::engine; +use crate::flush::FlushStrategyRef; +use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionRemove}; +use crate::region::tests::{self, FileTesterBase}; +use crate::region::RegionImpl; +use crate::test_util::config_util; +use crate::test_util::flush_switch::{has_parquet_file, FlushSwitch}; + +const REGION_NAME: &str = "region-drop-0"; + +/// Create a new region for drop tests. +async fn create_region_for_drop( + store_dir: &str, + flush_strategy: FlushStrategyRef, +) -> RegionImpl { + let metadata = tests::new_metadata(REGION_NAME); + + let mut store_config = + config_util::new_store_config(REGION_NAME, store_dir, EngineConfig::default()).await; + store_config.flush_strategy = flush_strategy; + + RegionImpl::create(metadata, store_config).await.unwrap() +} + +/// Tester for drop tests. 
+struct DropTester { + base: Option, +} + +impl DropTester { + async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> DropTester { + let region = create_region_for_drop(store_dir, flush_strategy).await; + DropTester { + base: Some(FileTesterBase::with_region(region)), + } + } + + #[inline] + fn base(&self) -> &FileTesterBase { + self.base.as_ref().unwrap() + } + + async fn put(&self, data: &[(i64, Option)]) { + let data = data + .iter() + .map(|(ts, v0)| (*ts, v0.map(|v| v.to_string()))) + .collect::>(); + let _ = self.base().put(&data).await; + } + + async fn flush(&self) { + let ctx = FlushContext::default(); + self.base().region.flush(&ctx).await.unwrap(); + } + + async fn close(&mut self) { + if let Some(base) = self.base.take() { + base.close().await; + } + } +} + +fn get_all_files(path: &str) -> Vec { + let mut files = Vec::new(); + for entry in std::fs::read_dir(path).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + if path.is_file() { + files.push(path.to_str().unwrap().to_string()); + } else if path.is_dir() { + files.extend(get_all_files(path.to_str().unwrap())); + } + } + files +} + +#[tokio::test] +async fn test_drop_basic() { + let dir = create_temp_dir("drop-basic"); + common_telemetry::init_default_ut_logging(); + let store_dir = dir.path().to_str().unwrap(); + + let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME)); + let manifest_dir = format!( + "{}/{}", + store_dir, + engine::region_manifest_dir("", REGION_NAME) + ); + let flush_switch = Arc::new(FlushSwitch::default()); + let mut tester = DropTester::new(store_dir, flush_switch.clone()).await; + + let data = [(1000, Some(100))]; + + // Put one element so we have content to flush. + tester.put(&data).await; + + // Manually trigger flush. 
+ tester.flush().await; + + assert!(has_parquet_file(&sst_dir)); + + tester.base().checkpoint_manifest().await; + let manifest_files = get_all_files(&manifest_dir); + info!("manifest_files: {:?}", manifest_files); + + tester.base().region.drop_region().await.unwrap(); + tester.close().await; + + assert!(!Path::new(&manifest_dir).exists()); +} + +#[tokio::test] +async fn test_drop_reopen() { + let dir = create_temp_dir("drop-basic"); + common_telemetry::init_default_ut_logging(); + let store_dir = dir.path().to_str().unwrap(); + + let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME)); + let manifest_dir = format!( + "{}/{}", + store_dir, + engine::region_manifest_dir("", REGION_NAME) + ); + let flush_switch = Arc::new(FlushSwitch::default()); + let mut tester = DropTester::new(store_dir, flush_switch.clone()).await; + + let data = [(1000, Some(100))]; + + // Put one element so we have content to flush. + tester.put(&data).await; + // Manually trigger flush. + tester.flush().await; + + assert!(has_parquet_file(&sst_dir)); + + tester.base().checkpoint_manifest().await; + let version_control = tester.base().region.version_control(); + + let mut action_list = + RegionMetaActionList::with_action(RegionMetaAction::Remove(RegionRemove { + region_id: tester.base().region.id(), + })); + let prev_version = version_control.current_manifest_version(); + action_list.set_prev_version(prev_version); + let manifest = &tester.base().region.inner.manifest; + let _ = manifest.update(action_list).await.unwrap(); + tester.close().await; + + // Reopen the region. 
+ let store_config = config_util::new_store_config( + REGION_NAME, + store_dir, + EngineConfig { + max_files_in_l0: usize::MAX, + ..Default::default() + }, + ) + .await; + + let opts = OpenOptions::default(); + let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts) + .await + .unwrap(); + assert!(region.is_none()); + assert!(!Path::new(&manifest_dir).exists()); +} diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 31f08d6acc1b..1770d8d79a2e 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -21,7 +21,7 @@ use futures::TryStreamExt; use metrics::increment_counter; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; -use store_api::manifest::{Manifest, ManifestVersion, MetaAction}; +use store_api::manifest::{Manifest, ManifestLogStorage, ManifestVersion, MetaAction}; use store_api::storage::{ AlterRequest, FlushContext, FlushReason, SequenceNumber, WriteContext, WriteResponse, }; @@ -289,6 +289,7 @@ impl RegionWriter { // 5. Mark all data obsolete in the WAL. // 6. Delete the namespace of the region from the WAL. // 7. Mark all SSTs deleted. + // 8. Remove all manifests. 
let mut inner = self.inner.lock().await; inner.mark_closed(); @@ -317,7 +318,7 @@ impl RegionWriter { action_list ); - let _ = drop_ctx.manifest.update(action_list).await?; + let remove_action_version = drop_ctx.manifest.update(action_list).await?; // Mark all data obsolete and delete the namespace in the WAL drop_ctx.wal.obsolete(committed_sequence).await?; @@ -336,6 +337,11 @@ impl RegionWriter { files ); + drop_ctx + .manifest + .manifest_store() + .delete_all(remove_action_version) + .await?; Ok(()) } diff --git a/src/store-api/src/manifest/storage.rs b/src/store-api/src/manifest/storage.rs index 44b54db81aab..3de1d531995c 100644 --- a/src/store-api/src/manifest/storage.rs +++ b/src/store-api/src/manifest/storage.rs @@ -46,6 +46,10 @@ pub trait ManifestLogStorage { keep_last_checkpoint: bool, ) -> Result; + /// Delete all logs and checkpoints, and remove the manifest directory. + /// The delta file corresponding to the `remove_action_version` will be deleted along with the manifest directory at the end. + async fn delete_all(&self, remove_action_version: ManifestVersion) -> Result<(), Self::Error>; + /// Save a log async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<(), Self::Error>;