Skip to content

Commit

Permalink
reimplement push_down_filter to remove global-state (#4365)
Browse files Browse the repository at this point in the history
* reimplement `push_down_filter`

* add comment

* fix union replace

* fix but when meet same name but different qualifier, and add ut.c

* add UT `filter_complex_agg` `test_union_different_schema`

* fix regression for push_down_filter meet subquery-alias

* polish comment

* add UT confirm that avoid duplicate Filters

* merge confirm UT

* remove TODO
  • Loading branch information
jackwener authored Nov 30, 2022
1 parent f2e2c29 commit 3fe542f
Show file tree
Hide file tree
Showing 7 changed files with 604 additions and 659 deletions.
8 changes: 4 additions & 4 deletions benchmarks/expected-plans/q21.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST
Inner Join: l1.l_orderkey = orders.o_orderkey
Inner Join: supplier.s_suppkey = l1.l_suppkey
TableScan: supplier projection=[s_suppkey, s_name, s_nationkey]
Filter: l1.l_receiptdate > l1.l_commitdate
SubqueryAlias: l1
SubqueryAlias: l1
Filter: lineitem.l_receiptdate > lineitem.l_commitdate
TableScan: lineitem projection=[l_orderkey, l_suppkey, l_commitdate, l_receiptdate]
Filter: orders.o_orderstatus = Utf8("F")
TableScan: orders projection=[o_orderkey, o_orderstatus]
Filter: nation.n_name = Utf8("SAUDI ARABIA")
TableScan: nation projection=[n_nationkey, n_name]
SubqueryAlias: l2
TableScan: lineitem projection=[l_orderkey, l_suppkey]
Filter: l3.l_receiptdate > l3.l_commitdate
SubqueryAlias: l3
SubqueryAlias: l3
Filter: lineitem.l_receiptdate > lineitem.l_commitdate
TableScan: lineitem projection=[l_orderkey, l_suppkey, l_commitdate, l_receiptdate]
8 changes: 4 additions & 4 deletions benchmarks/expected-plans/q7.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST,
TableScan: lineitem projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount, l_shipdate]
TableScan: orders projection=[o_orderkey, o_custkey]
TableScan: customer projection=[c_custkey, c_nationkey]
Filter: n1.n_name = Utf8("FRANCE") OR n1.n_name = Utf8("GERMANY")
SubqueryAlias: n1
SubqueryAlias: n1
Filter: nation.n_name = Utf8("FRANCE") OR nation.n_name = Utf8("GERMANY")
TableScan: nation projection=[n_nationkey, n_name]
Filter: n2.n_name = Utf8("GERMANY") OR n2.n_name = Utf8("FRANCE")
SubqueryAlias: n2
SubqueryAlias: n2
Filter: nation.n_name = Utf8("GERMANY") OR nation.n_name = Utf8("FRANCE")
TableScan: nation projection=[n_nationkey, n_name]
13 changes: 6 additions & 7 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1636,15 +1636,14 @@ async fn reduce_left_join_3() -> Result<()> {
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: t3.t1_id, t3.t1_name, t3.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Filter: t3.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" SubqueryAlias: t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" SubqueryAlias: t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" Filter: t2.t2_int < UInt32(3) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Filter: t2.t2_int < UInt32(3) AND t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
]
;
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ pub mod eliminate_filter;
pub mod eliminate_limit;
pub mod eliminate_outer_join;
pub mod filter_null_join_keys;
pub mod filter_push_down;
pub mod inline_table_scan;
pub mod limit_push_down;
pub mod optimizer;
pub mod projection_push_down;
pub mod propagate_empty_relation;
pub mod push_down_filter;
pub mod scalar_subquery_to_join;
pub mod simplify_expressions;
pub mod single_distinct_to_groupby;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_limit::EliminateLimit;
use crate::eliminate_outer_join::EliminateOuterJoin;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::filter_push_down::FilterPushDown;
use crate::inline_table_scan::InlineTableScan;
use crate::limit_push_down::LimitPushDown;
use crate::projection_push_down::ProjectionPushDown;
use crate::propagate_empty_relation::PropagateEmptyRelation;
use crate::push_down_filter::PushDownFilter;
use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
use crate::simplify_expressions::SimplifyExpressions;
Expand Down Expand Up @@ -184,8 +184,9 @@ impl Optimizer {
rules.push(Arc::new(FilterNullJoinKeys::default()));
}
rules.push(Arc::new(EliminateOuterJoin::new()));
rules.push(Arc::new(FilterPushDown::new()));
// Filters can't be pushed down past Limits, we should do PushDownFilter after LimitPushDown
rules.push(Arc::new(LimitPushDown::new()));
rules.push(Arc::new(PushDownFilter::new()));
rules.push(Arc::new(SingleDistinctToGroupBy::new()));

// The previous optimizations added expressions and projections,
Expand Down
Loading

0 comments on commit 3fe542f

Please sign in to comment.