From 44b2580a272b0b8381fa32220aa69590e0b6be78 Mon Sep 17 00:00:00 2001 From: jackwener Date: Tue, 13 Dec 2022 18:03:30 +0800 Subject: [PATCH] add `try_optimize()` for all rules. --- .../optimizer/src/common_subexpr_eliminate.rs | 58 +++++++++----- .../optimizer/src/decorrelate_where_exists.rs | 14 ++-- .../optimizer/src/decorrelate_where_in.rs | 34 +++++++-- .../optimizer/src/eliminate_cross_join.rs | 35 ++++++--- datafusion/optimizer/src/eliminate_filter.rs | 22 +++++- datafusion/optimizer/src/eliminate_limit.rs | 28 +++++-- .../optimizer/src/eliminate_outer_join.rs | 28 ++++++- .../optimizer/src/filter_null_join_keys.rs | 28 ++++++- datafusion/optimizer/src/inline_table_scan.rs | 20 ++++- .../optimizer/src/propagate_empty_relation.rs | 58 ++++++++------ datafusion/optimizer/src/push_down_filter.rs | 40 ++++++++-- datafusion/optimizer/src/push_down_limit.rs | 34 ++++++++- .../optimizer/src/push_down_projection.rs | 18 ++++- .../src/rewrite_disjunctive_predicate.rs | 20 ++++- .../optimizer/src/scalar_subquery_to_join.rs | 26 +++++-- .../src/single_distinct_to_groupby.rs | 36 +++++++-- .../optimizer/src/subquery_filter_to_join.rs | 28 +++++-- datafusion/optimizer/src/type_coercion.rs | 14 +++- .../src/unwrap_cast_in_comparison.rs | 75 +++++++++++-------- 19 files changed, 463 insertions(+), 153 deletions(-) diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 482298e160e3..b541aa0ee76c 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -79,7 +79,9 @@ impl CommonSubexprEliminate { }) .collect::>>()?; - let mut new_input = self.optimize(input, optimizer_config)?; + let mut new_input = self + .try_optimize(input, optimizer_config)? 
+ .unwrap_or_else(|| input.clone()); if !affected_id.is_empty() { new_input = build_project_plan(new_input, affected_id, expr_set)?; } @@ -94,6 +96,16 @@ impl OptimizerRule for CommonSubexprEliminate { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, optimizer_config)? + .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { let mut expr_set = ExprSet::new(); match plan { @@ -113,11 +125,13 @@ impl OptimizerRule for CommonSubexprEliminate { optimizer_config, )?; - Ok(LogicalPlan::Projection(Projection::try_new_with_schema( - pop_expr(&mut new_expr)?, - Arc::new(new_input), - schema.clone(), - )?)) + Ok(Some(LogicalPlan::Projection( + Projection::try_new_with_schema( + pop_expr(&mut new_expr)?, + Arc::new(new_input), + schema.clone(), + )?, + ))) } LogicalPlan::Filter(filter) => { let input = filter.input(); @@ -140,10 +154,10 @@ impl OptimizerRule for CommonSubexprEliminate { )?; if let Some(predicate) = pop_expr(&mut new_expr)?.pop() { - Ok(LogicalPlan::Filter(Filter::try_new( + Ok(Some(LogicalPlan::Filter(Filter::try_new( predicate, Arc::new(new_input), - )?)) + )?))) } else { Err(DataFusionError::Internal( "Failed to pop predicate expr".to_string(), @@ -166,11 +180,11 @@ impl OptimizerRule for CommonSubexprEliminate { optimizer_config, )?; - Ok(LogicalPlan::Window(Window { + Ok(Some(LogicalPlan::Window(Window { input: Arc::new(new_input), window_expr: pop_expr(&mut new_expr)?, schema: schema.clone(), - })) + }))) } LogicalPlan::Aggregate(Aggregate { group_expr, @@ -194,12 +208,14 @@ impl OptimizerRule for CommonSubexprEliminate { let new_aggr_expr = pop_expr(&mut new_expr)?; let new_group_expr = pop_expr(&mut new_expr)?; - Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( - Arc::new(new_input), - new_group_expr, - new_aggr_expr, - schema.clone(), - )?)) + Ok(Some(LogicalPlan::Aggregate( + 
Aggregate::try_new_with_schema( + Arc::new(new_input), + new_group_expr, + new_aggr_expr, + schema.clone(), + )?, + ))) } LogicalPlan::Sort(Sort { expr, input, fetch }) => { let input_schema = Arc::clone(input.schema()); @@ -213,11 +229,11 @@ impl OptimizerRule for CommonSubexprEliminate { optimizer_config, )?; - Ok(LogicalPlan::Sort(Sort { + Ok(Some(LogicalPlan::Sort(Sort { expr: pop_expr(&mut new_expr)?, input: Arc::new(new_input), fetch: *fetch, - })) + }))) } LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_) @@ -243,7 +259,11 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Extension(_) | LogicalPlan::Prepare(_) => { // apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)) } } } diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 9cf9138bc062..6b5bd184f504 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -57,8 +57,10 @@ impl DecorrelateWhereExists { for it in filters.iter() { match it { Expr::Exists { subquery, negated } => { - let subquery = self.optimize(&subquery.subquery, optimizer_config)?; - let subquery = Arc::new(subquery); + let subquery = self + .try_optimize(&subquery.subquery, optimizer_config)? 
+ .map(Arc::new) + .unwrap_or_else(|| subquery.subquery.clone()); let subquery = Subquery { subquery }; let subquery = SubqueryInfo::new(subquery.clone(), *negated); subqueries.push(subquery); @@ -90,10 +92,12 @@ impl OptimizerRule for DecorrelateWhereExists { match plan { LogicalPlan::Filter(filter) => { let predicate = filter.predicate(); - let filter_input = filter.input(); + let filter_input = filter.input().as_ref(); // Apply optimizer rule to current input - let optimized_input = self.optimize(filter_input, optimizer_config)?; + let optimized_input = self + .try_optimize(filter_input, optimizer_config)? + .unwrap_or_else(|| filter_input.clone()); let (subqueries, other_exprs) = self.extract_subquery_exprs(predicate, optimizer_config)?; @@ -107,7 +111,7 @@ impl OptimizerRule for DecorrelateWhereExists { } // iterate through all exists clauses in predicate, turning each into a join - let mut cur_input = (**filter_input).clone(); + let mut cur_input = filter_input.clone(); for subquery in subqueries { if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)? { diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index 1818e5897fa9..4614b0699cc7 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -59,8 +59,10 @@ impl DecorrelateWhereIn { subquery, negated, } => { - let subquery = self.optimize(&subquery.subquery, optimizer_config)?; - let subquery = Arc::new(subquery); + let subquery = self + .try_optimize(&subquery.subquery, optimizer_config)? 
+ .map(Arc::new) + .unwrap_or_else(|| subquery.subquery.clone()); let subquery = Subquery { subquery }; let subquery = SubqueryInfo::new(subquery.clone(), (**expr).clone(), *negated); @@ -81,13 +83,25 @@ impl OptimizerRule for DecorrelateWhereIn { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result { + Ok(self + .try_optimize(plan, optimizer_config)? + .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> datafusion_common::Result> { match plan { LogicalPlan::Filter(filter) => { let predicate = filter.predicate(); - let filter_input = filter.input(); + let filter_input = filter.input().as_ref(); // Apply optimizer rule to current input - let optimized_input = self.optimize(filter_input, optimizer_config)?; + let optimized_input = self + .try_optimize(filter_input, optimizer_config)? + .unwrap_or_else(|| filter_input.clone()); let (subqueries, other_exprs) = self.extract_subquery_exprs(predicate, optimizer_config)?; @@ -97,11 +111,11 @@ impl OptimizerRule for DecorrelateWhereIn { )?); if subqueries.is_empty() { // regular filter, no subquery exists clause here - return Ok(optimized_plan); + return Ok(Some(optimized_plan)); } // iterate through all exists clauses in predicate, turning each into a join - let mut cur_input = (**filter_input).clone(); + let mut cur_input = filter_input.clone(); for subquery in subqueries { cur_input = optimize_where_in( &subquery, @@ -110,11 +124,15 @@ impl OptimizerRule for DecorrelateWhereIn { optimizer_config, )?; } - Ok(cur_input) + Ok(Some(cur_input)) } _ => { // Apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)) } } } diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 8ca457771646..b431742f0acf 100644 --- 
a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -56,6 +56,16 @@ impl OptimizerRule for EliminateCrossJoin { plan: &LogicalPlan, _optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, _optimizer_config)? + .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result> { match plan { LogicalPlan::Filter(filter) => { let input = (**filter.input()).clone(); @@ -78,7 +88,11 @@ impl OptimizerRule for EliminateCrossJoin { )?; } _ => { - return utils::optimize_children(self, plan, _optimizer_config); + return Ok(Some(utils::optimize_children( + self, + plan, + _optimizer_config, + )?)); } } @@ -109,23 +123,26 @@ impl OptimizerRule for EliminateCrossJoin { // if there are no join keys then do nothing. if all_join_keys.is_empty() { - Ok(LogicalPlan::Filter(Filter::try_new( + Ok(Some(LogicalPlan::Filter(Filter::try_new( predicate.clone(), Arc::new(left), - )?)) + )?))) } else { // remove join expressions from filter match remove_join_expressions(predicate, &all_join_keys)? 
{ - Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new( - filter_expr, - Arc::new(left), - )?)), - _ => Ok(left), + Some(filter_expr) => Ok(Some(LogicalPlan::Filter( + Filter::try_new(filter_expr, Arc::new(left))?, + ))), + _ => Ok(Some(left)), } } } - _ => utils::optimize_children(self, plan, _optimizer_config), + _ => Ok(Some(utils::optimize_children( + self, + plan, + _optimizer_config, + )?)), } } diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs index d5cbcf3949c9..3e1c91a700d3 100644 --- a/datafusion/optimizer/src/eliminate_filter.rs +++ b/datafusion/optimizer/src/eliminate_filter.rs @@ -42,6 +42,16 @@ impl OptimizerRule for EliminateFilter { plan: &LogicalPlan, _optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, _optimizer_config)? + .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result> { let predicate_and_input = match plan { LogicalPlan::Filter(filter) => match filter.predicate() { Expr::Literal(ScalarValue::Boolean(Some(v))) => { @@ -53,14 +63,18 @@ impl OptimizerRule for EliminateFilter { }; match predicate_and_input { - Some((true, input)) => self.optimize(input, _optimizer_config), - Some((false, input)) => Ok(LogicalPlan::EmptyRelation(EmptyRelation { + Some((true, input)) => self.try_optimize(input, _optimizer_config), + Some((false, input)) => Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: input.schema().clone(), - })), + }))), None => { // Apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, _optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + _optimizer_config, + )?)) } } } diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs index 9ad39d1db92c..d316a006cee3 100644 --- a/datafusion/optimizer/src/eliminate_limit.rs +++ 
b/datafusion/optimizer/src/eliminate_limit.rs @@ -40,27 +40,45 @@ impl OptimizerRule for EliminateLimit { fn optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + _optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, _optimizer_config)? + .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result> { if let LogicalPlan::Limit(limit) = plan { match limit.fetch { Some(fetch) => { if fetch == 0 { - return Ok(LogicalPlan::EmptyRelation(EmptyRelation { + return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: limit.input.schema().clone(), - })); + }))); } } None => { if limit.skip == 0 { let input = &*limit.input; - return utils::optimize_children(self, input, optimizer_config); + return Ok(Some(utils::optimize_children( + self, + input, + _optimizer_config, + )?)); } } } } - utils::optimize_children(self, plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + _optimizer_config, + )?)) } fn name(&self) -> &str { diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 9be661624652..4d1fecad70e8 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -67,6 +67,16 @@ impl OptimizerRule for EliminateOuterJoin { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, optimizer_config)? 
+ .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { match plan { LogicalPlan::Filter(filter) => match filter.input().as_ref() { LogicalPlan::Join(join) => { @@ -110,11 +120,23 @@ impl OptimizerRule for EliminateOuterJoin { null_equals_null: join.null_equals_null, }); let new_plan = from_plan(plan, &plan.expressions(), &[new_join])?; - utils::optimize_children(self, &new_plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + &new_plan, + optimizer_config, + )?)) } - _ => utils::optimize_children(self, plan, optimizer_config), + _ => Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)), }, - _ => utils::optimize_children(self, plan, optimizer_config), + _ => Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)), } } diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs index be33c796ea42..f1a4f13937cd 100644 --- a/datafusion/optimizer/src/filter_null_join_keys.rs +++ b/datafusion/optimizer/src/filter_null_join_keys.rs @@ -40,12 +40,28 @@ impl OptimizerRule for FilterNullJoinKeys { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result { + Ok(self + .try_optimize(plan, optimizer_config)? + .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> datafusion_common::Result> { match plan { LogicalPlan::Join(join) if join.join_type == JoinType::Inner => { // recurse down first and optimize inputs let mut join = join.clone(); - join.left = Arc::new(self.optimize(&join.left, optimizer_config)?); - join.right = Arc::new(self.optimize(&join.right, optimizer_config)?); + join.left = Arc::new( + self.try_optimize(&join.left, optimizer_config)? 
+ .unwrap_or_else(|| join.left.as_ref().clone()), + ); + join.right = Arc::new( + self.try_optimize(&join.right, optimizer_config)? + .unwrap_or_else(|| join.right.as_ref().clone()), + ); let left_schema = join.left.schema(); let right_schema = join.right.schema(); @@ -80,11 +96,15 @@ impl OptimizerRule for FilterNullJoinKeys { join.right.clone(), )?)); } - Ok(LogicalPlan::Join(join)) + Ok(Some(LogicalPlan::Join(join))) } _ => { // Apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)) } } } diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 7fdb594be13f..e0238a751302 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -40,6 +40,16 @@ impl OptimizerRule for InlineTableScan { plan: &LogicalPlan, _optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, _optimizer_config)? + .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result> { match plan { // Match only on scans without filter / projection / fetch // Views and DataFrames won't have those added @@ -57,17 +67,21 @@ impl OptimizerRule for InlineTableScan { let plan = LogicalPlanBuilder::from(plan) .project(vec![Expr::Wildcard])? 
.alias(table_name)?; - plan.build() + Ok(Some(plan.build()?)) } else { // No plan available, return with table scan as is - Ok(plan.clone()) + Ok(Some(plan.clone())) } } // Rest: Recurse _ => { // apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, _optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + _optimizer_config, + )?)) } } } diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index 2de7e72ef138..eeb358900904 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -37,13 +37,23 @@ impl OptimizerRule for PropagateEmptyRelation { fn optimize( &self, plan: &LogicalPlan, - optimizer_config: &mut OptimizerConfig, + _optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, _optimizer_config)? + .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result> { // optimize child plans first let optimized_children_plan = - utils::optimize_children(self, plan, optimizer_config)?; + utils::optimize_children(self, plan, _optimizer_config)?; match &optimized_children_plan { - LogicalPlan::EmptyRelation(_) => Ok(optimized_children_plan), + LogicalPlan::EmptyRelation(_) => Ok(Some(optimized_children_plan)), LogicalPlan::Projection(_) | LogicalPlan::Filter(_) | LogicalPlan::Window(_) @@ -51,19 +61,19 @@ impl OptimizerRule for PropagateEmptyRelation { | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Repartition(_) | LogicalPlan::Limit(_) => match empty_child(&optimized_children_plan)? 
{ - Some(empty) => Ok(empty), - None => Ok(optimized_children_plan), + Some(empty) => Ok(Some(empty)), + None => Ok(Some(optimized_children_plan)), }, LogicalPlan::CrossJoin(_) => { let (left_empty, right_empty) = binary_plan_children_is_empty(&optimized_children_plan)?; if left_empty || right_empty { - Ok(LogicalPlan::EmptyRelation(EmptyRelation { + Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: optimized_children_plan.schema().clone(), - })) + }))) } else { - Ok(optimized_children_plan) + Ok(Some(optimized_children_plan)) } } LogicalPlan::Join(join) => { @@ -83,15 +93,15 @@ impl OptimizerRule for PropagateEmptyRelation { let (left_empty, right_empty) = binary_plan_children_is_empty(&optimized_children_plan)?; if left_empty || right_empty { - Ok(LogicalPlan::EmptyRelation(EmptyRelation { + Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: optimized_children_plan.schema().clone(), - })) + }))) } else { - Ok(optimized_children_plan) + Ok(Some(optimized_children_plan)) } } else { - Ok(optimized_children_plan) + Ok(Some(optimized_children_plan)) } } LogicalPlan::Union(union) => { @@ -106,40 +116,40 @@ impl OptimizerRule for PropagateEmptyRelation { .collect::>(); if new_inputs.len() == union.inputs.len() { - Ok(optimized_children_plan) + Ok(Some(optimized_children_plan)) } else if new_inputs.is_empty() { - Ok(LogicalPlan::EmptyRelation(EmptyRelation { + Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: optimized_children_plan.schema().clone(), - })) + }))) } else if new_inputs.len() == 1 { let child = (**(union.inputs.get(0).unwrap())).clone(); if child.schema().eq(optimized_children_plan.schema()) { - Ok(child) + Ok(Some(child)) } else { - Ok(LogicalPlan::Projection(Projection::new_from_schema( + Ok(Some(LogicalPlan::Projection(Projection::new_from_schema( Arc::new(child), optimized_children_plan.schema().clone(), - ))) + )))) } } else { - Ok(LogicalPlan::Union(Union 
{ + Ok(Some(LogicalPlan::Union(Union { inputs: new_inputs, schema: union.schema.clone(), - })) + }))) } } LogicalPlan::Aggregate(agg) => { if !agg.group_expr.is_empty() { match empty_child(&optimized_children_plan)? { - Some(empty) => Ok(empty), - None => Ok(optimized_children_plan), + Some(empty) => Ok(Some(empty)), + None => Ok(Some(optimized_children_plan)), } } else { - Ok(optimized_children_plan) + Ok(Some(optimized_children_plan)) } } - _ => Ok(optimized_children_plan), + _ => Ok(Some(optimized_children_plan)), } } diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 61051a5a530e..40ed00ebda04 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -508,19 +508,41 @@ impl OptimizerRule for PushDownFilter { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, optimizer_config)? + .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { let filter = match plan { LogicalPlan::Filter(filter) => filter, // we also need to pushdown filter in Join. 
LogicalPlan::Join(join) => { let optimized_plan = push_down_join(plan, join, None)?; return match optimized_plan { - Some(optimized_plan) => { - utils::optimize_children(self, &optimized_plan, optimizer_config) - } - None => utils::optimize_children(self, plan, optimizer_config), + Some(optimized_plan) => Ok(Some(utils::optimize_children( + self, + &optimized_plan, + optimizer_config, + )?)), + None => Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)), }; } - _ => return utils::optimize_children(self, plan, optimizer_config), + _ => { + return Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)) + } }; let child_plan = &**filter.input(); @@ -532,7 +554,7 @@ impl OptimizerRule for PushDownFilter { new_predicate, child_filter.input().clone(), )?); - return self.optimize(&new_plan, optimizer_config); + return self.try_optimize(&new_plan, optimizer_config); } LogicalPlan::Repartition(_) | LogicalPlan::Distinct(_) @@ -733,7 +755,11 @@ impl OptimizerRule for PushDownFilter { _ => plan.clone(), }; - utils::optimize_children(self, &new_plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + &new_plan, + optimizer_config, + )?)) } } diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 3ce4f441f8b6..a404762b69ad 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -80,9 +80,25 @@ impl OptimizerRule for PushDownLimit { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, optimizer_config)? 
+ .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { let limit = match plan { LogicalPlan::Limit(limit) => limit, - _ => return utils::optimize_children(self, plan, optimizer_config), + _ => { + return Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)) + } }; if let LogicalPlan::Limit(child_limit) = &*limit.input { @@ -112,12 +128,18 @@ impl OptimizerRule for PushDownLimit { fetch: new_fetch, input: Arc::new((*child_limit.input).clone()), }); - return self.optimize(&plan, optimizer_config); + return self.try_optimize(&plan, optimizer_config); } let fetch = match limit.fetch { Some(fetch) => fetch, - None => return utils::optimize_children(self, plan, optimizer_config), + None => { + return Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)) + } }; let skip = limit.skip; @@ -225,7 +247,11 @@ impl OptimizerRule for PushDownLimit { _ => plan.clone(), }; - utils::optimize_children(self, &plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + &plan, + optimizer_config, + )?)) } fn name(&self) -> &str { diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index 2d156d1ce398..0238148c3ec1 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -51,6 +51,16 @@ impl OptimizerRule for PushDownProjection { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, optimizer_config)? 
+ .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { // set of all columns referred by the plan (and thus considered required by the root) let required_columns = plan .schema() @@ -58,7 +68,13 @@ impl OptimizerRule for PushDownProjection { .iter() .map(|f| f.qualified_column()) .collect::>(); - optimize_plan(self, plan, &required_columns, false, optimizer_config) + Ok(Some(optimize_plan( + self, + plan, + &required_columns, + false, + optimizer_config, + )?)) } fn name(&self) -> &str { diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs index 6a2ba2bde68a..3f6cc763bb30 100644 --- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs +++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs @@ -129,17 +129,31 @@ impl OptimizerRule for RewriteDisjunctivePredicate { plan: &LogicalPlan, _optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, _optimizer_config)? 
+ .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result> { match plan { LogicalPlan::Filter(filter) => { let predicate = predicate(filter.predicate())?; let rewritten_predicate = rewrite_predicate(predicate); let rewritten_expr = normalize_predicate(rewritten_predicate); - Ok(LogicalPlan::Filter(Filter::try_new( + Ok(Some(LogicalPlan::Filter(Filter::try_new( rewritten_expr, Arc::new(Self::optimize(self, filter.input(), _optimizer_config)?), - )?)) + )?))) } - _ => utils::optimize_children(self, plan, _optimizer_config), + _ => Ok(Some(utils::optimize_children( + self, + plan, + _optimizer_config, + )?)), } } diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index f5a35c6253cf..00e4d89c8df4 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -94,6 +94,16 @@ impl OptimizerRule for ScalarSubqueryToJoin { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, optimizer_config)? 
+ .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { match plan { LogicalPlan::Filter(filter) => { // Apply optimizer rule to current input @@ -104,10 +114,10 @@ impl OptimizerRule for ScalarSubqueryToJoin { if subqueries.is_empty() { // regular filter, no subquery exists clause here - return Ok(LogicalPlan::Filter(Filter::try_new( + return Ok(Some(LogicalPlan::Filter(Filter::try_new( filter.predicate().clone(), Arc::new(optimized_input), - )?)); + )?))); } // iterate through all subqueries in predicate, turning each into a join @@ -122,17 +132,21 @@ impl OptimizerRule for ScalarSubqueryToJoin { cur_input = optimized_subquery; } else { // if we can't handle all of the subqueries then bail for now - return Ok(LogicalPlan::Filter(Filter::try_new( + return Ok(Some(LogicalPlan::Filter(Filter::try_new( filter.predicate().clone(), Arc::new(optimized_input), - )?)); + )?))); } } - Ok(cur_input) + Ok(Some(cur_input)) } _ => { // Apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)) } } } diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 13cf6d3a8dec..1e1322395267 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -86,8 +86,18 @@ impl OptimizerRule for SingleDistinctToGroupBy { fn optimize( &self, plan: &LogicalPlan, - _optimizer_config: &mut OptimizerConfig, + optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, optimizer_config)? 
+ .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result> { match plan { LogicalPlan::Aggregate(Aggregate { input, @@ -192,16 +202,26 @@ impl OptimizerRule for SingleDistinctToGroupBy { new_aggr_exprs, )?); - Ok(LogicalPlan::Projection(Projection::try_new_with_schema( - alias_expr, - Arc::new(outer_aggr), - schema.clone(), - )?)) + Ok(Some(LogicalPlan::Projection( + Projection::try_new_with_schema( + alias_expr, + Arc::new(outer_aggr), + schema.clone(), + )?, + ))) } else { - utils::optimize_children(self, plan, _optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + _optimizer_config, + )?)) } } - _ => utils::optimize_children(self, plan, _optimizer_config), + _ => Ok(Some(utils::optimize_children( + self, + plan, + _optimizer_config, + )?)), } } fn name(&self) -> &str { diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index c077ce7e83e4..ccf6931d3e98 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -54,6 +54,16 @@ impl OptimizerRule for SubqueryFilterToJoin { plan: &LogicalPlan, optimizer_config: &mut OptimizerConfig, ) -> Result { + Ok(self + .try_optimize(plan, optimizer_config)? 
+ .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + optimizer_config: &mut OptimizerConfig, + ) -> Result> { match plan { LogicalPlan::Filter(filter) => { // Apply optimizer rule to current input @@ -78,10 +88,10 @@ impl OptimizerRule for SubqueryFilterToJoin { })?; if !subqueries_in_regular.is_empty() { - return Ok(LogicalPlan::Filter(Filter::try_new( + return Ok(Some(LogicalPlan::Filter(Filter::try_new( filter.predicate().clone(), Arc::new(optimized_input), - )?)); + )?))); }; // Add subquery joins to new_input @@ -150,23 +160,27 @@ impl OptimizerRule for SubqueryFilterToJoin { let new_input = match opt_result { Ok(plan) => plan, Err(_) => { - return Ok(LogicalPlan::Filter(Filter::try_new( + return Ok(Some(LogicalPlan::Filter(Filter::try_new( filter.predicate().clone(), Arc::new(optimized_input), - )?)) + )?))) } }; // Apply regular filters to join output if some or just return join if regular_filters.is_empty() { - Ok(new_input) + Ok(Some(new_input)) } else { - utils::add_filter(new_input, &regular_filters) + Ok(Some(utils::add_filter(new_input, &regular_filters)?)) } } _ => { // Apply the optimization to all inputs of the plan - utils::optimize_children(self, plan, optimizer_config) + Ok(Some(utils::optimize_children( + self, + plan, + optimizer_config, + )?)) } } } diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs index 0e06f80d872e..bf8a5f2b76ac 100644 --- a/datafusion/optimizer/src/type_coercion.rs +++ b/datafusion/optimizer/src/type_coercion.rs @@ -58,9 +58,19 @@ impl OptimizerRule for TypeCoercion { fn optimize( &self, plan: &LogicalPlan, - _optimizer_config: &mut OptimizerConfig, + optimizer_config: &mut OptimizerConfig, ) -> Result { - optimize_internal(&DFSchema::empty(), plan) + Ok(self + .try_optimize(plan, optimizer_config)?
+ .unwrap_or_else(|| plan.clone())) + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result> { + Ok(Some(optimize_internal(&DFSchema::empty(), plan)?)) } } diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 8e17c184eeab..bfcb0f85aba2 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -84,42 +84,55 @@ impl OptimizerRule for UnwrapCastInComparison { plan: &LogicalPlan, _optimizer_config: &mut OptimizerConfig, ) -> Result { - optimize(plan) + Ok(self + .try_optimize(plan, _optimizer_config)? + .unwrap_or_else(|| plan.clone())) } - fn name(&self) -> &str { - "unwrap_cast_in_comparison" - } -} + fn try_optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result> { + let new_inputs = plan + .inputs() + .into_iter() + .map(|input| { + self.try_optimize(input, _optimizer_config) + .map(|o| o.unwrap_or_else(|| input.clone())) + }) + .collect::>>()?; + + let mut schema = new_inputs.iter().map(|input| input.schema()).fold( + DFSchema::empty(), + |mut lhs, rhs| { + lhs.merge(rhs); + lhs + }, + ); -fn optimize(plan: &LogicalPlan) -> Result { - let new_inputs = plan - .inputs() - .iter() - .map(|input| optimize(input)) - .collect::>>()?; - - let mut schema = new_inputs.iter().map(|input| input.schema()).fold( - DFSchema::empty(), - |mut lhs, rhs| { - lhs.merge(rhs); - lhs - }, - ); - - schema.merge(plan.schema()); - - let mut expr_rewriter = UnwrapCastExprRewriter { - schema: Arc::new(schema), - }; + schema.merge(plan.schema()); + + let mut expr_rewriter = UnwrapCastExprRewriter { + schema: Arc::new(schema), + }; - let new_exprs = plan - .expressions() - .into_iter() - .map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter)) - .collect::>>()?; + let new_exprs = plan + .expressions() + .into_iter() + .map(|expr| 
rewrite_preserving_name(expr, &mut expr_rewriter)) + .collect::>>()?; + + Ok(Some(from_plan( + plan, + new_exprs.as_slice(), + new_inputs.as_slice(), + )?)) + } - from_plan(plan, new_exprs.as_slice(), new_inputs.as_slice()) + fn name(&self) -> &str { + "unwrap_cast_in_comparison" + } } struct UnwrapCastExprRewriter {