diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 107bbed46143..2a4703206875 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -223,6 +223,20 @@ impl PruningPredicate {
     pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
         &self.predicate_expr
     }
+
+    /// Returns the indexes of all columns needed to evaluate this pruning predicate
+    pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
+        let mut set = HashSet::new();
+        self.required_columns.columns.iter().for_each(|x| {
+            match self.schema().column_with_name(x.0.name.as_str()) {
+                None => {}
+                Some(y) => {
+                    set.insert(y.0);
+                }
+            }
+        });
+        set
+    }
 }
 
 /// Handles creating references to the min/max statistics
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 68c902a40b50..731eae1be669 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -44,6 +44,7 @@ use crate::{
     },
     scalar::ScalarValue,
 };
+use arrow::array::{BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array};
 use arrow::datatypes::DataType;
 use arrow::{
     array::ArrayRef,
@@ -57,16 +58,18 @@ use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use log::debug;
 use object_store::{ObjectMeta, ObjectStore};
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::basic::{ConvertedType, LogicalType};
 use parquet::errors::ParquetError;
+use parquet::file::page_index::index::Index;
 use parquet::file::{
     metadata::{ParquetMetaData, RowGroupMetaData},
     properties::WriterProperties,
     statistics::Statistics as ParquetStatistics,
 };
+use parquet::format::PageLocation;
 use parquet::schema::types::ColumnDescriptor;
 
 #[derive(Debug, Clone, Default)]
@@ -435,9 +438,44 @@ impl FileOpener for ParquetOpener {
             }
         };
 
-        let groups = builder.metadata().row_groups();
+        let file_metadata = builder.metadata();
+        let groups = file_metadata.row_groups();
         let row_groups =
-            prune_row_groups(groups, file_range, pruning_predicate, &metrics);
+            prune_row_groups(groups, file_range, pruning_predicate.clone(), &metrics);
+
+        if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) {
+            let file_offset_indexes = file_metadata.offset_indexes();
+            let file_page_indexes = file_metadata.page_indexes();
+            if let (Some(file_offset_indexes), Some(file_page_indexes)) =
+                (file_offset_indexes, file_page_indexes)
+            {
+                let mut selectors = Vec::with_capacity(row_groups.len());
+                for r in &row_groups {
+                    selectors.extend(
+                        prune_pages_in_one_row_group(
+                            &groups[*r],
+                            pruning_predicate.clone(),
+                            file_offset_indexes.get(*r),
+                            file_page_indexes.get(*r),
+                            &metrics,
+                        )
+                        .map_err(|e| {
+                            ArrowError::ParquetError(format!(
+                                "Failed in prune_pages_in_one_row_group: {}",
+                                e
+                            ))
+                        }),
+                    );
+                }
+                debug!(
+                    "Using the filter and page index to create RowSelection {:?}",
+                    &selectors
+                );
+                builder = builder.with_row_selection(RowSelection::from(
+                    selectors.into_iter().flatten().collect::<Vec<RowSelector>>(),
+                ));
+            }
+        }
 
         let stream = builder
             .with_projection(mask)
@@ -460,6 +498,20 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+// Check that the PruningPredicate references exactly one column.
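+// For example, `month = 1` qualifies for page index push down, while
+// `month = 1 AND day = 2` does not, because the two columns may have different
+// page counts and therefore different `num_containers` values.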
+fn check_page_index_push_down_valid(predicate: &Option<PruningPredicate>) -> bool {
+    if let Some(predicate) = predicate {
+        // For now we only support push down on a single column, because each
+        // column may have a different number of pages, which makes it hard to
+        // derive `num_containers` for `PruningStatistics`.
+        let cols = predicate.need_input_columns_ids();
+        // TODO: add more specific rules
+        if cols.len() == 1 {
+            return true;
+        }
+    }
+    false
+}
+
 /// Factory of parquet file readers.
 ///
 /// Provides means to implement custom data access interface.
@@ -617,6 +669,16 @@ struct RowGroupPruningStatistics<'a> {
     parquet_schema: &'a Schema,
 }
 
+/// Wraps page_index statistics in a way
+/// that implements [`PruningStatistics`]
+struct PagesPruningStatistics<'a> {
+    //row_group_metadata: &'a RowGroupMetaData,
+    page_indexes: &'a Vec<Index>,
+    offset_indexes: &'a Vec<Vec<PageLocation>>,
+    parquet_schema: &'a Schema,
+    col_id: usize,
+}
+
 // TODO: consolidate code with arrow-rs
 // Convert the bytes array to i128.
 // The endian of the input bytes array must be big-endian.
@@ -749,6 +811,61 @@ macro_rules! get_null_count_values {
     }};
 }
 
+// Extract the min or max value from the page index by calling `func`
+macro_rules! get_min_max_values_for_page_index {
+    ($self:expr, $column:expr, $func:ident) => {{
+        if let Some((col_id_index, _field)) =
+            $self.parquet_schema.column_with_name(&$column.name)
+        {
+            if let Some(page_index) = $self.page_indexes.get(col_id_index) {
+                match page_index {
+                    Index::NONE => None,
+                    Index::INT32(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int32Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::INT64(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Int64Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::FLOAT(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Float32Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::DOUBLE(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(Float64Array::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::BOOLEAN(index) => {
+                        let vec = &index.indexes;
+                        Some(Arc::new(BooleanArray::from_iter(
+                            vec.iter().map(|x| x.$func().cloned()),
+                        )))
+                    }
+                    Index::INT96(_)
+                    | Index::BYTE_ARRAY(_)
+                    | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                        // TODO: support these types
+                        None
+                    }
+                }
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }};
+}
+
 // Convert parquet column schema to arrow data type, and just consider the
 // decimal data type.
 fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
@@ -785,6 +902,57 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     }
 }
 
+impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_for_page_index!(self, column, min)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values_for_page_index!(self, column, max)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.offset_indexes.get(self.col_id).unwrap().len()
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        if let Some((col_id_index, _field)) =
+            self.parquet_schema.column_with_name(&column.name)
+        {
+            if let Some(page_index) = self.page_indexes.get(col_id_index) {
+                match page_index {
+                    Index::NONE => None,
+                    Index::BOOLEAN(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::INT32(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::INT64(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::FLOAT(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::DOUBLE(index) => Some(Arc::new(Int64Array::from_iter(
+                        index.indexes.iter().map(|x| x.null_count),
+                    ))),
+                    Index::INT96(_)
+                    | Index::BYTE_ARRAY(_)
+                    | Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                        // TODO: support these types
+                        None
+                    }
+                }
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+}
+
 fn prune_row_groups(
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
@@ -828,6 +996,97 @@ fn prune_row_groups(
     filtered
 }
 
+fn prune_pages_in_one_row_group(
+    group: &RowGroupMetaData,
+    predicate: Option<PruningPredicate>,
+    offset_indexes: Option<&Vec<Vec<PageLocation>>>,
+    page_indexes: Option<&Vec<Index>>,
+    metrics: &ParquetFileMetrics,
+) -> Result<Vec<RowSelector>> {
+    let num_rows = group.num_rows() as usize;
+    if let (Some(predicate), Some(offset_indexes), Some(page_indexes)) =
+        (&predicate, offset_indexes, page_indexes)
+    {
+        let pruning_stats = PagesPruningStatistics {
+            page_indexes,
+            offset_indexes,
+            parquet_schema: predicate.schema().as_ref(),
+            // for now we assume only a single column is supported
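+            // (`check_page_index_push_down_valid` has already verified that the
+            // predicate references exactly one column, so taking the single id
+            // from the set below is safe)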
+            col_id: *predicate
+                .need_input_columns_ids()
+                .iter()
+                .take(1)
+                .next()
+                .unwrap(),
+        };
+
+        match predicate.prune(&pruning_stats) {
+            Ok(values) => {
+                let mut vec = Vec::with_capacity(values.len());
+                if let Some(cols_offset_indexes) =
+                    offset_indexes.get(pruning_stats.col_id)
+                {
+                    let row_vec =
+                        create_row_count_in_each_page(cols_offset_indexes, num_rows);
+                    assert_eq!(row_vec.len(), values.len());
+                    let mut sum_row = *row_vec.first().unwrap();
+                    let mut selected = *values.first().unwrap();
+
+                    for (i, &f) in values.iter().enumerate().skip(1) {
+                        if f == selected {
+                            sum_row += *row_vec.get(i).unwrap();
+                        } else {
+                            let selector = if selected {
+                                RowSelector::select(sum_row)
+                            } else {
+                                RowSelector::skip(sum_row)
+                            };
+                            vec.push(selector);
+                            sum_row = *row_vec.get(i).unwrap();
+                            selected = f;
+                        }
+                    }
+
+                    let selector = if selected {
+                        RowSelector::select(sum_row)
+                    } else {
+                        RowSelector::skip(sum_row)
+                    };
+                    vec.push(selector);
+                    return Ok(vec);
+                } else {
+                    debug!("Error evaluating page index predicate: missing page index for col_id {}", pruning_stats.col_id);
+                    metrics.predicate_evaluation_errors.add(1);
+                }
+            }
+            // the stats filter array could not be built, so no pages can be
+            // pruned for this row group
+            Err(e) => {
+                debug!("Error evaluating page index predicate values {}", e);
+                metrics.predicate_evaluation_errors.add(1);
+            }
+        }
+    }
+    Err(DataFusionError::ParquetError(ParquetError::General(
+        "Encountered an error in prune_pages_in_one_row_group; enable debug logging for details"
+            .to_string(),
+    )))
+}
+
+fn create_row_count_in_each_page(
+    location: &Vec<PageLocation>,
+    num_rows: usize,
+) -> Vec<usize> {
+    let mut vec = Vec::with_capacity(location.len());
+    location.windows(2).for_each(|x| {
+        let start = x[0].first_row_index as usize;
+        let end = x[1].first_row_index as usize;
+        vec.push(end - start);
+    });
+    vec.push(num_rows - location.last().unwrap().first_row_index as usize);
+    vec
+}
+
 /// Executes a query and writes the results to a partitioned Parquet file.
 pub async fn plan_to_parquet(
     state: &SessionState,
@@ -2154,4 +2413,68 @@ mod tests {
         Ok(())
     }
+
+    #[tokio::test]
+    async fn parquet_exec_with_page_index_filter() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+        let store = session_ctx
+            .runtime_env()
+            .object_store(&object_store_url)
+            .unwrap();
+
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
+
+        let meta = local_unpartitioned_file(filename);
+
+        let schema = ParquetFormat::default()
+            .infer_schema(&store, &[meta.clone()])
+            .await
+            .unwrap();
+
+        let partitioned_file = PartitionedFile {
+            object_meta: meta,
+            partition_values: vec![],
+            range: None,
+            extensions: None,
+        };
+
+        // create the filter `month = 1`
+        let filter = col("month").eq(lit(1_i32));
+
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store_url,
+                file_groups: vec![vec![partitioned_file]],
+                file_schema: schema,
+                statistics: Statistics::default(),
+                // the file has 13 columns, so `month` is at index 12
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            Some(filter),
+            None,
+        );
+
+        let parquet_exec_page_index = parquet_exec
+            .clone()
+            .with_scan_options(ParquetScanOptions::default().with_page_index(true));
+
+        let mut results = parquet_exec_page_index.execute(0, task_ctx)?;
+
+        let batch = results.next().await.unwrap()?;
+
+        // the page index should produce the following RowSelection:
+        //   RowSelector::select(312)
+        //   RowSelector::skip(3330)
+        //   RowSelector::select(333)
+        //   RowSelector::skip(3330)
+        // for a total of 312 + 333 = 645 selected rows
+
+        assert_eq!(batch.num_rows(), 645);
+        Ok(())
+    }
 }
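
For reference, the arithmetic behind the RowSelection expected by this test can be reproduced with a minimal standalone sketch of the `create_row_count_in_each_page` logic above. The page start offsets here are illustrative values chosen to match the selectors in the test comment, not read from `alltypes_tiny_pages.parquet`:

    // Rows per page, derived from the first_row_index of each page location.
    fn row_count_in_each_page(first_row_indexes: &[i64], num_rows: usize) -> Vec<usize> {
        // rows in a page = gap between consecutive page start offsets
        let mut counts: Vec<usize> = first_row_indexes
            .windows(2)
            .map(|w| (w[1] - w[0]) as usize)
            .collect();
        // the last page runs to the end of the row group
        counts.push(num_rows - *first_row_indexes.last().unwrap() as usize);
        counts
    }

    fn main() {
        // four pages starting at rows 0, 312, 3642 and 3975 in a 7305-row group
        let counts = row_count_in_each_page(&[0, 312, 3642, 3975], 7305);
        // if only pages 0 and 2 match `month = 1`, the resulting selection is
        // select(312), skip(3330), select(333), skip(3330) -> 645 rows selected
        assert_eq!(counts, vec![312, 3330, 333, 3330]);
    }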