From de93d2ab17e9a8d895a8b51b7a1b378c5bb18db0 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 8 Apr 2024 10:52:30 +0800 Subject: [PATCH 1/6] Prune columns are all null in ParquetExec by row_counts in pruning statistics --- .../physical_plan/parquet/row_groups.rs | 10 ++++-- datafusion/core/tests/parquet/mod.rs | 30 ++++++++++++++++ .../core/tests/parquet/row_group_pruning.rs | 34 +++++++++++++++++++ 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 8df4925fc566..4e73483fa77d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -342,8 +342,10 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { scalar.to_array().ok() } - fn row_counts(&self, _column: &Column) -> Option { - None + fn row_counts(&self, column: &Column) -> Option { + let (c, _) = self.column(&column.name)?; + let scalar = ScalarValue::UInt64(Some(c.num_values() as u64)); + scalar.to_array().ok() } fn contained( @@ -1026,15 +1028,17 @@ mod tests { column_statistics: Vec, ) -> RowGroupMetaData { let mut columns = vec![]; + let number_row = 1000; for (i, s) in column_statistics.iter().enumerate() { let column = ColumnChunkMetaData::builder(schema_descr.column(i)) .set_statistics(s.clone()) + .set_num_values(number_row.clone()) .build() .unwrap(); columns.push(column); } RowGroupMetaData::builder(schema_descr.clone()) - .set_num_rows(1000) + .set_num_rows(number_row.clone()) .set_total_byte_size(2000) .set_column_metadata(columns) .build() diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index b4415d638ada..546edae4d728 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -28,6 +28,7 @@ use arrow::{ record_batch::RecordBatch, util::pretty::pretty_format_batches, }; +use arrow_array::new_null_array; use chrono::{Datelike, Duration, TimeDelta}; use datafusion::{ datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider}, @@ -75,6 +76,7 @@ enum Scenario { DecimalLargePrecisionBloomFilter, ByteArray, PeriodsInColumnNames, + AllNullValues, } enum Unit { @@ -630,6 +632,27 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch { RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap() } +/// Return record batch with i8, i16, i32, and i64 sequences with all Null values +fn make_all_null_values() -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("i8", DataType::Int8, true), + Field::new("i16", DataType::Int16, true), + Field::new("i32", DataType::Int32, true), + Field::new("i64", DataType::Int64, true), + ])); + + RecordBatch::try_new( + schema, + vec![ + new_null_array(&DataType::Int8, 5), + new_null_array(&DataType::Int16, 5), + new_null_array(&DataType::Int32, 5), + new_null_array(&DataType::Int64, 5), + ], + ) + .unwrap() +} + fn create_data_batch(scenario: Scenario) -> Vec { match scenario { Scenario::Timestamps => { @@ -799,6 +822,13 @@ fn create_data_batch(scenario: Scenario) -> Vec { ), ] } + Scenario::AllNullValues => { + vec![ + make_all_null_values(), + make_int_batches(1, 6), + make_all_null_values(), + ] + } } } diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index b7b434d1c3d3..633737d3fc5a 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -1264,3 +1264,37 @@ async fn prune_periods_in_column_names() { .test_row_group_prune() .await; } + +#[tokio::test] +async fn test_row_group_all_null_values() { + // Tree row groups: + // 1. all Null values + // 2. values from 1 to 5 + // 3. all Null values + + // After pruning, only row group 2 should be selected + RowGroupPruningTest::new() + .with_scenario(Scenario::AllNullValues) + .with_query("SELECT * FROM t WHERE \"i8\" <= 5") + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(2)) + .with_expected_rows(5) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .test_row_group_prune() + .await; + + // After pruning, only row group 1,3 should be selected + RowGroupPruningTest::new() + .with_scenario(Scenario::AllNullValues) + .with_query("SELECT * FROM t WHERE \"i8\" is Null") + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(2)) + .with_pruned_by_stats(Some(1)) + .with_expected_rows(10) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .test_row_group_prune() + .await; +} From 18c9f4d7e68b3eb470b2b2ef3f2297e018dd4298 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 8 Apr 2024 11:33:13 +0800 Subject: [PATCH 2/6] fix clippy --- .../core/src/datasource/physical_plan/parquet/row_groups.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 4e73483fa77d..c54787d04f91 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -1032,13 +1032,13 @@ mod tests { for (i, s) in column_statistics.iter().enumerate() { let column = ColumnChunkMetaData::builder(schema_descr.column(i)) .set_statistics(s.clone()) - .set_num_values(number_row.clone()) + .set_num_values(number_row) .build() .unwrap(); columns.push(column); } RowGroupMetaData::builder(schema_descr.clone()) - .set_num_rows(number_row.clone()) + .set_num_rows(number_row) .set_total_byte_size(2000) .set_column_metadata(columns) .build() From 4c908f03ff5f449cdc4dc7da13908c3f285068d7 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Mon, 8 Apr 2024 18:06:31 +0800 Subject: [PATCH 3/6] Update datafusion/core/tests/parquet/row_group_pruning.rs Co-authored-by: Ruihang Xia --- datafusion/core/tests/parquet/row_group_pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 633737d3fc5a..e3c0e3455e75 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -1267,7 +1267,7 @@ async fn prune_periods_in_column_names() { #[tokio::test] async fn test_row_group_all_null_values() { - // Tree row groups: + // Three row groups: // 1. all Null values // 2. values from 1 to 5 // 3. all Null values From 11567d9f68e0e2e0459cf784f6f51f8e0e568912 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 9 Apr 2024 14:04:19 +0800 Subject: [PATCH 4/6] fix comment and support isNotNUll --- .../core/src/physical_optimizer/pruning.rs | 38 +++++++++++++++---- datafusion/core/tests/parquet/mod.rs | 4 +- .../core/tests/parquet/row_group_pruning.rs | 2 +- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 80bb5ad42e81..2359d0bc18a1 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -331,6 +331,7 @@ pub trait PruningStatistics { /// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END` /// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END` /// `x IS NULL` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_null_count > 0 END` +/// `x IS NOT NULL` | `x_null_count = 0` /// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END` /// /// ## Predicate Evaluation @@ -1235,10 +1236,15 @@ fn build_single_column_expr( /// returns a pruning expression in terms of IsNull that will evaluate to true /// if the column may contain null, and false if definitely does not /// contain null. +/// If set `with_not` to true: which means is not null +/// Given an expression reference to `expr`, if `expr` is a column expression, +/// returns a pruning expression in terms of IsNotNull that will evaluate to true +/// if the column not contain any null, and false if definitely contain null. fn build_is_null_column_expr( expr: &Arc, schema: &Schema, required_columns: &mut RequiredColumns, + with_not: bool, ) -> Option> { if let Some(col) = expr.as_any().downcast_ref::() { let field = schema.field_with_name(col.name()).ok()?; @@ -1247,12 +1253,21 @@ fn build_is_null_column_expr( required_columns .null_count_column_expr(col, expr, null_count_field) .map(|null_count_column_expr| { - // IsNull(column) => null_count > 0 - Arc::new(phys_expr::BinaryExpr::new( - null_count_column_expr, - Operator::Gt, - Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), - )) as _ + if with_not { + // IsNotNull(column) => null_count = 0 + Arc::new(phys_expr::BinaryExpr::new( + null_count_column_expr, + Operator::Eq, + Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), + )) as _ + } else { + // IsNull(column) => null_count > 0 + Arc::new(phys_expr::BinaryExpr::new( + null_count_column_expr, + Operator::Gt, + Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), + )) as _ + } }) .ok() } else { @@ -1283,9 +1298,18 @@ fn build_predicate_expression( // predicate expression can only be a binary expression let expr_any = expr.as_any(); if let Some(is_null) = expr_any.downcast_ref::() { - return build_is_null_column_expr(is_null.arg(), schema, required_columns) + return build_is_null_column_expr(is_null.arg(), schema, required_columns, false) .unwrap_or(unhandled); } + if let Some(is_not_null) = expr_any.downcast_ref::() { + return build_is_null_column_expr( + is_not_null.arg(), + schema, + required_columns, + true, + ) + .unwrap_or(unhandled); + } if let Some(col) = expr_any.downcast_ref::() { return build_single_column_expr(col, schema, required_columns, false) .unwrap_or(unhandled); diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 546edae4d728..f36afe1976b1 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -76,7 +76,7 @@ enum Scenario { DecimalLargePrecisionBloomFilter, ByteArray, PeriodsInColumnNames, - AllNullValues, + WithNullValues, } enum Unit { @@ -822,7 +822,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { ), ] } - Scenario::AllNullValues => { + Scenario::WithNullValues => { vec![ make_all_null_values(), make_int_batches(1, 6), diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index e3c0e3455e75..48369253c71d 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -1266,7 +1266,7 @@ async fn prune_periods_in_column_names() { } #[tokio::test] -async fn test_row_group_all_null_values() { +async fn test_row_group_with_null_values() { // Three row groups: // 1. all Null values // 2. values from 1 to 5 From 93f9bea38a60364345095c2a3d8700cefd2136e6 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 9 Apr 2024 14:10:30 +0800 Subject: [PATCH 5/6] add test --- .../core/tests/parquet/row_group_pruning.rs | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 48369253c71d..d9723af8e96d 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -1274,7 +1274,7 @@ async fn test_row_group_with_null_values() { // After pruning, only row group 2 should be selected RowGroupPruningTest::new() - .with_scenario(Scenario::AllNullValues) + .with_scenario(Scenario::WithNullValues) .with_query("SELECT * FROM t WHERE \"i8\" <= 5") .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) @@ -1287,7 +1287,7 @@ async fn test_row_group_with_null_values() { // After pruning, only row group 1,3 should be selected RowGroupPruningTest::new() - .with_scenario(Scenario::AllNullValues) + .with_scenario(Scenario::WithNullValues) .with_query("SELECT * FROM t WHERE \"i8\" is Null") .with_expected_errors(Some(0)) .with_matched_by_stats(Some(2)) @@ -1297,4 +1297,30 @@ async fn test_row_group_with_null_values() { .with_pruned_by_bloom_filter(Some(0)) .test_row_group_prune() .await; + + // After pruning, only row group 2should be selected + RowGroupPruningTest::new() + .with_scenario(Scenario::WithNullValues) + .with_query("SELECT * FROM t WHERE \"i16\" is Not Null") + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(2)) + .with_expected_rows(5) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .test_row_group_prune() + .await; + + // All row groups will be pruned + RowGroupPruningTest::new() + .with_scenario(Scenario::WithNullValues) + .with_query("SELECT * FROM t WHERE \"i32\" > 7") + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(3)) + .with_expected_rows(0) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .test_row_group_prune() + .await; } From 5ecdc5cfba94f61b0d1325008e01ffc620fc9b30 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 9 Apr 2024 14:13:32 +0800 Subject: [PATCH 6/6] fix conflict --- datafusion/core/src/physical_optimizer/pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 2359d0bc18a1..fee2b4c1ad10 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -330,7 +330,7 @@ pub trait PruningStatistics { /// `x = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END` /// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END` /// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END` -/// `x IS NULL` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_null_count > 0 END` +/// `x IS NULL` | `x_null_count > 0` /// `x IS NOT NULL` | `x_null_count = 0` /// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END` ///