diff --git a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs new file mode 100644 index 000000000000..c59459ba6172 --- /dev/null +++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs @@ -0,0 +1,449 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; +use parquet::file::metadata::RowGroupMetaData; + +/// A selection of rows and row groups within a ParquetFile to decode. +/// +/// A `ParquetAccessPlan` is used to limit the row groups and data pages a `ParquetExec` +/// will read and decode to improve performance. +/// +/// Note that page level pruning based on ArrowPredicate is applied after all of +/// these selections +/// +/// # Example +/// +/// For example, given a Parquet file with 4 row groups, a `ParquetAccessPlan` +/// can be used to specify skipping row group 0 and 2, scanning a range of rows +/// in row group 1, and scanning all rows in row group 3 as follows: +/// +/// ```rust +/// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; +/// # use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan; +/// // Default to scan all row groups +/// let mut access_plan = ParquetAccessPlan::new_all(4); +/// access_plan.skip(0); // skip row group +/// // Use parquet reader RowSelector to specify scanning rows 100-200 and 350-400 +/// // in a row group that has 1000 rows +/// let row_selection = RowSelection::from(vec![ +/// RowSelector::skip(100), +/// RowSelector::select(100), +/// RowSelector::skip(150), +/// RowSelector::select(50), +/// RowSelector::skip(600), // skip last 600 rows +/// ]); +/// access_plan.scan_selection(1, row_selection); +/// access_plan.skip(2); // skip row group 2 +/// // row group 3 is scanned by default +/// ``` +/// +/// The resulting plan would look like: +/// +/// ```text +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// +/// │ │ SKIP +/// +/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ +/// Row Group 0 +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// ┌────────────────┐ SCAN ONLY ROWS +/// │└────────────────┘ │ 100-200 +/// ┌────────────────┐ 350-400 +/// │└────────────────┘ │ +/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +/// Row Group 1 +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// SKIP +/// │ │ +/// +/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ +/// Row Group 2 +/// ┌───────────────────┐ +/// │ │ SCAN ALL ROWS +/// │ │ +/// │ │ +/// └───────────────────┘ +/// Row Group 3 +/// ``` +#[derive(Debug, Clone, PartialEq)] +pub struct ParquetAccessPlan { + /// How to access the i-th row group + row_groups: Vec, +} + +/// Describes how the parquet reader will access a row group +#[derive(Debug, Clone, PartialEq)] +pub enum RowGroupAccess { + /// Do not read the row group at all + Skip, + /// Read all rows from the row group + 
+    Scan,
+    /// Scan only the specified rows within the row group
+    Selection(RowSelection),
+}
+
+impl RowGroupAccess {
+    /// Return true if this row group should be scanned
+    pub fn should_scan(&self) -> bool {
+        match self {
+            RowGroupAccess::Skip => false,
+            RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
+        }
+    }
+}
+
+impl ParquetAccessPlan {
+    /// Create a new `ParquetAccessPlan` that scans all row groups
+    pub fn new_all(row_group_count: usize) -> Self {
+        Self {
+            row_groups: vec![RowGroupAccess::Scan; row_group_count],
+        }
+    }
+
+    /// Create a new `ParquetAccessPlan` that scans no row groups
+    pub fn new_none(row_group_count: usize) -> Self {
+        Self {
+            row_groups: vec![RowGroupAccess::Skip; row_group_count],
+        }
+    }
+
+    /// Create a new `ParquetAccessPlan` from the specified [`RowGroupAccess`]es
+    pub fn new(row_groups: Vec<RowGroupAccess>) -> Self {
+        Self { row_groups }
+    }
+
+    /// Set the i-th row group to the specified [`RowGroupAccess`]
+    pub fn set(&mut self, idx: usize, access: RowGroupAccess) {
+        self.row_groups[idx] = access;
+    }
+
+    /// Skip the i-th row group (it should not be scanned)
+    pub fn skip(&mut self, idx: usize) {
+        self.set(idx, RowGroupAccess::Skip);
+    }
+
+    /// Return true if the i-th row group should be scanned
+    pub fn should_scan(&self, idx: usize) -> bool {
+        self.row_groups[idx].should_scan()
+    }
+
+    /// Set to scan only the [`RowSelection`] in the specified row group.
+    ///
+    /// The behavior depends on the existing access:
+    /// * [`RowGroupAccess::Skip`]: does nothing
+    /// * [`RowGroupAccess::Scan`]: updates to scan only the rows in the `RowSelection`
+    /// * [`RowGroupAccess::Selection`]: updates to scan only the intersection of the existing selection and the new selection
+    pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
+        self.row_groups[idx] = match &self.row_groups[idx] {
+            // already skipping the entire row group
+            RowGroupAccess::Skip => RowGroupAccess::Skip,
+            RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
+            RowGroupAccess::Selection(existing_selection) => {
+                RowGroupAccess::Selection(existing_selection.intersection(&selection))
+            }
+        }
+    }
+
+    /// Return an overall `RowSelection`, if needed
+    ///
+    /// This is used to compute the row selection for the parquet reader. See
+    /// [`ArrowReaderBuilder::with_row_selection`] for more details.
+    ///
+    /// Returns
+    /// * `None` if there are no [`RowGroupAccess::Selection`]s
+    /// * `Some(selection)` if there are [`RowGroupAccess::Selection`]s
+    ///
+    /// The returned selection represents which rows to scan across all row
+    /// groups that are not skipped.
+    ///
+    /// # Notes
+    ///
+    /// If there are no [`RowGroupAccess::Selection`]s, the overall row
+    /// selection is `None` because each row group is either entirely skipped or
+    /// scanned, which is covered by [`Self::row_group_indexes`].
+    ///
+    /// If there is any [`RowGroupAccess::Selection`], an overall row selection
+    /// is returned for *all* the rows in the row groups that are not skipped.
+    /// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
+    ///
+    /// # Example: No Selections
+    ///
+    /// Given an access plan like this
+    ///
+    /// ```text
+    /// RowGroupAccess::Scan  (scan all of row group 0)
+    /// RowGroupAccess::Skip  (skip row group 1)
+    /// RowGroupAccess::Scan  (scan all of row group 2)
+    /// RowGroupAccess::Scan  (scan all of row group 3)
+    /// ```
+    ///
+    /// The overall row selection would be `None` because there are no
+    /// [`RowGroupAccess::Selection`]s.
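A minimal sketch of the merge rules described above for `scan_selection` (assuming a DataFusion build that includes this change, which re-exports `ParquetAccessPlan` from `datafusion::datasource::physical_plan::parquet`): repeated selections on the same row group are intersected, while selections applied to skipped row groups are ignored.

```rust
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Two row groups (each assumed to have 100 rows), both scanned by default
    let mut plan = ParquetAccessPlan::new_all(2);

    // First predicate keeps rows 0..50 of row group 0:
    // the Scan access is replaced by a Selection
    plan.scan_selection(
        0,
        RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]),
    );

    // Second predicate keeps rows 25..100 of row group 0:
    // the two selections are intersected, leaving only rows 25..50 selected
    plan.scan_selection(
        0,
        RowSelection::from(vec![RowSelector::skip(25), RowSelector::select(75)]),
    );

    // Selections applied to a skipped row group are ignored
    plan.skip(1);
    plan.scan_selection(1, RowSelection::from(vec![RowSelector::select(100)]));

    assert!(plan.should_scan(0));
    assert!(!plan.should_scan(1));
    assert_eq!(plan.row_group_indexes(), vec![0]);
}
```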
+    /// The row group indexes returned by [`Self::row_group_indexes`] would be
+    /// `0, 2, 3`.
+    ///
+    /// # Example: With Selections
+    ///
+    /// Given an access plan like this:
+    ///
+    /// ```text
+    /// RowGroupAccess::Scan                                   (scan all of row group 0)
+    /// RowGroupAccess::Skip                                   (skip row group 1)
+    /// RowGroupAccess::Selection (skip 50, scan 50, skip 900) (scan rows 50-100 in row group 2)
+    /// RowGroupAccess::Scan                                   (scan all of row group 3)
+    /// ```
+    ///
+    /// Assuming each row group has 1000 rows, the resulting row selection would
+    /// be the rows to scan in row groups 0, 2 and 3:
+    ///
+    /// ```text
+    /// RowSelection::Select(1000) (scan all rows in row group 0)
+    /// RowSelection::Skip(50)     (skip first 50 rows in row group 2)
+    /// RowSelection::Select(50)   (scan rows 50-100 in row group 2)
+    /// RowSelection::Skip(900)    (skip last 900 rows in row group 2)
+    /// RowSelection::Select(1000) (scan all rows in row group 3)
+    /// ```
+    ///
+    /// Note there is no entry for the (entirely) skipped row group 1.
+    ///
+    /// The row group indexes returned by [`Self::row_group_indexes`] would
+    /// still be `0, 2, 3`.
+    ///
+    /// [`ArrowReaderBuilder::with_row_selection`]: parquet::arrow::arrow_reader::ArrowReaderBuilder::with_row_selection
+    pub fn into_overall_row_selection(
+        self,
+        row_group_meta_data: &[RowGroupMetaData],
+    ) -> Option<RowSelection> {
+        assert_eq!(row_group_meta_data.len(), self.row_groups.len());
+        // Intuition: entire row groups are filtered out using
+        // `row_group_indexes`, which comes from Skip and Scan. An overall
+        // RowSelection is only useful if there are any parts *within* a row
+        // group that can be filtered out, that is, a `Selection`.
+        if !self
+            .row_groups
+            .iter()
+            .any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
+        {
+            return None;
+        }
+
+        let total_selection: RowSelection = self
+            .row_groups
+            .into_iter()
+            .zip(row_group_meta_data.iter())
+            .flat_map(|(rg, rg_meta)| {
+                match rg {
+                    RowGroupAccess::Skip => vec![],
+                    RowGroupAccess::Scan => {
+                        // scan the entire row group (the row count comes from
+                        // the row group metadata)
+                        vec![RowSelector::select(rg_meta.num_rows() as usize)]
+                    }
+                    RowGroupAccess::Selection(selection) => {
+                        let selection: Vec<RowSelector> = selection.into();
+                        selection
+                    }
+                }
+            })
+            .collect();
+
+        Some(total_selection)
+    }
+
+    /// Return an iterator over the row group indexes that should be scanned
+    pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
+        self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
+            if b.should_scan() {
+                Some(idx)
+            } else {
+                None
+            }
+        })
+    }
+
+    /// Return a vec of all row group indexes to scan
+    pub fn row_group_indexes(&self) -> Vec<usize> {
+        self.row_group_index_iter().collect()
+    }
+
+    /// Return the total number of row groups (not the total number of groups to
+    /// scan)
+    pub fn len(&self) -> usize {
+        self.row_groups.len()
+    }
+
+    /// Return true if there are no row groups
+    pub fn is_empty(&self) -> bool {
+        self.row_groups.is_empty()
+    }
+
+    /// Get a reference to the inner accesses
+    pub fn inner(&self) -> &[RowGroupAccess] {
+        &self.row_groups
+    }
+
+    /// Convert into the inner row group accesses
+    pub fn into_inner(self) -> Vec<RowGroupAccess> {
+        self.row_groups
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use parquet::basic::LogicalType;
+    use parquet::file::metadata::ColumnChunkMetaData;
+    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
+    use std::sync::{Arc, OnceLock};
+
+    #[test]
+    fn test_only_scans() {
+        let access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan,
+            RowGroupAccess::Scan,
+
RowGroupAccess::Scan, + RowGroupAccess::Scan, + ]); + + let row_group_indexes = access_plan.row_group_indexes(); + let row_selection = access_plan.into_overall_row_selection(row_group_metadata()); + + // scan all row groups, no selection + assert_eq!(row_group_indexes, vec![0, 1, 2, 3]); + assert_eq!(row_selection, None); + } + + #[test] + fn test_only_skips() { + let access_plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Skip, + RowGroupAccess::Skip, + RowGroupAccess::Skip, + RowGroupAccess::Skip, + ]); + + let row_group_indexes = access_plan.row_group_indexes(); + let row_selection = access_plan.into_overall_row_selection(row_group_metadata()); + + // skip all row groups, no selection + assert_eq!(row_group_indexes, vec![] as Vec); + assert_eq!(row_selection, None); + } + #[test] + fn test_mixed_1() { + let access_plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, + RowGroupAccess::Selection( + vec![RowSelector::select(5), RowSelector::skip(7)].into(), + ), + RowGroupAccess::Skip, + RowGroupAccess::Skip, + ]); + + let row_group_indexes = access_plan.row_group_indexes(); + let row_selection = access_plan.into_overall_row_selection(row_group_metadata()); + + assert_eq!(row_group_indexes, vec![0, 1]); + assert_eq!( + row_selection, + Some( + vec![ + // select the entire first row group + RowSelector::select(10), + // selectors from the second row group + RowSelector::select(5), + RowSelector::skip(7) + ] + .into() + ) + ); + } + + #[test] + fn test_mixed_2() { + let access_plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Skip, + RowGroupAccess::Scan, + RowGroupAccess::Selection( + vec![RowSelector::select(5), RowSelector::skip(7)].into(), + ), + RowGroupAccess::Scan, + ]); + + let row_group_indexes = access_plan.row_group_indexes(); + let row_selection = access_plan.into_overall_row_selection(row_group_metadata()); + + assert_eq!(row_group_indexes, vec![1, 2, 3]); + assert_eq!( + row_selection, + Some( + vec![ + // select the entire second row group + RowSelector::select(20), + // selectors from the third row group + RowSelector::select(5), + RowSelector::skip(7), + // select the entire fourth row group + RowSelector::select(40), + ] + .into() + ) + ); + } + + static ROW_GROUP_METADATA: OnceLock> = OnceLock::new(); + + /// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows + /// respectively + fn row_group_metadata() -> &'static [RowGroupMetaData] { + ROW_GROUP_METADATA.get_or_init(|| { + let schema_descr = get_test_schema_descr(); + let row_counts = [10, 20, 30, 40]; + + row_counts + .into_iter() + .map(|num_rows| { + let column = ColumnChunkMetaData::builder(schema_descr.column(0)) + .set_num_values(num_rows) + .build() + .unwrap(); + + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(num_rows) + .set_column_metadata(vec![column]) + .build() + .unwrap() + }) + .collect() + }) + } + + /// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String` + fn get_test_schema_descr() -> SchemaDescPtr { + use parquet::basic::Type as PhysicalType; + use parquet::schema::types::Type as SchemaType; + let field = SchemaType::primitive_type_builder("a", PhysicalType::BYTE_ARRAY) + .with_logical_type(Some(LogicalType::String)) + .build() + .unwrap(); + let schema = SchemaType::group_type_builder("schema") + .with_fields(vec![Arc::new(field)]) + .build() + .unwrap(); + Arc::new(SchemaDescriptor::new(Arc::new(schema))) + } +} diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
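The tests above exercise `into_overall_row_selection` through constructed `RowGroupMetaData`. The following standalone sketch restates that behavior outside the test module; the schema and metadata construction mirrors the `row_group_metadata()` helper above, and the expected selector layout follows `test_mixed_1`.

```rust
use std::sync::Arc;

use datafusion::datasource::physical_plan::parquet::{ParquetAccessPlan, RowGroupAccess};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::basic::{LogicalType, Type as PhysicalType};
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};

fn main() {
    // Minimal single-column schema, mirroring the test helper above
    let field = SchemaType::primitive_type_builder("a", PhysicalType::BYTE_ARRAY)
        .with_logical_type(Some(LogicalType::String))
        .build()
        .unwrap();
    let schema = SchemaType::group_type_builder("schema")
        .with_fields(vec![Arc::new(field)])
        .build()
        .unwrap();
    let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(schema)));

    // Two row groups with 10 and 20 rows respectively
    let row_groups: Vec<RowGroupMetaData> = [10, 20]
        .into_iter()
        .map(|num_rows| {
            let column = ColumnChunkMetaData::builder(schema_descr.column(0))
                .set_num_values(num_rows)
                .build()
                .unwrap();
            RowGroupMetaData::builder(schema_descr.clone())
                .set_num_rows(num_rows)
                .set_column_metadata(vec![column])
                .build()
                .unwrap()
        })
        .collect();

    // Scan all of row group 0, and only the first 5 rows of row group 1
    let plan = ParquetAccessPlan::new(vec![
        RowGroupAccess::Scan,
        RowGroupAccess::Selection(
            vec![RowSelector::select(5), RowSelector::skip(15)].into(),
        ),
    ]);

    assert_eq!(plan.row_group_indexes(), vec![0, 1]);

    // Because one row group has a Selection, an overall RowSelection is
    // produced, and it also covers the fully-scanned row group 0
    let selection = plan.into_overall_row_selection(&row_groups).unwrap();
    assert_eq!(
        selection,
        RowSelection::from(vec![
            RowSelector::select(10), // all of row group 0
            RowSelector::select(5),  // first 5 rows of row group 1
            RowSelector::skip(15),   // remaining rows of row group 1
        ])
    );
}
```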
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index f0328098b406..04b25069e923 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -47,6 +47,7 @@ use log::debug; use parquet::basic::{ConvertedType, LogicalType}; use parquet::schema::types::ColumnDescriptor; +mod access_plan; mod metrics; mod opener; mod page_filter; @@ -59,6 +60,7 @@ mod writer; use crate::datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; +pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; pub use metrics::ParquetFileMetrics; use opener::ParquetOpener; pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; @@ -152,8 +154,9 @@ pub use writer::plan_to_parquet; /// the file. /// /// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata) -/// via [`ParquetFileReaderFactory`] and applies any predicates and projections -/// to determine what pages must be read. +/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by +/// applying predicates to metadata. The plan and projections are used to +/// determine what pages must be read. /// /// * Step 4: The stream begins reading data, fetching the required pages /// and incrementally decoding them. diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 5fb21975df4a..a5047e487eee 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -18,8 +18,10 @@ //! [`ParquetOpener`] for opening Parquet files use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate; -use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; -use crate::datasource::physical_plan::parquet::{row_filter, should_enable_page_index}; +use crate::datasource::physical_plan::parquet::row_groups::RowGroupAccessPlanFilter; +use crate::datasource::physical_plan::parquet::{ + row_filter, should_enable_page_index, ParquetAccessPlan, +}; use crate::datasource::physical_plan::{ FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory, }; @@ -137,7 +139,8 @@ impl FileOpener for ParquetOpener { let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); let rg_metadata = file_metadata.row_groups(); // track which row groups to actually read - let mut row_groups = RowGroupSet::new(rg_metadata.len()); + let access_plan = ParquetAccessPlan::new_all(rg_metadata.len()); + let mut row_groups = RowGroupAccessPlanFilter::new(access_plan); // if there is a range restricting what parts of the file to read if let Some(range) = file_range.as_ref() { row_groups.prune_by_range(rg_metadata, range); @@ -164,24 +167,30 @@ impl FileOpener for ParquetOpener { } } + let mut access_plan = row_groups.build(); + // page index pruning: if all data on individual pages can // be ruled using page metadata, rows from other columns // with that range can be skipped as well - if enable_page_index && !row_groups.is_empty() { + if enable_page_index && !access_plan.is_empty() { if let Some(p) = page_pruning_predicate { - let pruned = p.prune( + access_plan = p.prune_plan_with_page_index( + access_plan, &file_schema, builder.parquet_schema(), - &row_groups, file_metadata.as_ref(), &file_metrics, - )?; - if let Some(row_selection) = pruned { - builder = builder.with_row_selection(row_selection); - } + ); } } + let 
row_group_indexes = access_plan.row_group_indexes(); + if let Some(row_selection) = + access_plan.into_overall_row_selection(rg_metadata) + { + builder = builder.with_row_selection(row_selection); + } + if let Some(limit) = limit { builder = builder.with_limit(limit) } @@ -189,7 +198,7 @@ impl FileOpener for ParquetOpener { let stream = builder .with_projection(mask) .with_batch_size(batch_size) - .with_row_groups(row_groups.indexes()) + .with_row_groups(row_group_indexes) .build()?; let adapted = stream diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index d47d5c56bdf9..7429ca593820 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -22,16 +22,15 @@ use arrow::array::{ StringArray, }; use arrow::datatypes::DataType; -use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError}; +use arrow::{array::ArrayRef, datatypes::SchemaRef}; use arrow_schema::Schema; -use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_common::{Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; use log::{debug, trace}; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{ arrow::arrow_reader::{RowSelection, RowSelector}, - errors::ParquetError, file::{ metadata::{ParquetMetaData, RowGroupMetaData}, page_index::index::Index, @@ -42,10 +41,10 @@ use std::collections::HashSet; use std::sync::Arc; use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; -use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; use crate::datasource::physical_plan::parquet::statistics::{ from_bytes_to_i128, parquet_column, }; +use crate::datasource::physical_plan::parquet::ParquetAccessPlan; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; @@ -111,6 +110,7 @@ pub struct PagePruningPredicate { impl PagePruningPredicate { /// Create a new [`PagePruningPredicate`] + // TODO: this is infallaible -- it can not return an error pub fn try_new(expr: &Arc, schema: SchemaRef) -> Result { let predicates = split_conjunction(expr) .into_iter() @@ -129,105 +129,117 @@ impl PagePruningPredicate { Ok(Self { predicates }) } - /// Returns a [`RowSelection`] for the given file - pub fn prune( + /// Returns an updated [`ParquetAccessPlan`] by applying predicates to the + /// parquet page index, if any + pub fn prune_plan_with_page_index( &self, + mut access_plan: ParquetAccessPlan, arrow_schema: &Schema, parquet_schema: &SchemaDescriptor, - row_groups: &RowGroupSet, file_metadata: &ParquetMetaData, file_metrics: &ParquetFileMetrics, - ) -> Result> { + ) -> ParquetAccessPlan { // scoped timer updates on drop let _timer_guard = file_metrics.page_index_eval_time.timer(); if self.predicates.is_empty() { - return Ok(None); + return access_plan; } let page_index_predicates = &self.predicates; let groups = file_metadata.row_groups(); if groups.is_empty() { - return Ok(None); + return access_plan; } - let file_offset_indexes = file_metadata.offset_index(); - let file_page_indexes = file_metadata.column_index(); - let (file_offset_indexes, file_page_indexes) = match ( - file_offset_indexes, - file_page_indexes, - ) { - (Some(o), Some(i)) => (o, i), - _ => { - trace!( + let (Some(file_offset_indexes), 
Some(file_page_indexes)) = + (file_metadata.offset_index(), file_metadata.column_index()) + else { + trace!( "skip page pruning due to lack of indexes. Have offset: {}, column index: {}", - file_offset_indexes.is_some(), file_page_indexes.is_some() + file_metadata.offset_index().is_some(), file_metadata.column_index().is_some() ); - return Ok(None); - } + return access_plan; }; - let mut row_selections = Vec::with_capacity(page_index_predicates.len()); - for predicate in page_index_predicates { - // find column index in the parquet schema - let col_idx = find_column_index(predicate, arrow_schema, parquet_schema); - let mut selectors = Vec::with_capacity(row_groups.len()); - for r in row_groups.iter() { + // track the total number of rows that should be skipped + let mut total_skip = 0; + + let row_group_indexes = access_plan.row_group_indexes(); + for r in row_group_indexes { + // The selection for this particular row group + let mut overall_selection = None; + for predicate in page_index_predicates { + // find column index in the parquet schema + let col_idx = find_column_index(predicate, arrow_schema, parquet_schema); let row_group_metadata = &groups[r]; - let rg_offset_indexes = file_offset_indexes.get(r); - let rg_page_indexes = file_page_indexes.get(r); - if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = - (rg_page_indexes, rg_offset_indexes, col_idx) - { - selectors.extend( - prune_pages_in_one_row_group( - row_group_metadata, - predicate, - rg_offset_indexes.get(col_idx), - rg_page_indexes.get(col_idx), - groups[r].column(col_idx).column_descr(), - file_metrics, - ) - .map_err(|e| { - ArrowError::ParquetError(format!( - "Fail in prune_pages_in_one_row_group: {e}" - )) - }), + let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = ( + file_page_indexes.get(r), + file_offset_indexes.get(r), + col_idx, + ) else { + trace!( + "Did not have enough metadata to prune with page indexes, \ + falling back to all rows", ); + continue; + }; + + let selection = prune_pages_in_one_row_group( + row_group_metadata, + predicate, + rg_offset_indexes.get(col_idx), + rg_page_indexes.get(col_idx), + groups[r].column(col_idx).column_descr(), + file_metrics, + ); + + let Some(selection) = selection else { + trace!("No pages pruned in prune_pages_in_one_row_group"); + continue; + }; + + debug!("Use filter and page index to create RowSelection {:?} from predicate: {:?}", + &selection, + predicate.predicate_expr(), + ); + + overall_selection = update_selection(overall_selection, selection); + + // if the overall selection has ruled out all rows, no need to + // continue with the other predicates + let selects_any = overall_selection + .as_ref() + .map(|selection| selection.selects_any()) + .unwrap_or(true); + + if !selects_any { + break; + } + } + + if let Some(overall_selection) = overall_selection { + if overall_selection.selects_any() { + let rows_skipped = rows_skipped(&overall_selection); + trace!("Overall selection from predicate skipped {rows_skipped}: {overall_selection:?}"); + total_skip += rows_skipped; + access_plan.scan_selection(r, overall_selection) } else { + // Selection skips all rows, so skip the entire row group + let rows_skipped = groups[r].num_rows() as usize; + access_plan.skip(r); + total_skip += rows_skipped; trace!( - "Did not have enough metadata to prune with page indexes, \ - falling back to all rows", + "Overall selection from predicate is empty, \ + skipping all {rows_skipped} rows in row group {r}" ); - // fallback select all rows - let 
all_selected = - vec![RowSelector::select(groups[r].num_rows() as usize)]; - selectors.push(all_selected); } } - debug!( - "Use filter and page index create RowSelection {:?} from predicate: {:?}", - &selectors, - predicate.predicate_expr(), - ); - row_selections.push(selectors.into_iter().flatten().collect::>()); } - let final_selection = combine_multi_col_selection(row_selections); - let total_skip = - final_selection.iter().fold( - 0, - |acc, x| { - if x.skip { - acc + x.row_count - } else { - acc - } - }, - ); file_metrics.page_index_rows_filtered.add(total_skip); - Ok(Some(final_selection)) + access_plan } /// Returns the number of filters in the [`PagePruningPredicate`] @@ -236,6 +248,24 @@ impl PagePruningPredicate { } } +/// returns the number of rows skipped in the selection +/// TODO should this be upstreamed to RowSelection? +fn rows_skipped(selection: &RowSelection) -> usize { + selection + .iter() + .fold(0, |acc, x| if x.skip { acc + x.row_count } else { acc }) +} + +fn update_selection( + current_selection: Option, + row_selection: RowSelection, +) -> Option { + match current_selection { + None => Some(row_selection), + Some(current_selection) => Some(current_selection.intersection(&row_selection)), + } +} + /// Returns the column index in the row parquet schema for the single /// column of a single column pruning predicate. /// @@ -282,22 +312,8 @@ fn find_column_index( parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0) } -/// Intersects the [`RowSelector`]s -/// -/// For exampe, given: -/// * `RowSelector1: [ Skip(0~199), Read(200~299)]` -/// * `RowSelector2: [ Skip(0~99), Read(100~249), Skip(250~299)]` -/// -/// The final selection is the intersection of these `RowSelector`s: -/// * `final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]` -fn combine_multi_col_selection(row_selections: Vec>) -> RowSelection { - row_selections - .into_iter() - .map(RowSelection::from) - .reduce(|s1, s2| s1.intersection(&s2)) - .unwrap() -} - +/// Returns a `RowSelection` for the pages in this RowGroup if any +/// rows can be pruned based on the page index fn prune_pages_in_one_row_group( group: &RowGroupMetaData, predicate: &PruningPredicate, @@ -305,63 +321,61 @@ fn prune_pages_in_one_row_group( col_page_indexes: Option<&Index>, col_desc: &ColumnDescriptor, metrics: &ParquetFileMetrics, -) -> Result> { +) -> Option { let num_rows = group.num_rows() as usize; - if let (Some(col_offset_indexes), Some(col_page_indexes)) = + let (Some(col_offset_indexes), Some(col_page_indexes)) = (col_offset_indexes, col_page_indexes) - { - let target_type = parquet_to_arrow_decimal_type(col_desc); - let pruning_stats = PagesPruningStatistics { - col_page_indexes, - col_offset_indexes, - target_type: &target_type, - num_rows_in_row_group: group.num_rows(), - }; + else { + return None; + }; - match predicate.prune(&pruning_stats) { - Ok(values) => { - let mut vec = Vec::with_capacity(values.len()); - let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows); - assert_eq!(row_vec.len(), values.len()); - let mut sum_row = *row_vec.first().unwrap(); - let mut selected = *values.first().unwrap(); - trace!("Pruned to {:?} using {:?}", values, pruning_stats); - for (i, &f) in values.iter().enumerate().skip(1) { - if f == selected { - sum_row += *row_vec.get(i).unwrap(); - } else { - let selector = if selected { - RowSelector::select(sum_row) - } else { - RowSelector::skip(sum_row) - }; - vec.push(selector); - sum_row = *row_vec.get(i).unwrap(); - selected = f; - } - } + 
let target_type = parquet_to_arrow_decimal_type(col_desc); + let pruning_stats = PagesPruningStatistics { + col_page_indexes, + col_offset_indexes, + target_type: &target_type, + num_rows_in_row_group: group.num_rows(), + }; - let selector = if selected { - RowSelector::select(sum_row) - } else { - RowSelector::skip(sum_row) - }; - vec.push(selector); - return Ok(vec); - } + let values = match predicate.prune(&pruning_stats) { + Ok(values) => values, + Err(e) => { // stats filter array could not be built // return a result which will not filter out any pages - Err(e) => { - debug!("Error evaluating page index predicate values {e}"); - metrics.predicate_evaluation_errors.add(1); - return Ok(vec![RowSelector::select(group.num_rows() as usize)]); - } + debug!("Error evaluating page index predicate values {e}"); + metrics.predicate_evaluation_errors.add(1); + return None; + } + }; + + let mut vec = Vec::with_capacity(values.len()); + let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows); + assert_eq!(row_vec.len(), values.len()); + let mut sum_row = *row_vec.first().unwrap(); + let mut selected = *values.first().unwrap(); + trace!("Pruned to {:?} using {:?}", values, pruning_stats); + for (i, &f) in values.iter().enumerate().skip(1) { + if f == selected { + sum_row += *row_vec.get(i).unwrap(); + } else { + let selector = if selected { + RowSelector::select(sum_row) + } else { + RowSelector::skip(sum_row) + }; + vec.push(selector); + sum_row = *row_vec.get(i).unwrap(); + selected = f; } } - Err(DataFusionError::ParquetError(ParquetError::General( - "Got some error in prune_pages_in_one_row_group, plz try open the debuglog mode" - .to_string(), - ))) + + let selector = if selected { + RowSelector::select(sum_row) + } else { + RowSelector::skip(sum_row) + }; + vec.push(selector); + Some(RowSelection::from(vec)) } fn create_row_count_in_each_page( diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 7dd91d3d4e4b..e2548412cc9d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -36,58 +36,35 @@ use crate::datasource::physical_plan::parquet::statistics::{ }; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; -use super::ParquetFileMetrics; +use super::{ParquetAccessPlan, ParquetFileMetrics}; -/// Tracks which RowGroups within a parquet file should be scanned. +/// Reduces the [`ParquetAccessPlan`] based on row group level metadata. /// -/// This struct encapsulates the various types of pruning that can be applied to -/// a set of row groups within a parquet file, progressively narrowing down the -/// set of row groups that should be scanned. -#[derive(Debug, PartialEq)] -pub struct RowGroupSet { - /// `row_groups[i]` is true if the i-th row group should be scanned - row_groups: Vec, +/// This struct implements the various types of pruning that are applied to a +/// set of row groups within a parquet file, progressively narrowing down the +/// set of row groups (and ranges/selections within those row groups) that +/// should be scanned, based on the available metadata. 
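A sketch of how `RowGroupAccessPlanFilter` is driven by the opener. This is crate-internal code (the type is not part of DataFusion's public API), the `plan_row_groups` helper below is hypothetical, and only methods visible in this diff are called; the statistics and bloom filter pruning steps are summarized as comments.

```rust
// Crate-internal sketch, assuming the module paths used by opener.rs in this PR
use crate::datasource::listing::FileRange;
use crate::datasource::physical_plan::parquet::row_groups::RowGroupAccessPlanFilter;
use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
use parquet::file::metadata::RowGroupMetaData;

fn plan_row_groups(
    rg_metadata: &[RowGroupMetaData],
    file_range: Option<&FileRange>,
) -> ParquetAccessPlan {
    // Start from "scan everything" and progressively narrow it down
    let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
    let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);

    // Skip row groups whose first byte offset falls outside the assigned range
    if let Some(range) = file_range {
        row_groups.prune_by_range(rg_metadata, range);
    }

    // The opener additionally calls `prune_by_statistics` and, when a
    // predicate and bloom filters are available, `prune_by_bloom_filters`
    // at this point.

    // Hand the (possibly reduced) plan back for page index pruning
    row_groups.build()
}
```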
+#[derive(Debug, Clone, PartialEq)] +pub struct RowGroupAccessPlanFilter { + /// which row groups should be accessed + access_plan: ParquetAccessPlan, } -impl RowGroupSet { - /// Create a new `RowGroupSet` with all row groups set to true (will be scanned) - pub fn new(num_row_groups: usize) -> Self { - Self { - row_groups: vec![true; num_row_groups], - } - } - - /// Set the i-th row group to false (should not be scanned) - pub fn do_not_scan(&mut self, idx: usize) { - self.row_groups[idx] = false; - } - - /// Return true if the i-th row group should be scanned - fn should_scan(&self, idx: usize) -> bool { - self.row_groups[idx] +impl RowGroupAccessPlanFilter { + /// Create a new `RowGroupPlanBuilder` for pruning out the groups to scan + /// based on metadata and statistics + pub fn new(access_plan: ParquetAccessPlan) -> Self { + Self { access_plan } } - /// Return the total number of row groups (not the total number to be scanned) - pub fn len(&self) -> usize { - self.row_groups.len() - } - - /// Return true if there are no row groups + /// Return true if there are no row groups to scan pub fn is_empty(&self) -> bool { - self.row_groups.is_empty() - } - - /// Return an iterator over the row group indexes that should be scanned - pub fn iter(&self) -> impl Iterator + '_ { - self.row_groups - .iter() - .enumerate() - .filter_map(|(idx, &b)| if b { Some(idx) } else { None }) + self.access_plan.is_empty() } - /// Return a `Vec` of row group indices that should be scanned - pub fn indexes(&self) -> Vec { - self.iter().collect() + /// Returns the inner access plan + pub fn build(self) -> ParquetAccessPlan { + self.access_plan } /// Prune remaining row groups to only those within the specified range. @@ -97,9 +74,9 @@ impl RowGroupSet { /// # Panics /// if `groups.len() != self.len()` pub fn prune_by_range(&mut self, groups: &[RowGroupMetaData], range: &FileRange) { - assert_eq!(groups.len(), self.len()); + assert_eq!(groups.len(), self.access_plan.len()); for (idx, metadata) in groups.iter().enumerate() { - if !self.should_scan(idx) { + if !self.access_plan.should_scan(idx) { continue; } @@ -113,7 +90,7 @@ impl RowGroupSet { .dictionary_page_offset() .unwrap_or_else(|| col.data_page_offset()); if !range.contains(offset) { - self.do_not_scan(idx); + self.access_plan.skip(idx); } } } @@ -135,9 +112,9 @@ impl RowGroupSet { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { - assert_eq!(groups.len(), self.len()); + assert_eq!(groups.len(), self.access_plan.len()); for (idx, metadata) in groups.iter().enumerate() { - if !self.should_scan(idx) { + if !self.access_plan.should_scan(idx) { continue; } let pruning_stats = RowGroupPruningStatistics { @@ -150,7 +127,7 @@ impl RowGroupSet { // NB: false means don't scan row group if !values[0] { metrics.row_groups_pruned_statistics.add(1); - self.do_not_scan(idx); + self.access_plan.skip(idx); continue; } } @@ -179,9 +156,9 @@ impl RowGroupSet { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { - assert_eq!(builder.metadata().num_row_groups(), self.len()); - for idx in 0..self.len() { - if !self.should_scan(idx) { + assert_eq!(builder.metadata().num_row_groups(), self.access_plan.len()); + for idx in 0..self.access_plan.len() { + if !self.access_plan.should_scan(idx) { continue; } @@ -230,7 +207,7 @@ impl RowGroupSet { if prune_group { metrics.row_groups_pruned_bloom_filter.add(1); - self.do_not_scan(idx) + self.access_plan.skip(idx) } else if !stats.column_sbbf.is_empty() { metrics.row_groups_matched_bloom_filter.add(1); } @@ 
-500,7 +477,7 @@ mod tests { ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -534,7 +511,7 @@ mod tests { let metrics = parquet_file_metrics(); // missing statistics for first row group mean that the result from the predicate expression // is null / undefined so the first row group can't be filtered out - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -581,7 +558,7 @@ mod tests { let groups = &[rgm1, rgm2]; // the first row group is still filtered out because the predicate expression can be partially evaluated // when conditions are joined using AND - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -599,7 +576,7 @@ mod tests { // if conditions in predicate are joined with OR and an unsupported expression is used // this bypasses the entire predicate expression and no row groups are filtered out - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -655,7 +632,7 @@ mod tests { let groups = &[rgm1, rgm2]; // the first row group should be left because c1 is greater than zero // the second should be filtered out because c1 is less than zero - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); row_groups.prune_by_statistics( &file_schema, &schema_descr, @@ -704,7 +681,7 @@ mod tests { let metrics = parquet_file_metrics(); // First row group was filtered out because it contains no null value on "c2". - let mut row_groups = RowGroupSet::new(2); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -736,7 +713,8 @@ mod tests { let metrics = parquet_file_metrics(); // bool = NULL always evaluates to NULL (and thus will not // pass predicates. 
Ideally these should both be false - let mut row_groups = RowGroupSet::new(groups.len()); + let mut row_groups = + RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(groups.len())); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -796,7 +774,7 @@ mod tests { vec![ParquetStatistics::int32(Some(100), None, None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -864,7 +842,7 @@ mod tests { vec![ParquetStatistics::int32(None, Some(2), None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(4); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(4)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -915,7 +893,7 @@ mod tests { vec![ParquetStatistics::int64(None, None, None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -989,7 +967,7 @@ mod tests { )], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -1052,7 +1030,7 @@ mod tests { vec![ParquetStatistics::byte_array(None, None, None, 0, false)], ); let metrics = parquet_file_metrics(); - let mut row_groups = RowGroupSet::new(3); + let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3)); row_groups.prune_by_statistics( &schema, &schema_descr, @@ -1179,7 +1157,7 @@ mod tests { ) .await .unwrap(); - assert!(pruned_row_groups.indexes().is_empty()); + assert!(pruned_row_groups.access_plan.row_group_indexes().is_empty()); } #[tokio::test] @@ -1251,12 +1229,12 @@ mod tests { impl ExpectedPruning { /// asserts that the pruned row group match this expectation - fn assert(&self, row_groups: &RowGroupSet) { - let num_row_groups = row_groups.len(); + fn assert(&self, row_groups: &RowGroupAccessPlanFilter) { + let num_row_groups = row_groups.access_plan.len(); assert!(num_row_groups > 0); let num_pruned = (0..num_row_groups) .filter_map(|i| { - if row_groups.should_scan(i) { + if row_groups.access_plan.should_scan(i) { None } else { Some(1) @@ -1278,14 +1256,14 @@ mod tests { ); } ExpectedPruning::Some(expected) => { - let actual = row_groups.indexes(); + let actual = row_groups.access_plan.row_group_indexes(); assert_eq!(expected, &actual, "Unexpected row groups pruned. 
Expected {expected:?}, got {actual:?}"); } } } } - fn assert_pruned(row_groups: RowGroupSet, expected: ExpectedPruning) { + fn assert_pruned(row_groups: RowGroupAccessPlanFilter, expected: ExpectedPruning) { expected.assert(&row_groups); } @@ -1386,7 +1364,7 @@ mod tests { file_name: &str, data: bytes::Bytes, pruning_predicate: &PruningPredicate, - ) -> Result { + ) -> Result { use object_store::{ObjectMeta, ObjectStore}; let object_meta = ObjectMeta { @@ -1411,7 +1389,8 @@ mod tests { }; let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); - let mut pruned_row_groups = RowGroupSet::new(builder.metadata().num_row_groups()); + let access_plan = ParquetAccessPlan::new_all(builder.metadata().num_row_groups()); + let mut pruned_row_groups = RowGroupAccessPlanFilter::new(access_plan); pruned_row_groups .prune_by_bloom_filters( pruning_predicate.schema(),
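Putting the opener changes together: once the access plan is final, the row group indexes are always pushed down to the parquet reader, while an overall `RowSelection` is attached only when some row group carries a within-group selection. A minimal sketch of that hand-off, using the public `ParquetAccessPlan` API and the parquet crate's async builder (the `apply_access_plan` helper is hypothetical, not part of this PR):

```rust
use std::sync::Arc;

use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

/// Hypothetical helper mirroring what `ParquetOpener` now does once the
/// access plan is finalized.
fn apply_access_plan<T>(
    mut builder: ParquetRecordBatchStreamBuilder<T>,
    access_plan: ParquetAccessPlan,
) -> ParquetRecordBatchStreamBuilder<T> {
    // Clone the Arc so the metadata outlives the mutable use of the builder
    let file_metadata = Arc::clone(builder.metadata());

    // Indexes of the row groups that are scanned at all (Skip groups removed)
    let row_group_indexes = access_plan.row_group_indexes();

    // An overall selection exists only if some row group has a Selection
    if let Some(row_selection) =
        access_plan.into_overall_row_selection(file_metadata.row_groups())
    {
        builder = builder.with_row_selection(row_selection);
    }

    builder.with_row_groups(row_group_indexes)
}
```

Keeping the two push-downs separate matches the plan's documentation: whole row groups are cheaply excluded via `with_row_groups`, and the finer-grained `RowSelection` is only materialized when page index pruning actually produced per-row-group selections.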