Skip to content

Commit

Permalink
add try_optimize() for all rules.
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Dec 13, 2022
1 parent b822b0e commit 44b2580
Show file tree
Hide file tree
Showing 19 changed files with 463 additions and 153 deletions.
58 changes: 39 additions & 19 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ impl CommonSubexprEliminate {
})
.collect::<Result<Vec<_>>>()?;

let mut new_input = self.optimize(input, optimizer_config)?;
let mut new_input = self
.try_optimize(input, optimizer_config)?
.unwrap_or_else(|| input.clone());
if !affected_id.is_empty() {
new_input = build_project_plan(new_input, affected_id, expr_set)?;
}
Expand All @@ -94,6 +96,16 @@ impl OptimizerRule for CommonSubexprEliminate {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let mut expr_set = ExprSet::new();

match plan {
Expand All @@ -113,11 +125,13 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
pop_expr(&mut new_expr)?,
Arc::new(new_input),
schema.clone(),
)?))
Ok(Some(LogicalPlan::Projection(
Projection::try_new_with_schema(
pop_expr(&mut new_expr)?,
Arc::new(new_input),
schema.clone(),
)?,
)))
}
LogicalPlan::Filter(filter) => {
let input = filter.input();
Expand All @@ -140,10 +154,10 @@ impl OptimizerRule for CommonSubexprEliminate {
)?;

if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
Ok(LogicalPlan::Filter(Filter::try_new(
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
Arc::new(new_input),
)?))
)?)))
} else {
Err(DataFusionError::Internal(
"Failed to pop predicate expr".to_string(),
Expand All @@ -166,11 +180,11 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;

Ok(LogicalPlan::Window(Window {
Ok(Some(LogicalPlan::Window(Window {
input: Arc::new(new_input),
window_expr: pop_expr(&mut new_expr)?,
schema: schema.clone(),
}))
})))
}
LogicalPlan::Aggregate(Aggregate {
group_expr,
Expand All @@ -194,12 +208,14 @@ impl OptimizerRule for CommonSubexprEliminate {
let new_aggr_expr = pop_expr(&mut new_expr)?;
let new_group_expr = pop_expr(&mut new_expr)?;

Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
Arc::new(new_input),
new_group_expr,
new_aggr_expr,
schema.clone(),
)?))
Ok(Some(LogicalPlan::Aggregate(
Aggregate::try_new_with_schema(
Arc::new(new_input),
new_group_expr,
new_aggr_expr,
schema.clone(),
)?,
)))
}
LogicalPlan::Sort(Sort { expr, input, fetch }) => {
let input_schema = Arc::clone(input.schema());
Expand All @@ -213,11 +229,11 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;

Ok(LogicalPlan::Sort(Sort {
Ok(Some(LogicalPlan::Sort(Sort {
expr: pop_expr(&mut new_expr)?,
input: Arc::new(new_input),
fetch: *fetch,
}))
})))
}
LogicalPlan::Join(_)
| LogicalPlan::CrossJoin(_)
Expand All @@ -243,7 +259,11 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => {
// apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ impl DecorrelateWhereExists {
for it in filters.iter() {
match it {
Expr::Exists { subquery, negated } => {
let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
let subquery = Arc::new(subquery);
let subquery = self
.try_optimize(&subquery.subquery, optimizer_config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
let subquery = SubqueryInfo::new(subquery.clone(), *negated);
subqueries.push(subquery);
Expand Down Expand Up @@ -90,10 +92,12 @@ impl OptimizerRule for DecorrelateWhereExists {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
let filter_input = filter.input();
let filter_input = filter.input().as_ref();

// Apply optimizer rule to current input
let optimized_input = self.optimize(filter_input, optimizer_config)?;
let optimized_input = self
.try_optimize(filter_input, optimizer_config)?
.unwrap_or_else(|| filter_input.clone());

let (subqueries, other_exprs) =
self.extract_subquery_exprs(predicate, optimizer_config)?;
Expand All @@ -107,7 +111,7 @@ impl OptimizerRule for DecorrelateWhereExists {
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = (**filter_input).clone();
let mut cur_input = filter_input.clone();
for subquery in subqueries {
if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)?
{
Expand Down
34 changes: 26 additions & 8 deletions datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ impl DecorrelateWhereIn {
subquery,
negated,
} => {
let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
let subquery = Arc::new(subquery);
let subquery = self
.try_optimize(&subquery.subquery, optimizer_config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
let subquery =
SubqueryInfo::new(subquery.clone(), (**expr).clone(), *negated);
Expand All @@ -81,13 +83,25 @@ impl OptimizerRule for DecorrelateWhereIn {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
let filter_input = filter.input();
let filter_input = filter.input().as_ref();

// Apply optimizer rule to current input
let optimized_input = self.optimize(filter_input, optimizer_config)?;
let optimized_input = self
.try_optimize(filter_input, optimizer_config)?
.unwrap_or_else(|| filter_input.clone());

let (subqueries, other_exprs) =
self.extract_subquery_exprs(predicate, optimizer_config)?;
Expand All @@ -97,11 +111,11 @@ impl OptimizerRule for DecorrelateWhereIn {
)?);
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
return Ok(optimized_plan);
return Ok(Some(optimized_plan));
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = (**filter_input).clone();
let mut cur_input = filter_input.clone();
for subquery in subqueries {
cur_input = optimize_where_in(
&subquery,
Expand All @@ -110,11 +124,15 @@ impl OptimizerRule for DecorrelateWhereIn {
optimizer_config,
)?;
}
Ok(cur_input)
Ok(Some(cur_input))
}
_ => {
// Apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
}
}
}
Expand Down
35 changes: 26 additions & 9 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ impl OptimizerRule for EliminateCrossJoin {
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let input = (**filter.input()).clone();
Expand All @@ -78,7 +88,11 @@ impl OptimizerRule for EliminateCrossJoin {
)?;
}
_ => {
return utils::optimize_children(self, plan, _optimizer_config);
return Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
)?));
}
}

Expand Down Expand Up @@ -109,23 +123,26 @@ impl OptimizerRule for EliminateCrossJoin {

// if there are no join keys then do nothing.
if all_join_keys.is_empty() {
Ok(LogicalPlan::Filter(Filter::try_new(
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate.clone(),
Arc::new(left),
)?))
)?)))
} else {
// remove join expressions from filter
match remove_join_expressions(predicate, &all_join_keys)? {
Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
filter_expr,
Arc::new(left),
)?)),
_ => Ok(left),
Some(filter_expr) => Ok(Some(LogicalPlan::Filter(
Filter::try_new(filter_expr, Arc::new(left))?,
))),
_ => Ok(Some(left)),
}
}
}

