Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add try_optimize() for all rules. #4599

Merged
merged 1 commit into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ impl CommonSubexprEliminate {
})
.collect::<Result<Vec<_>>>()?;

let mut new_input = self.optimize(input, optimizer_config)?;
let mut new_input = self
.try_optimize(input, optimizer_config)?
.unwrap_or_else(|| input.clone());
if !affected_id.is_empty() {
new_input = build_project_plan(new_input, affected_id, expr_set)?;
}
Expand All @@ -94,6 +96,16 @@ impl OptimizerRule for CommonSubexprEliminate {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let mut expr_set = ExprSet::new();

match plan {
Expand All @@ -113,11 +125,13 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
pop_expr(&mut new_expr)?,
Arc::new(new_input),
schema.clone(),
)?))
Ok(Some(LogicalPlan::Projection(
Projection::try_new_with_schema(
pop_expr(&mut new_expr)?,
Arc::new(new_input),
schema.clone(),
)?,
)))
}
LogicalPlan::Filter(filter) => {
let input = filter.input();
Expand All @@ -140,10 +154,10 @@ impl OptimizerRule for CommonSubexprEliminate {
)?;

if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
Ok(LogicalPlan::Filter(Filter::try_new(
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
Arc::new(new_input),
)?))
)?)))
} else {
Err(DataFusionError::Internal(
"Failed to pop predicate expr".to_string(),
Expand All @@ -166,11 +180,11 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;

Ok(LogicalPlan::Window(Window {
Ok(Some(LogicalPlan::Window(Window {
input: Arc::new(new_input),
window_expr: pop_expr(&mut new_expr)?,
schema: schema.clone(),
}))
})))
}
LogicalPlan::Aggregate(Aggregate {
group_expr,
Expand All @@ -194,12 +208,14 @@ impl OptimizerRule for CommonSubexprEliminate {
let new_aggr_expr = pop_expr(&mut new_expr)?;
let new_group_expr = pop_expr(&mut new_expr)?;

Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
Arc::new(new_input),
new_group_expr,
new_aggr_expr,
schema.clone(),
)?))
Ok(Some(LogicalPlan::Aggregate(
Aggregate::try_new_with_schema(
Arc::new(new_input),
new_group_expr,
new_aggr_expr,
schema.clone(),
)?,
)))
}
LogicalPlan::Sort(Sort { expr, input, fetch }) => {
let input_schema = Arc::clone(input.schema());
Expand All @@ -213,11 +229,11 @@ impl OptimizerRule for CommonSubexprEliminate {
optimizer_config,
)?;

Ok(LogicalPlan::Sort(Sort {
Ok(Some(LogicalPlan::Sort(Sort {
expr: pop_expr(&mut new_expr)?,
input: Arc::new(new_input),
fetch: *fetch,
}))
})))
}
LogicalPlan::Join(_)
| LogicalPlan::CrossJoin(_)
Expand All @@ -243,7 +259,11 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => {
// apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ impl DecorrelateWhereExists {
for it in filters.iter() {
match it {
Expr::Exists { subquery, negated } => {
let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
let subquery = Arc::new(subquery);
let subquery = self
.try_optimize(&subquery.subquery, optimizer_config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
let subquery = SubqueryInfo::new(subquery.clone(), *negated);
subqueries.push(subquery);
Expand Down Expand Up @@ -90,10 +92,12 @@ impl OptimizerRule for DecorrelateWhereExists {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
let filter_input = filter.input();
let filter_input = filter.input().as_ref();

// Apply optimizer rule to current input
let optimized_input = self.optimize(filter_input, optimizer_config)?;
let optimized_input = self
.try_optimize(filter_input, optimizer_config)?
.unwrap_or_else(|| filter_input.clone());

let (subqueries, other_exprs) =
self.extract_subquery_exprs(predicate, optimizer_config)?;
Expand All @@ -107,7 +111,7 @@ impl OptimizerRule for DecorrelateWhereExists {
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = (**filter_input).clone();
let mut cur_input = filter_input.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

for subquery in subqueries {
if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)?
{
Expand Down
34 changes: 26 additions & 8 deletions datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ impl DecorrelateWhereIn {
subquery,
negated,
} => {
let subquery = self.optimize(&subquery.subquery, optimizer_config)?;
let subquery = Arc::new(subquery);
let subquery = self
.try_optimize(&subquery.subquery, optimizer_config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
let subquery =
SubqueryInfo::new(subquery.clone(), (**expr).clone(), *negated);
Expand All @@ -81,13 +83,25 @@ impl OptimizerRule for DecorrelateWhereIn {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
let filter_input = filter.input();
let filter_input = filter.input().as_ref();

// Apply optimizer rule to current input
let optimized_input = self.optimize(filter_input, optimizer_config)?;
let optimized_input = self
.try_optimize(filter_input, optimizer_config)?
.unwrap_or_else(|| filter_input.clone());

let (subqueries, other_exprs) =
self.extract_subquery_exprs(predicate, optimizer_config)?;
Expand All @@ -97,11 +111,11 @@ impl OptimizerRule for DecorrelateWhereIn {
)?);
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
return Ok(optimized_plan);
return Ok(Some(optimized_plan));
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = (**filter_input).clone();
let mut cur_input = filter_input.clone();
for subquery in subqueries {
cur_input = optimize_where_in(
&subquery,
Expand All @@ -110,11 +124,15 @@ impl OptimizerRule for DecorrelateWhereIn {
optimizer_config,
)?;
}
Ok(cur_input)
Ok(Some(cur_input))
}
_ => {
// Apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
}
}
}
Expand Down
35 changes: 26 additions & 9 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ impl OptimizerRule for EliminateCrossJoin {
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let input = (**filter.input()).clone();
Expand All @@ -78,7 +88,11 @@ impl OptimizerRule for EliminateCrossJoin {
)?;
}
_ => {
return utils::optimize_children(self, plan, _optimizer_config);
return Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
)?));
}
}

Expand Down Expand Up @@ -109,23 +123,26 @@ impl OptimizerRule for EliminateCrossJoin {

// if there are no join keys then do nothing.
if all_join_keys.is_empty() {
Ok(LogicalPlan::Filter(Filter::try_new(
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate.clone(),
Arc::new(left),
)?))
)?)))
} else {
// remove join expressions from filter
match remove_join_expressions(predicate, &all_join_keys)? {
Some(filter_expr) => Ok(LogicalPlan::Filter(Filter::try_new(
filter_expr,
Arc::new(left),
)?)),
_ => Ok(left),
Some(filter_expr) => Ok(Some(LogicalPlan::Filter(
Filter::try_new(filter_expr, Arc::new(left))?,
))),
_ => Ok(Some(left)),
}
}
}

_ => utils::optimize_children(self, plan, _optimizer_config),
_ => Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
)?)),
}
}

Expand Down
22 changes: 18 additions & 4 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ impl OptimizerRule for EliminateFilter {
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let predicate_and_input = match plan {
LogicalPlan::Filter(filter) => match filter.predicate() {
Expr::Literal(ScalarValue::Boolean(Some(v))) => {
Expand All @@ -53,14 +63,18 @@ impl OptimizerRule for EliminateFilter {
};

match predicate_and_input {
Some((true, input)) => self.optimize(input, _optimizer_config),
Some((false, input)) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
Some((true, input)) => self.try_optimize(input, _optimizer_config),
Some((false, input)) => Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: input.schema().clone(),
})),
}))),
None => {
// Apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, _optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
)?))
}
}
}
Expand Down
26 changes: 22 additions & 4 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,43 @@ impl OptimizerRule for EliminateLimit {
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Some(fetch) => {
if fetch == 0 {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
}));
})));
}
}
None => {
if limit.skip == 0 {
let input = &*limit.input;
return utils::optimize_children(self, input, optimizer_config);
return Ok(Some(utils::optimize_children(
self,
input,
optimizer_config,
)?));
}
}
}
}
utils::optimize_children(self, plan, optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
}

fn name(&self) -> &str {
Expand Down
Loading