From abac256ffc3bef47bf311850ed3ff71163acb97f Mon Sep 17 00:00:00 2001 From: Mark Sirek Date: Sat, 18 Nov 2023 23:12:47 -0800 Subject: [PATCH 1/4] Fix incorrect results for NOT IN subqueries with nulls --- datafusion/expr/src/expr_rewriter/mod.rs | 10 +- datafusion/expr/src/logical_plan/plan.rs | 2 +- .../src/decorrelate_predicate_subquery.rs | 197 +++++++++++++++- .../optimizer/src/scalar_subquery_to_join.rs | 2 +- datafusion/optimizer/src/test/mod.rs | 19 ++ datafusion/sqllogictest/test_files/joins.slt | 2 +- .../sqllogictest/test_files/subquery.slt | 220 ++++++++++++++++++ 7 files changed, 435 insertions(+), 17 deletions(-) diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 1f04c80833f0..320b3231caa4 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -141,16 +141,14 @@ pub fn unnormalize_col(expr: Expr) -> Expr { /// Create a Column from the Scalar Expr pub fn create_col_from_scalar_expr( scalar_expr: &Expr, - subqry_alias: String, + subqry_alias: Option, ) -> Result { match scalar_expr { - Expr::Alias(Alias { name, .. }) => Ok(Column::new(Some(subqry_alias), name)), - Expr::Column(Column { relation: _, name }) => { - Ok(Column::new(Some(subqry_alias), name)) - } + Expr::Alias(Alias { name, .. }) => Ok(Column::new(subqry_alias, name)), + Expr::Column(Column { relation: _, name }) => Ok(Column::new(subqry_alias, name)), _ => { let scalar_column = scalar_expr.display_name()?; - Ok(Column::new(Some(subqry_alias), scalar_column)) + Ok(Column::new(subqry_alias, scalar_column)) } } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index a024824c7a5a..3d5f60860396 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -518,7 +518,7 @@ impl LogicalPlan { .map(|expr| { Ok(Expr::Column(create_col_from_scalar_expr( &expr, - subquery_alias.alias.to_string(), + Some(subquery_alias.alias.to_string()), )?)) }) .map_or(Ok(None), |v| v.map(Some)) diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 96b46663d8e4..56ab4f063d7e 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -26,8 +26,8 @@ use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::expr_rewriter::create_col_from_scalar_expr; use datafusion_expr::logical_plan::{JoinType, Subquery}; use datafusion_expr::{ - exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter, - LogicalPlan, LogicalPlanBuilder, Operator, + exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, ExprSchemable, + Filter, LogicalPlan, LogicalPlanBuilder, Operator, }; use log::debug; use std::collections::BTreeSet; @@ -197,7 +197,7 @@ impl OptimizerRule for DecorrelatePredicateSubquery { /// ``` fn build_join( query_info: &SubqueryInfo, - left: &LogicalPlan, + outer_query: &LogicalPlan, alias: Arc, ) -> Result> { let where_in_expr_opt = &query_info.where_in_expr; @@ -248,6 +248,38 @@ fn build_join( .map(Option::Some) })?; + // build a predicate for comparing the left and right expressions of a given pair of outer/subquery rows + // from an IN or NOT IN predicate + let build_in_predicate = |left: Box, right: Box| -> Result { + let right_col = create_col_from_scalar_expr(right.deref(), Some(subquery_alias))?; + let eq_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col)); + if !query_info.negated { + // early exit if this is an IN predicate + return Ok(eq_predicate); + } + + match left.nullable(outer_query.schema())? { + true => { + // left expression is nullable; we know the predicate must take the form `left = right IS NOT FALSE` + return Ok(eq_predicate.is_not_false()); + } + false => {} + } + let unqualified_right_col = create_col_from_scalar_expr(right.deref(), None)?; + let subquery_col = query_info + .query + .subquery + .schema() + .field_from_column(&unqualified_right_col)?; + + match subquery_col.is_nullable() { + // add "IS NOT FALSE" to a NOT IN equality predicate whose subquery expression is nullable + // so that an unknown result is treated as a (possible) match + true => Ok(eq_predicate.is_not_false()), + false => Ok(eq_predicate), + } + }; + if let Some(join_filter) = match (join_filter_opt, in_predicate_opt) { ( Some(join_filter), @@ -257,8 +289,7 @@ fn build_join( right, })), ) => { - let right_col = create_col_from_scalar_expr(right.deref(), subquery_alias)?; - let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col)); + let in_predicate = build_in_predicate(left, right)?; Some(in_predicate.and(join_filter)) } (Some(join_filter), _) => Some(join_filter), @@ -270,8 +301,7 @@ fn build_join( right, })), ) => { - let right_col = create_col_from_scalar_expr(right.deref(), subquery_alias)?; - let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col)); + let in_predicate = build_in_predicate(left, right)?; Some(in_predicate) } _ => None, @@ -281,7 +311,7 @@ fn build_join( true => JoinType::LeftAnti, false => JoinType::LeftSemi, }; - let new_plan = LogicalPlanBuilder::from(left.clone()) + let new_plan = LogicalPlanBuilder::from(outer_query.clone()) .join_on(sub_query_alias, join_type, Some(join_filter))? .build()?; debug!( @@ -349,6 +379,15 @@ mod tests { )) } + fn test_nullable_subquery_with_name(name: &str) -> Result> { + let table_scan = test_table_scan_nullable_with_name(name)?; + Ok(Arc::new( + LogicalPlanBuilder::from(table_scan) + .project(vec![col("c")])? + .build()?, + )) + } + /// Test for several IN subquery expressions #[test] fn in_subquery_multiple() -> Result<()> { @@ -1077,6 +1116,148 @@ mod tests { Ok(()) } + /// Test for single NOT IN subquery filter and nullable subquery column + #[test] + fn not_in_nullable_subquery_simple() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(not_in_subquery( + col("c"), + test_nullable_subquery_with_name("sq")?, + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32]\ + \n LeftAnti Join: Filter: test.c = __correlated_sq_1.c IS NOT FALSE [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32;N]\ + \n Projection: sq.c [c:UInt32;N]\ + \n TableScan: sq [a:UInt32;N, b:UInt32;N, c:UInt32;N]"; + + assert_optimized_plan_eq_display_indent( + Arc::new(DecorrelatePredicateSubquery::new()), + &plan, + expected, + ); + Ok(()) + } + + /// Test for single IN subquery filter and nullable subquery column + #[test] + fn in_nullable_subquery_simple() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(in_subquery( + col("c"), + test_nullable_subquery_with_name("sq")?, + ))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32]\ + \n LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32;N]\ + \n Projection: sq.c [c:UInt32;N]\ + \n TableScan: sq [a:UInt32;N, b:UInt32;N, c:UInt32;N]"; + + assert_optimized_plan_eq_display_indent( + Arc::new(DecorrelatePredicateSubquery::new()), + &plan, + expected, + ); + Ok(()) + } + + /// Test for single NOT IN subquery filter and nullable outer query column + #[test] + fn not_in_nullable_outer_query_simple() -> Result<()> { + let table_scan = test_table_scan_nullable()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32;N]\ + \n LeftAnti Join: Filter: test.c = __correlated_sq_1.c IS NOT FALSE [a:UInt32;N, b:UInt32;N, c:UInt32;N]\ + \n TableScan: test [a:UInt32;N, b:UInt32;N, c:UInt32;N]\ + \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\ + \n Projection: sq.c [c:UInt32]\ + \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; + + assert_optimized_plan_eq_display_indent( + Arc::new(DecorrelatePredicateSubquery::new()), + &plan, + expected, + ); + Ok(()) + } + + #[test] + fn not_in_nullable_subquery_both_side_expr() -> Result<()> { + let table_scan = test_table_scan()?; + let subquery_scan = test_table_scan_nullable_with_name("sq")?; + + let subquery = LogicalPlanBuilder::from(subquery_scan) + .project(vec![col("c") * lit(2u32)])? + .build()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .filter(not_in_subquery(col("c") + lit(1u32), Arc::new(subquery)))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32]\ + \n LeftAnti Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) IS NOT FALSE [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32;N]\ + \n Projection: sq.c * UInt32(2) [sq.c * UInt32(2):UInt32;N]\ + \n TableScan: sq [a:UInt32;N, b:UInt32;N, c:UInt32;N]"; + + assert_optimized_plan_eq_display_indent( + Arc::new(DecorrelatePredicateSubquery::new()), + &plan, + expected, + ); + Ok(()) + } + + #[test] + fn not_in_subquery_join_filter_and_inner_filter() -> Result<()> { + let table_scan = test_table_scan()?; + let subquery_scan = test_table_scan_nullable_with_name("sq")?; + + let subquery = LogicalPlanBuilder::from(subquery_scan) + .filter( + out_ref_col(DataType::UInt32, "test.a") + .eq(col("sq.a")) + .and(col("sq.a").add(lit(1u32)).eq(col("sq.b"))), + )? + .project(vec![col("c") * lit(2u32)])? + .build()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .filter(not_in_subquery(col("c") + lit(1u32), Arc::new(subquery)))? + .project(vec![col("test.b")])? + .build()?; + + let expected = "Projection: test.b [b:UInt32]\ + \n LeftAnti Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) IS NOT FALSE AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\ + \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\ + \n SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32;N, a:UInt32;N]\ + \n Projection: sq.c * UInt32(2), sq.a [sq.c * UInt32(2):UInt32;N, a:UInt32;N]\ + \n Filter: sq.a + UInt32(1) = sq.b [a:UInt32;N, b:UInt32;N, c:UInt32;N]\ + \n TableScan: sq [a:UInt32;N, b:UInt32;N, c:UInt32;N]"; + + assert_optimized_plan_eq_display_indent( + Arc::new(DecorrelatePredicateSubquery::new()), + &plan, + expected, + ); + Ok(()) + } + #[test] fn in_subquery_both_side_expr() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 7ac0c25119c3..7a6ff40f5b78 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -221,7 +221,7 @@ impl TreeNodeRewriter for ExtractScalarSubQuery { .map_or(plan_err!("single expression required."), Ok)?; Ok(Expr::Column(create_col_from_scalar_expr( &scalar_expr, - subqry_alias, + Some(subqry_alias), )?)) } _ => Ok(expr), diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 917ddc565c9e..6b5f06296f42 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -45,6 +45,25 @@ pub fn test_table_scan() -> Result { test_table_scan_with_name("test") } +pub fn test_table_scan_nullable_fields() -> Vec { + vec![ + Field::new("a", DataType::UInt32, true), + Field::new("b", DataType::UInt32, true), + Field::new("c", DataType::UInt32, true), + ] +} + +/// some tests share a common table with different names and nullable fields +pub fn test_table_scan_nullable_with_name(name: &str) -> Result { + let schema = Schema::new(test_table_scan_nullable_fields()); + table_scan(Some(name), &schema, None)?.build() +} + +/// some tests share a common table with nullable fields +pub fn test_table_scan_nullable() -> Result { + test_table_scan_nullable_with_name("test") +} + /// Scan an empty data source, mainly used in tests pub fn scan_empty( name: Option<&str>, diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 0fea8da5a342..9ba0ae67040b 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -1931,7 +1931,7 @@ where join_t1.t1_id + 12 not in (select join_t2.t2_id + 1 from join_t2 where join_t1.t1_int > 0) ---- logical_plan -LeftAnti Join: CAST(join_t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.join_t2.t2_id + Int64(1) Filter: join_t1.t1_int > UInt32(0) +LeftAnti Join: Filter: CAST(join_t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.join_t2.t2_id + Int64(1) IS NOT FALSE AND join_t1.t1_int > UInt32(0) --TableScan: join_t1 projection=[t1_id, t1_name, t1_int] --SubqueryAlias: __correlated_sq_1 ----Projection: CAST(join_t2.t2_id AS Int64) + Int64(1) diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index ef25d960c954..2e12e2ef5814 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1012,3 +1012,223 @@ catan-prod1-daily success catan-prod1-daily high #2 #3 #4 + +# +# Tests for NOT IN with a nullable subquery expression +# +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +DROP TABLE t1; + +statement ok +DROP TABLE t2; + +statement ok +CREATE TABLE t1(t1_id INT NOT NULL, t1_name TEXT, t1_int INT NOT NULL); + +statement ok +INSERT INTO t1 VALUES +(11, 'a', 1), +(22, 'b', 2), +(33, 'c', 3), +(44, 'd', 4), +(66, 'd', 4); + +statement ok +CREATE TABLE t1n(t1_id INT, t1_name TEXT, t1_int INT); + +statement ok +INSERT INTO t1n SELECT * FROM t1; + +statement ok +CREATE TABLE t2(t2_id INT NOT NULL, t2_name TEXT, t2_int INT NOT NULL); + +statement ok +INSERT INTO t2 VALUES +(11, 'z', 3), +(22, 'y', 1), +(44, 'x', 3), +(44, null, 3), +(55, 'w', 3), +(66, 'd', 4); + +statement ok +CREATE TABLE t2n(t2_id INT, t2_name TEXT, t2_int INT); + +statement ok +INSERT INTO t2n SELECT * FROM t2; + +statement ok +INSERT INTO t2n VALUES (null, 'a', 3); + +# NOT IN with nullable subquery expression uses IS NOT FALSE +query TT +explain SELECT t1_id, t1_name FROM t1 WHERE t1_id not in (SELECT t2_id FROM t2n) +---- +logical_plan +LeftAnti Join: Filter: t1.t1_id = __correlated_sq_1.t2_id IS NOT FALSE +--TableScan: t1 projection=[t1_id, t1_name] +--SubqueryAlias: __correlated_sq_1 +----TableScan: t2n projection=[t2_id] +physical_plan +NestedLoopJoinExec: join_type=LeftAnti, filter=t1_id@0 = t2_id@1 IS DISTINCT FROM false +--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----MemoryExec: partitions=1, partition_sizes=[1] +--MemoryExec: partitions=1, partition_sizes=[2] + +# NOT IN with nullable outer query expression uses IS NOT FALSE +query TT +explain SELECT t1_id, t1_name FROM t1n WHERE t1_id not in (SELECT t2_id FROM t2) +---- +logical_plan +LeftAnti Join: Filter: t1n.t1_id = __correlated_sq_1.t2_id IS NOT FALSE +--TableScan: t1n projection=[t1_id, t1_name] +--SubqueryAlias: __correlated_sq_1 +----TableScan: t2 projection=[t2_id] +physical_plan +NestedLoopJoinExec: join_type=LeftAnti, filter=t1_id@0 = t2_id@1 IS DISTINCT FROM false +--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----MemoryExec: partitions=1, partition_sizes=[1] +--MemoryExec: partitions=1, partition_sizes=[1] + +# NOT IN with null subquery value disqualifies all rows +query IT rowsort +SELECT t1_id, t1_name FROM t1 WHERE t1_id not in (SELECT t2_id FROM t2n) +---- + +# NOT IN subquery with correlated outer filter and inner nullable expression +# which isn't a simple column reference uses IS NOT FALSE +query TT +explain select t1.t1_id, + t1.t1_name, + t1.t1_int +from t1 +where t1.t1_id + 12 not in ( + select t2n.t2_id + 1 from t2n where t1.t1_int < 4 + ) +---- +logical_plan +LeftAnti Join: Filter: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.t2n.t2_id + Int64(1) IS NOT FALSE AND t1.t1_int < Int32(4) +--TableScan: t1 projection=[t1_id, t1_name, t1_int] +--SubqueryAlias: __correlated_sq_1 +----Projection: CAST(t2n.t2_id AS Int64) + Int64(1) +------TableScan: t2n projection=[t2_id] +physical_plan +NestedLoopJoinExec: join_type=LeftAnti, filter=(CAST(t1_id@0 AS Int64) + 12 = t2n.t2_id + Int64(1)@2 IS DISTINCT FROM false) AND t1_int@1 < 4 +--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----MemoryExec: partitions=1, partition_sizes=[1] +--CoalescePartitionsExec +----ProjectionExec: expr=[CAST(t2_id@0 AS Int64) + 1 as t2n.t2_id + Int64(1)] +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------MemoryExec: partitions=1, partition_sizes=[2] + +# NOT IN subquery with correlated outer filter and inner nullable expression +# which isn't a simple column reference should not return the `22 b 2` row. +query ITI rowsort +select t1.t1_id, + t1.t1_name, + t1.t1_int +from t1 +where t1.t1_id + 12 not in ( + select t2n.t2_id + 1 from t2n where t1.t1_int < 4 + ) +---- +44 d 4 +66 d 4 + +# NOT IN subquery with correlated outer filter and non-nullable expressions in NOT IN predicate +# should not use IS NOT FALSE +query TT +explain select t1.t1_id, + t1.t1_name, + t1.t1_int +from t1 +where t1.t1_id + 12 not in ( + select t2.t2_id + 1 from t2 where t1.t1_int < 4 + ) +---- +logical_plan +LeftAnti Join: CAST(t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.t2.t2_id + Int64(1) Filter: t1.t1_int < Int32(4) +--TableScan: t1 projection=[t1_id, t1_name, t1_int] +--SubqueryAlias: __correlated_sq_1 +----Projection: CAST(t2.t2_id AS Int64) + Int64(1) +------TableScan: t2 projection=[t2_id] +physical_plan +ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int] +--CoalesceBatchesExec: target_batch_size=2 +----HashJoinExec: mode=Partitioned, join_type=RightAnti, on=[(t2.t2_id + Int64(1)@0, t1.t1_id + Int64(12)@3)], filter=t1_int@0 < 4 +------CoalesceBatchesExec: target_batch_size=2 +--------RepartitionExec: partitioning=Hash([t2.t2_id + Int64(1)@0], 4), input_partitions=4 +----------ProjectionExec: expr=[CAST(t2_id@0 AS Int64) + 1 as t2.t2_id + Int64(1)] +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] +------CoalesceBatchesExec: target_batch_size=2 +--------RepartitionExec: partitioning=Hash([t1.t1_id + Int64(12)@3], 4), input_partitions=4 +----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 12 as t1.t1_id + Int64(12)] +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] + +query ITI rowsort +select t1.t1_id, + t1.t1_name, + t1.t1_int +from t1 +where t1.t1_id + 12 not in ( + select t2.t2_id + 1 from t2 where t1.t1_int < 4 + ) +---- +22 b 2 +44 d 4 +66 d 4 + +# NOT IN with nullable subquery expression and a correlated equality predicate uses IS NOT FALSE. +# The correlated equality predicate allows LeftAnti hash join to be used. +query TT +explain +select t1.t1_id, + t1.t1_name, + t1.t1_int +from t1 +where t1.t1_id not in (select t2_id from t2n where t1_name = t2_name); +---- +logical_plan +LeftAnti Join: t1.t1_name = __correlated_sq_1.t2_name Filter: t1.t1_id = __correlated_sq_1.t2_id IS NOT FALSE +--TableScan: t1 projection=[t1_id, t1_name, t1_int] +--SubqueryAlias: __correlated_sq_1 +----TableScan: t2n projection=[t2_id, t2_name] +physical_plan +CoalesceBatchesExec: target_batch_size=2 +--HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(t1_name@1, t2_name@1)], filter=t1_id@0 = t2_id@1 IS DISTINCT FROM false +----CoalesceBatchesExec: target_batch_size=2 +------RepartitionExec: partitioning=Hash([t1_name@1], 4), input_partitions=4 +--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] +----CoalesceBatchesExec: target_batch_size=2 +------RepartitionExec: partitioning=Hash([t2_name@1], 4), input_partitions=4 +--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[2] + +# Applying the `IS DISTINCT FROM false` filter as the secondary filter of the join produces correct results +query ITI rowsort +select t1.t1_id, + t1.t1_name, + t1.t1_int +from t1 +where t1.t1_id not in (select t2_id from t2n where t1_name = t2_name); +---- +22 b 2 +33 c 3 +44 d 4 + +# The filter should not be qualified +query T +select 'qualified filter' where 1 NOT IN (select * from (values (3), (null)) my_tab); +---- + +# The filter should not be qualified +query T +select 'qualified filter' where null NOT IN (select * from (values (1), (2)) my_tab); +---- + From 04dad997131448acc9be24bfeb7bd873d596d8c2 Mon Sep 17 00:00:00 2001 From: Mark Sirek Date: Sun, 19 Nov 2023 21:47:39 -0800 Subject: [PATCH 2/4] update sqllogictest tpch Q16 query plan --- .../sqllogictest/test_files/tpch/q16.slt.part | 44 ++++++++----------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/datafusion/sqllogictest/test_files/tpch/q16.slt.part b/datafusion/sqllogictest/test_files/tpch/q16.slt.part index b93872929fe5..6ce800df3411 100644 --- a/datafusion/sqllogictest/test_files/tpch/q16.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q16.slt.part @@ -55,7 +55,7 @@ Limit: skip=0, fetch=10 ----Projection: part.p_brand, part.p_type, part.p_size, COUNT(alias1) AS supplier_cnt ------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]], aggr=[[COUNT(alias1)]] --------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size, partsupp.ps_suppkey AS alias1]], aggr=[[]] -----------LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey +----------LeftAnti Join: Filter: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey IS NOT FALSE ------------Projection: partsupp.ps_suppkey, part.p_brand, part.p_type, part.p_size --------------Inner Join: partsupp.ps_partkey = part.p_partkey ----------------TableScan: partsupp projection=[ps_partkey, ps_suppkey] @@ -78,31 +78,25 @@ GlobalLimitExec: skip=0, fetch=10 ------------------CoalesceBatchesExec: target_batch_size=8192 --------------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2, alias1@3], 4), input_partitions=4 ----------------------AggregateExec: mode=Partial, gby=[p_brand@1 as p_brand, p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1], aggr=[] -------------------------CoalesceBatchesExec: target_batch_size=8192 ---------------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(ps_suppkey@0, s_suppkey@0)] +------------------------NestedLoopJoinExec: join_type=LeftAnti, filter=ps_suppkey@0 = s_suppkey@1 IS DISTINCT FROM false +--------------------------ProjectionExec: expr=[ps_suppkey@1 as ps_suppkey, p_brand@3 as p_brand, p_type@4 as p_type, p_size@5 as p_size] ----------------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4), input_partitions=4 ---------------------------------ProjectionExec: expr=[ps_suppkey@1 as ps_suppkey, p_brand@3 as p_brand, p_type@4 as p_type, p_size@5 as p_size] -----------------------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_partkey@0, p_partkey@0)] ---------------------------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------------------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4 -------------------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_partkey, ps_suppkey], has_header=false ---------------------------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------------------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4 -------------------------------------------CoalesceBatchesExec: target_batch_size=8192 ---------------------------------------------FilterExec: p_brand@1 != Brand#45 AND p_type@2 NOT LIKE MEDIUM POLISHED% AND Use p_size@3 IN (SET) ([Literal { value: Int32(49) }, Literal { value: Int32(14) }, Literal { value: Int32(23) }, Literal { value: Int32(45) }, Literal { value: Int32(19) }, Literal { value: Int32(3) }, Literal { value: Int32(36) }, Literal { value: Int32(9) }]) -----------------------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------------------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, projection=[p_partkey, p_brand, p_type, p_size], has_header=false -----------------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4 ---------------------------------ProjectionExec: expr=[s_suppkey@0 as s_suppkey] -----------------------------------CoalesceBatchesExec: target_batch_size=8192 -------------------------------------FilterExec: s_comment@1 LIKE %Customer%Complaints% ---------------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -----------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_comment], has_header=false - - +------------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_partkey@0, p_partkey@0)] +--------------------------------CoalesceBatchesExec: target_batch_size=8192 +----------------------------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4 +------------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_partkey, ps_suppkey], has_header=false +--------------------------------CoalesceBatchesExec: target_batch_size=8192 +----------------------------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4 +------------------------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------------------------FilterExec: p_brand@1 != Brand#45 AND p_type@2 NOT LIKE MEDIUM POLISHED% AND Use p_size@3 IN (SET) ([Literal { value: Int32(49) }, Literal { value: Int32(14) }, Literal { value: Int32(23) }, Literal { value: Int32(45) }, Literal { value: Int32(19) }, Literal { value: Int32(3) }, Literal { value: Int32(36) }, Literal { value: Int32(9) }]) +----------------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, projection=[p_partkey, p_brand, p_type, p_size], has_header=false +--------------------------CoalescePartitionsExec +----------------------------ProjectionExec: expr=[s_suppkey@0 as s_suppkey] +------------------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------------------FilterExec: s_comment@1 LIKE %Customer%Complaints% +----------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_comment], has_header=false query TTII select From 8e4903f9960366bf014b801d310ac82148d96b63 Mon Sep 17 00:00:00 2001 From: Mark Sirek Date: Mon, 20 Nov 2023 21:24:01 -0800 Subject: [PATCH 3/4] Address review comments --- datafusion/expr/src/expr_rewriter/mod.rs | 10 ++++++---- datafusion/expr/src/logical_plan/plan.rs | 2 +- .../optimizer/src/decorrelate_predicate_subquery.rs | 8 ++++---- datafusion/optimizer/src/scalar_subquery_to_join.rs | 2 +- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 320b3231caa4..1f04c80833f0 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -141,14 +141,16 @@ pub fn unnormalize_col(expr: Expr) -> Expr { /// Create a Column from the Scalar Expr pub fn create_col_from_scalar_expr( scalar_expr: &Expr, - subqry_alias: Option, + subqry_alias: String, ) -> Result { match scalar_expr { - Expr::Alias(Alias { name, .. }) => Ok(Column::new(subqry_alias, name)), - Expr::Column(Column { relation: _, name }) => Ok(Column::new(subqry_alias, name)), + Expr::Alias(Alias { name, .. }) => Ok(Column::new(Some(subqry_alias), name)), + Expr::Column(Column { relation: _, name }) => { + Ok(Column::new(Some(subqry_alias), name)) + } _ => { let scalar_column = scalar_expr.display_name()?; - Ok(Column::new(subqry_alias, scalar_column)) + Ok(Column::new(Some(subqry_alias), scalar_column)) } } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 3d5f60860396..a024824c7a5a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -518,7 +518,7 @@ impl LogicalPlan { .map(|expr| { Ok(Expr::Column(create_col_from_scalar_expr( &expr, - Some(subquery_alias.alias.to_string()), + subquery_alias.alias.to_string(), )?)) }) .map_or(Ok(None), |v| v.map(Some)) diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 56ab4f063d7e..4806b11a7660 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -251,8 +251,9 @@ fn build_join( // build a predicate for comparing the left and right expressions of a given pair of outer/subquery rows // from an IN or NOT IN predicate let build_in_predicate = |left: Box, right: Box| -> Result { - let right_col = create_col_from_scalar_expr(right.deref(), Some(subquery_alias))?; - let eq_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col)); + let right_col = create_col_from_scalar_expr(right.deref(), subquery_alias)?; + let eq_predicate = + Expr::eq(left.deref().clone(), Expr::Column(right_col.clone())); if !query_info.negated { // early exit if this is an IN predicate return Ok(eq_predicate); @@ -265,12 +266,11 @@ fn build_join( } false => {} } - let unqualified_right_col = create_col_from_scalar_expr(right.deref(), None)?; let subquery_col = query_info .query .subquery .schema() - .field_from_column(&unqualified_right_col)?; + .field_with_unqualified_name(right_col.name.as_str())?; match subquery_col.is_nullable() { // add "IS NOT FALSE" to a NOT IN equality predicate whose subquery expression is nullable diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 7a6ff40f5b78..7ac0c25119c3 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -221,7 +221,7 @@ impl TreeNodeRewriter for ExtractScalarSubQuery { .map_or(plan_err!("single expression required."), Ok)?; Ok(Expr::Column(create_col_from_scalar_expr( &scalar_expr, - Some(subqry_alias), + subqry_alias, )?)) } _ => Ok(expr), From de49fbc327b25c871d19154fea39e32c100e8476 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 31 Jan 2024 15:54:02 -0500 Subject: [PATCH 4/4] cleanup tests --- .../sqllogictest/test_files/subquery.slt | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 30206aa353cb..00cbd053e950 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -1033,17 +1033,18 @@ catan-prod1-daily success catan-prod1-daily high #3 #4 +statement ok +DROP TABLE t1; + +statement ok +DROP TABLE t2; + # # Tests for NOT IN with a nullable subquery expression # statement ok set datafusion.explain.logical_plan_only = false; -statement ok -DROP TABLE t1; - -statement ok -DROP TABLE t2; statement ok CREATE TABLE t1(t1_id INT NOT NULL, t1_name TEXT, t1_int INT NOT NULL); @@ -1252,6 +1253,15 @@ query T select 'qualified filter' where null NOT IN (select * from (values (1), (2)) my_tab); ---- +statement ok +DROP TABLE t1; + +statement ok +DROP TABLE t2; + +statement ok +set datafusion.explain.logical_plan_only = true; + statement ok create table t(a bigint); @@ -1268,10 +1278,6 @@ logical_plan Projection: t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2), t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2) + Int64(1) --Projection: t.a / Int64(2) AS t.a / Int64(2)Int64(2)t.a ----TableScan: t projection=[a] -physical_plan -ProjectionExec: expr=[t.a / Int64(2)Int64(2)t.a@0 as t.a / Int64(2), t.a / Int64(2)Int64(2)t.a@0 + 1 as t.a / Int64(2) + Int64(1)] ---ProjectionExec: expr=[a@0 / 2 as t.a / Int64(2)Int64(2)t.a] -----MemoryExec: partitions=1, partition_sizes=[0] statement ok set datafusion.optimizer.max_passes = 3; @@ -1283,7 +1289,6 @@ logical_plan Projection: t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2), t.a / Int64(2)Int64(2)t.a AS t.a / Int64(2) + Int64(1) --Projection: t.a / Int64(2) AS t.a / Int64(2)Int64(2)t.a ----TableScan: t projection=[a] -physical_plan -ProjectionExec: expr=[t.a / Int64(2)Int64(2)t.a@0 as t.a / Int64(2), t.a / Int64(2)Int64(2)t.a@0 + 1 as t.a / Int64(2) + Int64(1)] ---ProjectionExec: expr=[a@0 / 2 as t.a / Int64(2)Int64(2)t.a] -----MemoryExec: partitions=1, partition_sizes=[0] + +statement ok +DROP TABLE t;