_ => utils::optimize_children(self, plan, _optimizer_config),
_ => Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
)?)),
}
}

Expand Down
22 changes: 18 additions & 4 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ impl OptimizerRule for EliminateFilter {
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let predicate_and_input = match plan {
LogicalPlan::Filter(filter) => match filter.predicate() {
Expr::Literal(ScalarValue::Boolean(Some(v))) => {
Expand All @@ -53,14 +63,18 @@ impl OptimizerRule for EliminateFilter {
};

match predicate_and_input {
Some((true, input)) => self.optimize(input, _optimizer_config),
Some((false, input)) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
Some((true, input)) => self.try_optimize(input, _optimizer_config),
Some((false, input)) => Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: input.schema().clone(),
})),
}))),
None => {
// Apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, _optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
)?))
}
}
}
Expand Down
28 changes: 23 additions & 5 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,45 @@ impl OptimizerRule for EliminateLimit {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
}));
})));
}
}
None => {
if limit.skip == 0 {
let input = &*limit.input;
return utils::optimize_children(self, input, optimizer_config);
return Ok(Some(utils::optimize_children(
self,
input,
_optimizer_config,
)?));
}
}
}
}
utils::optimize_children(self, plan, optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
)?))
}

fn name(&self) -> &str {
Expand Down
Loading

0 comments on commit 44b2580

Please sign in to comment.