Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reimplement push_down_filter to remove global-state #4365

Merged
merged 10 commits into from
Nov 30, 2022
8 changes: 4 additions & 4 deletions benchmarks/expected-plans/q21.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST
Inner Join: l1.l_orderkey = orders.o_orderkey
Inner Join: supplier.s_suppkey = l1.l_suppkey
TableScan: supplier projection=[s_suppkey, s_name, s_nationkey]
Filter: l1.l_receiptdate > l1.l_commitdate
SubqueryAlias: l1
SubqueryAlias: l1
Filter: lineitem.l_receiptdate > lineitem.l_commitdate
TableScan: lineitem projection=[l_orderkey, l_suppkey, l_commitdate, l_receiptdate]
Filter: orders.o_orderstatus = Utf8("F")
TableScan: orders projection=[o_orderkey, o_orderstatus]
Filter: nation.n_name = Utf8("SAUDI ARABIA")
TableScan: nation projection=[n_nationkey, n_name]
SubqueryAlias: l2
TableScan: lineitem projection=[l_orderkey, l_suppkey]
Filter: l3.l_receiptdate > l3.l_commitdate
SubqueryAlias: l3
SubqueryAlias: l3
Filter: lineitem.l_receiptdate > lineitem.l_commitdate
TableScan: lineitem projection=[l_orderkey, l_suppkey, l_commitdate, l_receiptdate]
8 changes: 4 additions & 4 deletions benchmarks/expected-plans/q7.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST,
TableScan: lineitem projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount, l_shipdate]
TableScan: orders projection=[o_orderkey, o_custkey]
TableScan: customer projection=[c_custkey, c_nationkey]
Filter: n1.n_name = Utf8("FRANCE") OR n1.n_name = Utf8("GERMANY")
SubqueryAlias: n1
SubqueryAlias: n1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 these plan changes certainly look better to me

Filter: nation.n_name = Utf8("FRANCE") OR nation.n_name = Utf8("GERMANY")
TableScan: nation projection=[n_nationkey, n_name]
Filter: n2.n_name = Utf8("GERMANY") OR n2.n_name = Utf8("FRANCE")
SubqueryAlias: n2
SubqueryAlias: n2
Filter: nation.n_name = Utf8("GERMANY") OR nation.n_name = Utf8("FRANCE")
TableScan: nation projection=[n_nationkey, n_name]
13 changes: 6 additions & 7 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1636,15 +1636,14 @@ async fn reduce_left_join_3() -> Result<()> {
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: t3.t1_id, t3.t1_name, t3.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Filter: t3.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" SubqueryAlias: t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" SubqueryAlias: t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like a better plan as the filters have been pushed down through the join 👍

" Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" Filter: t2.t2_int < UInt32(3) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Filter: t2.t2_int < UInt32(3) AND t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
]
;
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ pub mod eliminate_filter;
pub mod eliminate_limit;
pub mod eliminate_outer_join;
pub mod filter_null_join_keys;
pub mod filter_push_down;
pub mod inline_table_scan;
pub mod limit_push_down;
pub mod optimizer;
pub mod projection_push_down;
pub mod propagate_empty_relation;
pub mod push_down_filter;
pub mod scalar_subquery_to_join;
pub mod simplify_expressions;
pub mod single_distinct_to_groupby;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_limit::EliminateLimit;
use crate::eliminate_outer_join::EliminateOuterJoin;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::filter_push_down::FilterPushDown;
use crate::inline_table_scan::InlineTableScan;
use crate::limit_push_down::LimitPushDown;
use crate::projection_push_down::ProjectionPushDown;
use crate::propagate_empty_relation::PropagateEmptyRelation;
use crate::push_down_filter::PushDownFilter;
use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
use crate::simplify_expressions::SimplifyExpressions;
Expand Down Expand Up @@ -184,8 +184,9 @@ impl Optimizer {
rules.push(Arc::new(FilterNullJoinKeys::default()));
}
rules.push(Arc::new(EliminateOuterJoin::new()));
rules.push(Arc::new(FilterPushDown::new()));
// Filters can't be pushed down past Limits, we should do PushDownFilter after LimitPushDown
rules.push(Arc::new(LimitPushDown::new()));
rules.push(Arc::new(PushDownFilter::new()));
rules.push(Arc::new(SingleDistinctToGroupBy::new()));

// The previous optimizations added expressions and projections,
Expand Down
Loading