Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "chore(pruning): Support IS NOT NULL predicates in PruningPredicate (#9208)" #9232

Merged
merged 1 commit into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 5 additions & 44 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,20 +620,13 @@ mod tests {
ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
],
);
let rgm3 = get_row_group_meta_data(
&schema_descr,
vec![
ParquetStatistics::int32(Some(17), Some(30), None, 1, false),
ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
],
);
vec![rgm1, rgm2, rgm3]
vec![rgm1, rgm2]
}

#[test]
fn row_group_pruning_predicate_null_expr() {
use datafusion_expr::{col, lit};
// c1 > 15 and IsNull(c2) => c1_max > 15 and c2_null_count > 0
// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
Expand Down Expand Up @@ -664,7 +657,7 @@ mod tests {
use datafusion_expr::{col, lit};
// test row group predicate with an unknown (Null) expr
//
// c1 > 15 and c2 = NULL => c1_max > 15 and NULL
// int > 1 and bool = NULL => c1_max > 1 and null
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
Expand All @@ -679,35 +672,7 @@ mod tests {

let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates). Ideally these should all be false
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Some(&pruning_predicate),
&metrics
),
vec![1, 2]
);
}

#[test]
fn row_group_pruning_predicate_not_null_expr() {
use datafusion_expr::{col, lit};
// c1 > 15 and IsNotNull(c2) => c1_max > 15 and c2_null_count = 0
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
let expr = col("c1").gt(lit(15)).and(col("c2").is_not_null());
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// pass predicates). Ideally these should both be false
assert_eq!(
prune_row_groups_by_statistics(
&schema,
Expand All @@ -717,11 +682,7 @@ mod tests {
Some(&pruning_predicate),
&metrics
),
// The first row group was filtered out because c1_max is 10, which is smaller than 15.
// The second row group was filtered out because it contains null value on "c2".
// The third row group is kept because c1_max is 30, which is greater than 15 AND
// it does NOT contain any null value on "c2".
vec![2]
vec![1]
);
}

Expand Down
63 changes: 0 additions & 63 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ pub trait PruningStatistics {
/// `x < 5` | `x_max < 5`
/// `x = 5 AND y = 10` | `x_min <= 5 AND 5 <= x_max AND y_min <= 10 AND 10 <= y_max`
/// `x IS NULL` | `x_null_count > 0`
/// `x IS NOT NULL` | `x_null_count = 0`
///
/// ## Predicate Evaluation
/// The PruningPredicate works in two passes
Expand Down Expand Up @@ -1121,34 +1120,6 @@ fn build_is_null_column_expr(
}
}

/// Builds the pruning expression for `expr IS NOT NULL` when `expr` is a plain
/// column reference.
///
/// The rewrite is `IsNotNull(column) => column_null_count = 0`: the returned
/// expression compares the null-count expression obtained from
/// `required_columns.null_count_column_expr` with the literal `0`, so it is
/// true only when the reported null count is zero.
///
/// Returns `None` when `expr` is not a `Column`, when the column name cannot be
/// found in `schema`, or when `null_count_column_expr` returns an error.
///
/// NOTE(review): `null_count = 0` is false for a container that holds both null
/// and non-null values, yet such a container can still have rows satisfying
/// `IS NOT NULL`. This rewrite therefore looks too aggressive for pruning
/// (presumably the motivation for reverting it — confirm against the PR
/// discussion). A sound rewrite would need to compare the null count with the
/// container's row count rather than with zero.
fn build_is_not_null_column_expr(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
required_columns: &mut RequiredColumns,
) -> Option<Arc<dyn PhysicalExpr>> {
// Only bare column references are rewritten; anything else falls through to `None`.
if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
// Bail out with `None` if the column is not part of `schema`.
let field = schema.field_with_name(col.name()).ok()?;

// The null-count statistic is modelled as a nullable `UInt64` field that
// reuses the column's name.
let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
// IsNotNull(column) => null_count = 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Eq,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
})
// An error from `null_count_column_expr` is discarded and becomes `None`.
.ok()
} else {
None
}
}

/// The maximum number of entries in an `InList` that might be rewritten into
/// an OR chain.
// NOTE(review): the code that consults this limit is outside the visible hunk;
// presumably longer lists are left unhandled — confirm at the use site.
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
Expand All @@ -1175,14 +1146,6 @@ fn build_predicate_expression(
return build_is_null_column_expr(is_null.arg(), schema, required_columns)
.unwrap_or(unhandled);
}
if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
return build_is_not_null_column_expr(
is_not_null.arg(),
schema,
required_columns,
)
.unwrap_or(unhandled);
}
if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
return build_single_column_expr(col, schema, required_columns, false)
.unwrap_or(unhandled);
Expand Down Expand Up @@ -2089,32 +2052,6 @@ mod tests {
Ok(())
}

/// Checks that `c1 IS NULL` is rewritten into a predicate on the null-count
/// statistic: `c1_null_count@0 > 0`.
#[test]
fn row_group_predicate_is_null() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
// Expected rendering of the rewritten pruning expression.
let expected_expr = "c1_null_count@0 > 0";

let expr = col("c1").is_null();
// A fresh `RequiredColumns` is used, so `c1_null_count` is the only statistic requested here.
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);

Ok(())
}

/// Checks that `c1 IS NOT NULL` is rewritten into a predicate on the null-count
/// statistic: `c1_null_count@0 = 0`.
///
/// NOTE(review): this pins the `null_count = 0` rewrite, which rejects
/// containers that hold a mix of null and non-null values even though they can
/// contain matching rows — confirm whether that is the intended pruning
/// semantics before relying on this expectation.
#[test]
fn row_group_predicate_is_not_null() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
// Expected rendering of the rewritten pruning expression.
let expected_expr = "c1_null_count@0 = 0";

let expr = col("c1").is_not_null();
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);

Ok(())
}

#[test]
fn row_group_predicate_required_columns() -> Result<()> {
let schema = Schema::new(vec![
Expand Down
Loading