From fb8428032bf51f1f67c1e178e13cc1e9affd5b5d Mon Sep 17 00:00:00 2001
From: baishen
Date: Thu, 24 Oct 2024 00:08:07 +0800
Subject: [PATCH 1/2] fix(storage): fix refresh virtual column using async api
 (#16656)

* fix(storage): fix refresh virtual column using async api

* fix typos

* fix

---------

Co-authored-by: sundyli <543950155@qq.com>
---
 Cargo.lock                                    |   6 +
 src/common/metrics/src/metrics/storage.rs     |  22 ++
 src/query/ee/Cargo.toml                       |   5 +
 .../fuse/operations/virtual_columns.rs        | 306 +++++++++++++++---
 .../virtual_column/virtual_column_handler.rs  |   6 +-
 .../fuse/operations/virtual_columns.rs        |  36 ++-
 .../ee_features/virtual_column/Cargo.toml     |   1 +
 .../virtual_column/src/virtual_column.rs      |   9 +-
 .../src/interpreters/hook/refresh_hook.rs     |  26 +-
 .../interpreter_virtual_column_refresh.rs     |  17 +-
 .../read/block/block_reader_merge_io_async.rs |  14 -
 .../read/block/block_reader_merge_io_sync.rs  |  10 -
 .../src/io/read/block/block_reader_native.rs  |  18 +-
 13 files changed, 368 insertions(+), 108 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8329c8299f94..a123628c3cec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4942,6 +4942,11 @@ dependencies = [
  "databend-common-meta-app",
  "databend-common-meta-store",
  "databend-common-meta-types",
+ "databend-common-metrics",
+ "databend-common-pipeline-core",
+ "databend-common-pipeline-sinks",
+ "databend-common-pipeline-sources",
+ "databend-common-pipeline-transforms",
  "databend-common-sql",
  "databend-common-storage",
  "databend-common-storages-fuse",
@@ -5030,6 +5035,7 @@ dependencies = [
  "databend-common-catalog",
  "databend-common-exception",
  "databend-common-meta-app",
+ "databend-common-pipeline-core",
  "databend-common-storages-fuse",
  "databend-storages-common-table-meta",
 ]
diff --git a/src/common/metrics/src/metrics/storage.rs b/src/common/metrics/src/metrics/storage.rs
index ba9dc57870b4..e65224cfba97 100644
--- a/src/common/metrics/src/metrics/storage.rs
+++ b/src/common/metrics/src/metrics/storage.rs
@@ -288,6 +288,15 @@ static AGG_INDEX_WRITE_BYTES: LazyLock<Counter> =
 static AGG_INDEX_WRITE_MILLISECONDS: LazyLock<Histogram> =
     LazyLock::new(|| register_histogram_in_milliseconds("fuse_aggregate_index_write_milliseconds"));
 
+// Virtual column metrics.
+static BLOCK_VIRTUAL_COLUMN_WRITE_NUMS: LazyLock<Counter> =
+    LazyLock::new(|| register_counter("fuse_block_virtual_column_write_nums"));
+static BLOCK_VIRTUAL_COLUMN_WRITE_BYTES: LazyLock<Counter> =
+    LazyLock::new(|| register_counter("fuse_block_virtual_column_write_bytes"));
+static BLOCK_VIRTUAL_COLUMN_WRITE_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
+    register_histogram_in_milliseconds("fuse_block_virtual_column_write_milliseconds")
+});
+
 /// Common metrics.
 pub fn metrics_inc_omit_filter_rowgroups(c: u64) {
     OMIT_FILTER_ROWGROUPS.inc_by(c);
@@ -790,3 +799,16 @@ pub fn metrics_inc_agg_index_write_bytes(c: u64) {
 pub fn metrics_inc_agg_index_write_milliseconds(c: u64) {
     AGG_INDEX_WRITE_MILLISECONDS.observe(c as f64);
 }
+
+/// Virtual column metrics.
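+/// Each successful virtual column block write below ticks the event counter
+/// once, adds the serialized size in bytes, and records the elapsed write
+/// time (see the `// Perf.` block in `VirtualColumnTransform::transform`).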
+pub fn metrics_inc_block_virtual_column_write_nums(c: u64) {
+    BLOCK_VIRTUAL_COLUMN_WRITE_NUMS.inc_by(c);
+}
+
+pub fn metrics_inc_block_virtual_column_write_bytes(c: u64) {
+    BLOCK_VIRTUAL_COLUMN_WRITE_BYTES.inc_by(c);
+}
+
+pub fn metrics_inc_block_virtual_column_write_milliseconds(c: u64) {
+    BLOCK_VIRTUAL_COLUMN_WRITE_MILLISECONDS.observe(c as f64);
+}
diff --git a/src/query/ee/Cargo.toml b/src/query/ee/Cargo.toml
index 1c3835e52b44..a696dd2c9aee 100644
--- a/src/query/ee/Cargo.toml
+++ b/src/query/ee/Cargo.toml
@@ -31,6 +31,11 @@ databend-common-meta-api = { workspace = true }
 databend-common-meta-app = { workspace = true }
 databend-common-meta-store = { workspace = true }
 databend-common-meta-types = { workspace = true }
+databend-common-metrics = { workspace = true }
+databend-common-pipeline-core = { workspace = true }
+databend-common-pipeline-sinks = { workspace = true }
+databend-common-pipeline-sources = { workspace = true }
+databend-common-pipeline-transforms = { workspace = true }
 databend-common-sql = { workspace = true }
 databend-common-storage = { workspace = true }
 databend-common-storages-fuse = { workspace = true }
diff --git a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs
index 8946bd4f2dc3..ca19a34791b4 100644
--- a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs
+++ b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs
@@ -12,26 +12,47 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::VecDeque;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
+use std::time::Instant;
 
+use async_trait::async_trait;
 use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_expression::infer_schema_type;
 use databend_common_expression::BlockEntry;
+use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchema;
-use databend_common_expression::DataSchemaRef;
 use databend_common_expression::Evaluator;
+use databend_common_expression::Expr;
 use databend_common_expression::TableDataType;
 use databend_common_expression::TableField;
+use databend_common_expression::TableSchemaRef;
 use databend_common_expression::TableSchemaRefExt;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
+use databend_common_metrics::storage::metrics_inc_block_virtual_column_write_bytes;
+use databend_common_metrics::storage::metrics_inc_block_virtual_column_write_milliseconds;
+use databend_common_metrics::storage::metrics_inc_block_virtual_column_write_nums;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_core::Pipeline;
+use databend_common_pipeline_sinks::AsyncSink;
+use databend_common_pipeline_sinks::AsyncSinker;
+use databend_common_pipeline_sources::AsyncSource;
+use databend_common_pipeline_sources::AsyncSourcer;
+use databend_common_pipeline_transforms::processors::AsyncTransform;
+use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
 use databend_common_sql::parse_computed_expr;
+use databend_common_storage::read_parquet_schema_async_rs;
 use databend_common_storages_fuse::io::serialize_block;
 use databend_common_storages_fuse::io::write_data;
+use databend_common_storages_fuse::io::BlockReader;
 use databend_common_storages_fuse::io::MetaReaders;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::io::WriteSettings;
@@ -39,17 +60,33 @@ use databend_common_storages_fuse::FuseStorageFormat;
 use databend_common_storages_fuse::FuseTable;
 use databend_storages_common_cache::LoadParams;
 use databend_storages_common_io::ReadSettings;
+use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::Location;
 use opendal::Operator;
 
+// The overall pipeline for refreshing virtual columns:
+//
+//                                ┌─────────────────────────┐
+//                         ┌────> │ VirtualColumnTransform1 │ ────┐
+//                         │      └─────────────────────────┘     │
+//                         │                  ...                 │
+// ┌─────────────────────┐ │      ┌─────────────────────────┐     │      ┌───────────────────┐
+// │ VirtualColumnSource │ ────┼────> │ VirtualColumnTransformN │ ────┼────> │ VirtualColumnSink │
+// └─────────────────────┘ │      └─────────────────────────┘     │      └───────────────────┘
+//                         │                  ...                 │
+//                         │      ┌─────────────────────────┐     │
+//                         └────> │ VirtualColumnTransformZ │ ────┘
+//                                └─────────────────────────┘
+//
 #[async_backtrace::framed]
 pub async fn do_refresh_virtual_column(
-    fuse_table: &FuseTable,
     ctx: Arc<dyn TableContext>,
-    virtual_exprs: Vec<String>,
+    fuse_table: &FuseTable,
+    virtual_columns: Vec<String>,
     segment_locs: Option<Vec<Location>>,
+    pipeline: &mut Pipeline,
 ) -> Result<()> {
-    if virtual_exprs.is_empty() {
+    if virtual_columns.is_empty() {
         return Ok(());
     }
@@ -69,7 +106,7 @@ pub async fn do_refresh_virtual_column(
         if f.data_type().remove_nullable() != TableDataType::Variant {
             continue;
         }
-        let is_src_field = virtual_exprs.iter().any(|v| v.starts_with(f.name()));
+        let is_src_field = virtual_columns.iter().any(|v| v.starts_with(f.name()));
         if is_src_field {
             field_indices.push(i);
         }
@@ -89,7 +126,6 @@ pub async fn do_refresh_virtual_column(
     let segment_reader =
         MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());
 
-    let settings = ReadSettings::from_ctx(&ctx)?;
     let write_settings = fuse_table.get_write_settings();
     let storage_format = write_settings.storage_format;
 
@@ -103,6 +139,7 @@ pub async fn do_refresh_virtual_column(
     };
 
     // Read source variant columns and extract inner fields as virtual columns.
+    let mut block_metas = VecDeque::new();
     for (location, ver) in segment_locs {
         let segment_info = segment_reader
             .read(&LoadParams {
@@ -113,22 +150,24 @@ pub async fn do_refresh_virtual_column(
             })
             .await?;
 
-        let block_metas = segment_info.block_metas()?;
-        for block_meta in block_metas {
-            let block = block_reader
-                .read_by_meta(&settings, &block_meta, &storage_format)
-                .await?;
+        for block_meta in segment_info.block_metas()? {
             let virtual_loc =
                 TableMetaLocationGenerator::gen_virtual_block_location(&block_meta.location.0);
 
             let schema = match storage_format {
-                FuseStorageFormat::Parquet => block_reader.sync_read_schema(&virtual_loc),
-                FuseStorageFormat::Native => block_reader.sync_read_native_schema(&virtual_loc),
+                FuseStorageFormat::Parquet => {
+                    read_parquet_schema_async_rs(operator, &virtual_loc, None)
+                        .await
+                        .ok()
+                }
+                FuseStorageFormat::Native => {
+                    block_reader.async_read_native_schema(&virtual_loc).await
+                }
             };
 
-            // If all virtual columns have already been generated, we can skip this block.
+            // If all virtual columns have already been generated, we can skip this block.
             let all_generated = if let Some(schema) = schema {
-                virtual_exprs
+                virtual_columns
                     .iter()
                     .all(|virtual_name| schema.fields.iter().any(|f| f.name() == virtual_name))
             } else {
@@ -138,54 +177,217 @@ pub async fn do_refresh_virtual_column(
                 continue;
             }
 
-            materialize_virtual_columns(
-                ctx.clone(),
-                operator,
-                &write_settings,
-                &virtual_loc,
-                source_schema.clone(),
-                &virtual_exprs,
-                block,
-            )
-            .await?;
+            block_metas.push_back(block_meta);
         }
     }
 
+    if block_metas.is_empty() {
+        return Ok(());
+    }
+
+    let settings = ReadSettings::from_ctx(&ctx)?;
+    pipeline.add_source(
+        |output| {
+            let inner = VirtualColumnSource::new(
+                settings,
+                storage_format,
+                block_reader.clone(),
+                block_metas.clone(),
+            );
+            AsyncSourcer::create(ctx.clone(), output, inner)
+        },
+        1,
+    )?;
+
+    let mut virtual_fields = Vec::with_capacity(virtual_columns.len());
+    let mut virtual_exprs = Vec::with_capacity(virtual_columns.len());
+    for virtual_column in virtual_columns {
+        let virtual_expr =
+            parse_computed_expr(ctx.clone(), source_schema.clone(), &virtual_column)?;
+        let virtual_field = TableField::new(
+            &virtual_column,
+            infer_schema_type(virtual_expr.data_type())?,
+        );
+        virtual_exprs.push(virtual_expr);
+        virtual_fields.push(virtual_field);
+    }
+    let virtual_schema = TableSchemaRefExt::create(virtual_fields);
+
+    let block_nums = block_metas.len();
+    let max_threads = ctx.get_settings().get_max_threads()? as usize;
+    let max_threads = std::cmp::min(block_nums, max_threads);
+    pipeline.try_resize(max_threads)?;
+    pipeline.add_async_transformer(|| {
+        VirtualColumnTransform::new(
+            ctx.clone(),
+            write_settings.clone(),
+            virtual_exprs.clone(),
+            virtual_schema.clone(),
+            operator.clone(),
+        )
+    });
+
+    pipeline.try_resize(1)?;
+    pipeline.add_sink(|input| VirtualColumnSink::try_create(input, block_nums))?;
+
     Ok(())
 }
 
-#[async_backtrace::framed]
-async fn materialize_virtual_columns(
+/// `VirtualColumnSource` is used to read the data blocks that need virtual columns generated.
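+/// It pops one `BlockMeta` at a time off the queue, reads the corresponding
+/// data block with `read_by_meta`, and attaches the `BlockMeta` to the emitted
+/// `DataBlock` so the transform can derive the virtual block location from it.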
+pub struct VirtualColumnSource {
+    settings: ReadSettings,
+    storage_format: FuseStorageFormat,
+    block_reader: Arc<BlockReader>,
+    block_metas: VecDeque<Arc<BlockMeta>>,
+    is_finished: bool,
+}
+
+impl VirtualColumnSource {
+    pub fn new(
+        settings: ReadSettings,
+        storage_format: FuseStorageFormat,
+        block_reader: Arc<BlockReader>,
+        block_metas: VecDeque<Arc<BlockMeta>>,
+    ) -> Self {
+        Self {
+            settings,
+            storage_format,
+            block_reader,
+            block_metas,
+            is_finished: false,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSource for VirtualColumnSource {
+    const NAME: &'static str = "VirtualColumnSource";
+
+    #[async_backtrace::framed]
+    async fn generate(&mut self) -> Result<Option<DataBlock>> {
+        if self.is_finished {
+            return Ok(None);
+        }
+
+        match self.block_metas.pop_front() {
+            Some(block_meta) => {
+                let block = self
+                    .block_reader
+                    .read_by_meta(&self.settings, &block_meta, &self.storage_format)
+                    .await?;
+                let block = block.add_meta(Some(Box::new(Arc::unwrap_or_clone(block_meta))))?;
+                Ok(Some(block))
+            }
+            None => {
+                self.is_finished = true;
+                Ok(None)
+            }
+        }
+    }
+}
+
+/// `VirtualColumnTransform` is used to generate virtual columns for each block.
+pub struct VirtualColumnTransform {
     ctx: Arc<dyn TableContext>,
-    operator: &Operator,
-    write_settings: &WriteSettings,
-    location: &str,
-    source_schema: DataSchemaRef,
-    virtual_exprs: &Vec<String>,
-    block: DataBlock,
-) -> Result<()> {
-    let len = block.num_rows();
-
-    let func_ctx = ctx.get_function_context()?;
-    let evaluator = Evaluator::new(&block, &func_ctx, &BUILTIN_FUNCTIONS);
-    let mut virtual_fields = Vec::with_capacity(virtual_exprs.len());
-    let mut virtual_columns = Vec::with_capacity(virtual_exprs.len());
-    for virtual_expr in virtual_exprs {
-        let expr = parse_computed_expr(ctx.clone(), source_schema.clone(), virtual_expr)?;
-        let virtual_field = TableField::new(virtual_expr, infer_schema_type(expr.data_type())?);
-        virtual_fields.push(virtual_field);
+    write_settings: WriteSettings,
+    virtual_exprs: Vec<Expr>,
+    virtual_schema: TableSchemaRef,
+    operator: Operator,
+}
 
-        let value = evaluator.run(&expr)?;
-        let virtual_column = BlockEntry::new(expr.data_type().clone(), value);
-        virtual_columns.push(virtual_column);
+impl VirtualColumnTransform {
+    pub fn new(
+        ctx: Arc<dyn TableContext>,
+        write_settings: WriteSettings,
+        virtual_exprs: Vec<Expr>,
+        virtual_schema: TableSchemaRef,
+        operator: Operator,
+    ) -> Self {
+        Self {
+            ctx,
+            write_settings,
+            virtual_exprs,
+            virtual_schema,
+            operator,
+        }
     }
-    let virtual_schema = TableSchemaRefExt::create(virtual_fields);
-    let virtual_block = DataBlock::new(virtual_columns, len);
+}
 
-    let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-    let _ = serialize_block(write_settings, &virtual_schema, virtual_block, &mut buffer)?;
+#[async_trait::async_trait]
+impl AsyncTransform for VirtualColumnTransform {
+    const NAME: &'static str = "VirtualColumnTransform";
 
-    write_data(buffer, operator, location).await?;
+    #[async_backtrace::framed]
+    async fn transform(&mut self, data_block: DataBlock) -> Result<DataBlock> {
+        let block_meta = data_block
+            .get_meta()
+            .and_then(BlockMeta::downcast_ref_from)
+            .unwrap();
 
-    Ok(())
+        let virtual_location =
+            TableMetaLocationGenerator::gen_virtual_block_location(&block_meta.location.0);
+
+        let start = Instant::now();
+
+        let len = data_block.num_rows();
+        let func_ctx = self.ctx.get_function_context()?;
+        let evaluator = Evaluator::new(&data_block, &func_ctx, &BUILTIN_FUNCTIONS);
+        let mut virtual_entries = Vec::with_capacity(self.virtual_exprs.len());
+        for virtual_expr in &self.virtual_exprs {
+            let value = evaluator.run(virtual_expr)?;
+            let virtual_entry = BlockEntry::new(virtual_expr.data_type().clone(), value);
+            virtual_entries.push(virtual_entry);
+        }
+        let virtual_block = DataBlock::new(virtual_entries, len);
+
+        let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
+        let _ = serialize_block(
+            &self.write_settings,
+            &self.virtual_schema,
+            virtual_block,
+            &mut buffer,
+        )?;
+
+        let virtual_column_size = buffer.len() as u64;
+        write_data(buffer, &self.operator, &virtual_location).await?;
+
+        // Perf.
+        {
+            metrics_inc_block_virtual_column_write_nums(1);
+            metrics_inc_block_virtual_column_write_bytes(virtual_column_size);
+            metrics_inc_block_virtual_column_write_milliseconds(start.elapsed().as_millis() as u64);
+        }
+
+        let new_block = DataBlock::new(vec![], 0);
+        Ok(new_block)
+    }
+}
+
+/// `VirtualColumnSink` finishes the virtual column pipeline once every block has been written.
+pub struct VirtualColumnSink {
+    block_nums: AtomicUsize,
+}
+
+impl VirtualColumnSink {
+    pub fn try_create(input: Arc<InputPort>, block_nums: usize) -> Result<ProcessorPtr> {
+        let sinker = AsyncSinker::create(input, VirtualColumnSink {
+            block_nums: AtomicUsize::new(block_nums),
+        });
+        Ok(ProcessorPtr::create(sinker))
+    }
+}
+
+#[async_trait]
+impl AsyncSink for VirtualColumnSink {
+    const NAME: &'static str = "VirtualColumnSink";
+
+    #[async_backtrace::framed]
+    async fn consume(&mut self, _data_block: DataBlock) -> Result<bool> {
+        let num = self.block_nums.fetch_sub(1, Ordering::SeqCst);
+        if num > 1 {
+            return Ok(false);
+        }
+
+        Ok(true)
+    }
 }
diff --git a/src/query/ee/src/virtual_column/virtual_column_handler.rs b/src/query/ee/src/virtual_column/virtual_column_handler.rs
index e0270fc2edcd..c400ed106f34 100644
--- a/src/query/ee/src/virtual_column/virtual_column_handler.rs
+++ b/src/query/ee/src/virtual_column/virtual_column_handler.rs
@@ -23,6 +23,7 @@ use databend_common_meta_app::schema::DropVirtualColumnReq;
 use databend_common_meta_app::schema::ListVirtualColumnsReq;
 use databend_common_meta_app::schema::UpdateVirtualColumnReq;
 use databend_common_meta_app::schema::VirtualColumnMeta;
+use databend_common_pipeline_core::Pipeline;
 use databend_common_storages_fuse::FuseTable;
 use databend_enterprise_virtual_column::VirtualColumnHandler;
 use databend_enterprise_virtual_column::VirtualColumnHandlerWrapper;
@@ -72,12 +73,13 @@ impl VirtualColumnHandler for RealVirtualColumnHandler {
 
     async fn do_refresh_virtual_column(
         &self,
-        fuse_table: &FuseTable,
         ctx: Arc<dyn TableContext>,
+        fuse_table: &FuseTable,
         virtual_columns: Vec<String>,
         segment_locs: Option<Vec<Location>>,
+        pipeline: &mut Pipeline,
     ) -> Result<()> {
-        do_refresh_virtual_column(fuse_table, ctx, virtual_columns, segment_locs).await
+        do_refresh_virtual_column(ctx, fuse_table, virtual_columns, segment_locs, pipeline).await
     }
 }
 
diff --git a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs
index 6ebd11a7a709..5ed79b727779 100644
--- a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs
+++ b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs
@@ -15,11 +15,16 @@
 use databend_common_base::base::tokio;
 use databend_common_catalog::plan::Projection;
 use databend_common_exception::Result;
+use databend_common_storage::read_parquet_schema_async_rs;
 use databend_common_storages_fuse::io::MetaReaders;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::FuseStorageFormat;
 use databend_common_storages_fuse::FuseTable;
+use databend_common_storages_fuse::TableContext;
 use databend_enterprise_query::storages::fuse::operations::virtual_columns::do_refresh_virtual_column;
+use databend_query::pipelines::executor::ExecutorSettings;
+use databend_query::pipelines::executor::PipelineCompleteExecutor;
+use databend_query::pipelines::PipelineBuildResult;
 use databend_query::test_kits::*;
 use databend_storages_common_cache::LoadParams;
 
@@ -55,8 +60,29 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
     let write_settings = fuse_table.get_write_settings();
     let storage_format = write_settings.storage_format;
 
+    let mut build_res = PipelineBuildResult::create();
     let segment_locs = Some(snapshot.segments.clone());
-    do_refresh_virtual_column(fuse_table, table_ctx, virtual_columns, segment_locs).await?;
+    do_refresh_virtual_column(
+        table_ctx.clone(),
+        fuse_table,
+        virtual_columns,
+        segment_locs,
+        &mut build_res.main_pipeline,
+    )
+    .await?;
+
+    let settings = table_ctx.get_settings();
+    build_res.set_max_threads(settings.get_max_threads()? as usize);
+    let settings = ExecutorSettings::try_create(table_ctx.clone())?;
+
+    if build_res.main_pipeline.is_complete_pipeline()? {
+        let mut pipelines = build_res.sources_pipelines;
+        pipelines.push(build_res.main_pipeline);
+
+        let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
+        table_ctx.set_executor(complete_executor.get_inner())?;
+        complete_executor.execute()?;
+    }
 
     let segment_reader =
         MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());
@@ -78,8 +104,12 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
         assert!(dal.is_exist(&virtual_loc).await?);
 
         let schema = match storage_format {
-            FuseStorageFormat::Parquet => block_reader.sync_read_schema(&virtual_loc),
-            FuseStorageFormat::Native => block_reader.sync_read_native_schema(&virtual_loc),
+            FuseStorageFormat::Parquet => read_parquet_schema_async_rs(dal, &virtual_loc, None)
+                .await
+                .ok(),
+            FuseStorageFormat::Native => {
+                block_reader.async_read_native_schema(&virtual_loc).await
+            }
         };
         assert!(schema.is_some());
         let schema = schema.unwrap();
diff --git a/src/query/ee_features/virtual_column/Cargo.toml b/src/query/ee_features/virtual_column/Cargo.toml
index 9db4efce6dcd..cd95d376abaf 100644
--- a/src/query/ee_features/virtual_column/Cargo.toml
+++ b/src/query/ee_features/virtual_column/Cargo.toml
@@ -19,6 +19,7 @@ databend-common-base = { workspace = true }
 databend-common-catalog = { workspace = true }
 databend-common-exception = { workspace = true }
 databend-common-meta-app = { workspace = true }
+databend-common-pipeline-core = { workspace = true }
 databend-common-storages-fuse = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 
diff --git a/src/query/ee_features/virtual_column/src/virtual_column.rs b/src/query/ee_features/virtual_column/src/virtual_column.rs
index f1e2fb225604..310c03726971 100644
--- a/src/query/ee_features/virtual_column/src/virtual_column.rs
+++ b/src/query/ee_features/virtual_column/src/virtual_column.rs
@@ -23,6 +23,7 @@ use databend_common_meta_app::schema::DropVirtualColumnReq;
 use databend_common_meta_app::schema::ListVirtualColumnsReq;
 use databend_common_meta_app::schema::UpdateVirtualColumnReq;
 use databend_common_meta_app::schema::VirtualColumnMeta;
+use databend_common_pipeline_core::Pipeline;
 use databend_common_storages_fuse::FuseTable;
 use databend_storages_common_table_meta::meta::Location;
 
@@ -54,10 +55,11 @@ pub trait VirtualColumnHandler: Sync + Send {
 
     async fn do_refresh_virtual_column(
         &self,
-        fuse_table: &FuseTable,
         ctx: Arc<dyn TableContext>,
+        fuse_table: &FuseTable,
         virtual_columns: Vec<String>,
         segment_locs: Option<Vec<Location>>,
+        pipeline: &mut Pipeline,
     ) -> Result<()>;
 }
 
@@ -109,13 +111,14 @@ impl VirtualColumnHandlerWrapper {
     #[async_backtrace::framed]
     pub async fn do_refresh_virtual_column(
         &self,
-        fuse_table: &FuseTable,
         ctx: Arc<dyn TableContext>,
+        fuse_table: &FuseTable,
         virtual_columns: Vec<String>,
         segment_locs: Option<Vec<Location>>,
+        pipeline: &mut Pipeline,
     ) -> Result<()> {
         self.handler
-            .do_refresh_virtual_column(fuse_table, ctx, virtual_columns, segment_locs)
+            .do_refresh_virtual_column(ctx, fuse_table, virtual_columns, segment_locs, pipeline)
             .await
     }
 }
diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs
index b13852e8788a..ecdcbab1e9fb 100644
--- a/src/query/service/src/interpreters/hook/refresh_hook.rs
+++ b/src/query/service/src/interpreters/hook/refresh_hook.rs
@@ -18,7 +18,6 @@ use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_catalog::catalog::CatalogManager;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
-use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_meta_app::schema::IndexMeta;
 use databend_common_meta_app::schema::ListIndexesByIdReq;
@@ -175,13 +174,26 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
                 ctx_cloned.clone(),
                 *virtual_column_plan,
             )?;
-            let build_res = refresh_virtual_column_interpreter.execute2().await?;
-            if !build_res.main_pipeline.is_empty() {
-                return Err(ErrorCode::Internal(
-                    "Logical error, refresh virtual column is an empty pipeline.",
-                ));
+            let mut build_res = refresh_virtual_column_interpreter.execute2().await?;
+            if build_res.main_pipeline.is_empty() {
+                return Ok(());
+            }
+
+            let settings = ctx_cloned.get_settings();
+            build_res.set_max_threads(settings.get_max_threads()? as usize);
+            let settings = ExecutorSettings::try_create(ctx_cloned.clone())?;
+
+            if build_res.main_pipeline.is_complete_pipeline()? {
+                let mut pipelines = build_res.sources_pipelines;
+                pipelines.push(build_res.main_pipeline);
+
+                let complete_executor =
+                    PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;
+                ctx_cloned.set_executor(complete_executor.get_inner())?;
+                complete_executor.execute()
+            } else {
+                Ok(())
             }
-            Ok(())
         }
         _ => unreachable!(),
     }
diff --git a/src/query/service/src/interpreters/interpreter_virtual_column_refresh.rs b/src/query/service/src/interpreters/interpreter_virtual_column_refresh.rs
index 89d8b420d00f..625b8e9dd7d0 100644
--- a/src/query/service/src/interpreters/interpreter_virtual_column_refresh.rs
+++ b/src/query/service/src/interpreters/interpreter_virtual_column_refresh.rs
@@ -53,14 +53,10 @@ impl Interpreter for RefreshVirtualColumnInterpreter {
         LicenseManagerSwitch::instance()
             .check_enterprise_enabled(self.ctx.get_license_key(), VirtualColumn)?;
 
-        let catalog_name = self.plan.catalog.clone();
-        let db_name = self.plan.database.clone();
-        let tbl_name = self.plan.table.clone();
         let table = self
             .ctx
-            .get_table(&catalog_name, &db_name, &tbl_name)
+            .get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
             .await?;
-
         // check mutability
         table.check_mutable()?;
 
@@ -68,11 +64,18 @@ impl Interpreter for RefreshVirtualColumnInterpreter {
         let virtual_columns = self.plan.virtual_columns.clone();
         let segment_locs = self.plan.segment_locs.clone();
 
+        let mut build_res = PipelineBuildResult::create();
         let handler = get_virtual_column_handler();
         let _ = handler
-            .do_refresh_virtual_column(fuse_table, self.ctx.clone(), virtual_columns, segment_locs)
+            .do_refresh_virtual_column(
+                self.ctx.clone(),
+                fuse_table,
+                virtual_columns,
+                segment_locs,
+                &mut build_res.main_pipeline,
+            )
             .await?;
 
-        Ok(PipelineBuildResult::create())
+        Ok(build_res)
     }
 }
diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
index 12510db6c5b4..0f2b2bcf8cb7 100644
--- a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
+++ b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
@@ -24,7 +24,6 @@ use databend_storages_common_cache::TableDataCacheKey;
 use databend_storages_common_io::MergeIOReader;
 use databend_storages_common_io::ReadSettings;
 use databend_storages_common_table_meta::meta::ColumnMeta;
-use opendal::Operator;
 
 use crate::io::BlockReader;
 use crate::BlockReadResult;
@@ -116,17 +115,4 @@ impl BlockReader {
 
         Ok(block_read_res)
     }
-
-    #[inline]
-    #[async_backtrace::framed]
-    async fn read_range(
-        op: Operator,
-        path: &str,
-        index: usize,
-        start: u64,
-        end: u64,
-    ) -> Result<(usize, Vec<u8>)> {
-        let chunk = op.read_with(path).range(start..end).await?;
-        Ok((index, chunk.to_vec()))
-    }
 }
diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs
index 2eb33327ef51..8e0511332c24 100644
--- a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs
+++ b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs
@@ -14,12 +14,9 @@
 
 use std::collections::HashSet;
 
-use arrow::datatypes::Schema as ArrowSchema;
 use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_exception::Result;
 use databend_common_expression::ColumnId;
-use databend_common_storage::parquet_rs::infer_schema_with_extension;
-use databend_common_storage::parquet_rs::read_metadata_sync;
 use databend_storages_common_cache::CacheAccessor;
 use databend_storages_common_cache::CacheManager;
 use databend_storages_common_cache::TableDataCacheKey;
@@ -78,11 +75,4 @@ impl BlockReader {
 
         Ok(block_read_res)
     }
-
-    pub fn sync_read_schema(&self, loc: &str) -> Option<ArrowSchema> {
-        let metadata = read_metadata_sync(loc, &self.operator, None).ok()?;
-        debug_assert_eq!(metadata.num_row_groups(), 1);
-        let schema = infer_schema_with_extension(metadata.file_metadata()).ok()?;
-        Some(schema)
-    }
 }
diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_native.rs b/src/query/storages/fuse/src/io/read/block/block_reader_native.rs
index 67e74e0b6246..7bc18d094735 100644
--- a/src/query/storages/fuse/src/io/read/block/block_reader_native.rs
+++ b/src/query/storages/fuse/src/io/read/block/block_reader_native.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::Schema as ArrowSchema;
 use databend_common_arrow::arrow::array::Array;
-use databend_common_arrow::native::read::reader::infer_schema;
+use databend_common_arrow::native::read::reader::infer_schema_async;
 use databend_common_arrow::native::read::reader::NativeReader;
 use databend_common_arrow::native::read::NativeReadBuf;
 use databend_common_catalog::plan::PartInfoPtr;
@@ -249,16 +249,14 @@ impl BlockReader {
         Ok(DataBlock::new(entries, nums_rows.unwrap_or(0)))
     }
 
-    pub fn sync_read_native_schema(&self, loc: &str) -> Option<ArrowSchema> {
-        let meta = self.operator.blocking().stat(loc).ok()?;
-        let mut reader = self
-            .operator
-            .blocking()
-            .reader(loc)
-            .ok()?
-            .into_std_read(0..meta.content_length())
+    #[async_backtrace::framed]
+    pub async fn async_read_native_schema(&self, loc: &str) -> Option<ArrowSchema> {
+        let op = &self.operator;
+        let stat = op.stat(loc).await.ok()?;
+        let reader = op.reader(loc).await.ok()?;
+        let schema = infer_schema_async(reader, stat.content_length())
+            .await
             .ok()?;
-        let schema = infer_schema(&mut reader).ok()?;
         let schema = DataSchema::try_from(&schema).ok()?;
         Some(ArrowSchema::from(&schema))
     }

From d1ad6f4d134b3f077296deb7c421d6dd2ea0e1b9 Mon Sep 17 00:00:00 2001
From: "xudong.w"
Date: Thu, 24 Oct 2024 08:27:43 +0800
Subject: [PATCH 2/2] feat: add deterministic block level sampling for small
 datasets (#16670)

---
 .../filter_selectivity_sample.rs              |  8 ++--
 .../storages/fuse/src/pruning/fuse_pruner.rs  | 42 +++++++++++++++----
 2 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs
index 53f1e994613f..5243158260f0 100644
--- a/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs
+++ b/src/query/sql/src/planner/optimizer/dynamic_sample/filter_selectivity_sample.rs
@@ -56,16 +56,16 @@ pub async fn filter_selectivity_sample(
         .as_ref()
         .and_then(|s| s.num_rows)
         .unwrap_or(0);
-
     // Calculate sample size (0.2% of total data)
     let sample_size = (num_rows as f64 * 0.002).ceil();
     let mut new_s_expr = s_expr.clone();
     // If the table is too small, we don't need to sample.
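     // (sample_size = ceil(num_rows * 0.002), so sampling only kicks in beyond 4500 rows.)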
     if sample_size >= 10.0 {
-        scan.sample = Some(SampleConfig {
+        let sample_conf = SampleConfig {
             row_level: Some(SampleRowLevel::RowsNum(sample_size)),
-            block_level: None,
-        });
+            block_level: Some(50.0),
+        };
+        scan.sample = Some(sample_conf);
         let new_child = SExpr::create_leaf(Arc::new(RelOperator::Scan(scan)));
         new_s_expr = s_expr.replace_children(vec![Arc::new(new_child)]);
         let collect_statistics_optimizer =
diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs
index 089262841036..89f0a95b6306 100644
--- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs
+++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::cmp::max;
 use std::sync::Arc;
 
 use databend_common_base::base::tokio::sync::Semaphore;
@@ -50,6 +51,7 @@ use log::warn;
 use opendal::Operator;
 use rand::distributions::Bernoulli;
 use rand::distributions::Distribution;
+use rand::prelude::SliceRandom;
 use rand::thread_rng;
 
 use crate::io::BloomIndexBuilder;
@@ -62,6 +64,8 @@ use crate::pruning::FusePruningStatistics;
 use crate::pruning::InvertedIndexPruner;
 use crate::pruning::SegmentLocation;
 
+const SMALL_DATASET_SAMPLE_THRESHOLD: usize = 100;
+
 pub struct PruningContext {
     pub ctx: Arc<dyn TableContext>,
     pub dal: Operator,
@@ -366,15 +370,39 @@ impl FusePruner {
                 let mut block_metas =
                     Self::extract_block_metas(&location.location.0, &info, true)?;
                 if let Some(probability) = sample_probability {
-                    let mut sample_block_metas = Vec::with_capacity(block_metas.len());
-                    let mut rng = thread_rng();
-                    let bernoulli = Bernoulli::new(probability).unwrap();
-                    for block in block_metas.iter() {
-                        if bernoulli.sample(&mut rng) {
-                            sample_block_metas.push(block.clone());
+                    if block_metas.len() <= SMALL_DATASET_SAMPLE_THRESHOLD {
+                        // Deterministic sample *size* for small datasets (the chosen blocks are still random)
+                        // Ensure at least one block is sampled for small datasets
+                        let sample_size = max(
+                            1,
+                            (block_metas.len() as f64 * probability).round() as usize,
+                        );
+                        let mut rng = thread_rng();
+                        block_metas = Arc::new(
+                            block_metas
+                                .choose_multiple(&mut rng, sample_size)
+                                .cloned()
+                                .collect(),
+                        );
+                    } else {
+                        // Random sampling for larger datasets
+                        let mut sample_block_metas =
+                            Vec::with_capacity(block_metas.len());
+                        let mut rng = thread_rng();
+                        let bernoulli = Bernoulli::new(probability).unwrap();
+                        for block in block_metas.iter() {
+                            if bernoulli.sample(&mut rng) {
+                                sample_block_metas.push(block.clone());
+                            }
+                        }
+                        // Ensure at least one block is sampled for large datasets too
+                        if sample_block_metas.is_empty() && !block_metas.is_empty() {
+                            // Safe to unwrap, because we've checked that block_metas is not empty
+                            sample_block_metas
+                                .push(block_metas.choose(&mut rng).unwrap().clone());
                         }
+                        block_metas = Arc::new(sample_block_metas);
                     }
-                    block_metas = Arc::new(sample_block_metas);
                 }
                 res.extend(block_pruner.pruning(location.clone(), block_metas).await?);
             }
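As a reading aid for the pruner change above, here is a minimal standalone sketch of the sampling policy (illustrative only, not part of the patch; `sample_indices` and `threshold` are hypothetical names, with `threshold` playing the role of `SMALL_DATASET_SAMPLE_THRESHOLD`):

use rand::distributions::{Bernoulli, Distribution};
use rand::prelude::SliceRandom;
use rand::thread_rng;

/// Pick block indices out of `n`: small sets get a deterministic sample size,
/// large sets a per-block Bernoulli draw, and both always yield >= 1 block.
fn sample_indices(n: usize, probability: f64, threshold: usize) -> Vec<usize> {
    let mut rng = thread_rng();
    let all: Vec<usize> = (0..n).collect();
    if n <= threshold {
        // Deterministic sample *size*; the chosen blocks are still random.
        let size = std::cmp::max(1, (n as f64 * probability).round() as usize);
        all.choose_multiple(&mut rng, size).cloned().collect()
    } else {
        let bernoulli = Bernoulli::new(probability).unwrap();
        let mut picked: Vec<usize> = all
            .iter()
            .copied()
            .filter(|_| bernoulli.sample(&mut rng))
            .collect();
        if picked.is_empty() {
            // Fall back to one random block so pruning never sees an empty set.
            picked.push(*all.choose(&mut rng).unwrap());
        }
        picked
    }
}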