Skip to content

Commit

Permalink
Add try_optimize method (#4208)
Browse files Browse the repository at this point in the history
* Add try_optimize method

* lint

* address feedback
  • Loading branch information
andygrove authored Nov 15, 2022
1 parent 4653df4 commit 28ca3ee
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 39 deletions.
76 changes: 46 additions & 30 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use crate::utils::{
verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::{context, plan_err, DataFusionError};
use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_common::{context, Result};
use datafusion_expr::{
logical_plan::{Filter, JoinType, Subquery},
Expr, LogicalPlan, LogicalPlanBuilder,
};
use std::sync::Arc;

/// Optimizer rule for rewriting subquery filters to joins
Expand All @@ -47,7 +49,7 @@ impl DecorrelateWhereExists {
&self,
predicate: &Expr,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let filters = split_conjunction(predicate);

let mut subqueries = vec![];
Expand All @@ -74,7 +76,17 @@ impl OptimizerRule for DecorrelateWhereExists {
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
Expand All @@ -91,19 +103,28 @@ impl OptimizerRule for DecorrelateWhereExists {
)?);
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
return Ok(optimized_plan);
return Ok(Some(optimized_plan));
}

// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = (**filter_input).clone();
for subquery in subqueries {
cur_input = optimize_exists(&subquery, &cur_input, &other_exprs)?;
if let Some(x) = optimize_exists(&subquery, &cur_input, &other_exprs)?
{
cur_input = x;
} else {
return Ok(None);
}
}
Ok(cur_input)
Ok(Some(cur_input))
}
_ => {
// Apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, optimizer_config)
Ok(Some(utils::optimize_children(
self,
plan,
optimizer_config,
)?))
}
}
}
Expand Down Expand Up @@ -132,20 +153,22 @@ fn optimize_exists(
query_info: &SubqueryInfo,
outer_input: &LogicalPlan,
outer_other_exprs: &[Expr],
) -> datafusion_common::Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
let subqry_filter = match query_info.query.subquery.as_ref() {
LogicalPlan::Distinct(subqry_distinct) => match subqry_distinct.input.as_ref() {
LogicalPlan::Projection(subqry_proj) => {
Filter::try_from_plan(&subqry_proj.input)
}
_ => Err(DataFusionError::NotImplemented(
"Subquery currently only supports distinct or projection".to_string(),
)),
_ => {
// Subquery currently only supports distinct or projection
return Ok(None);
}
},
LogicalPlan::Projection(subqry_proj) => Filter::try_from_plan(&subqry_proj.input),
_ => Err(DataFusionError::NotImplemented(
"Subquery currently only supports distinct or projection".to_string(),
)),
_ => {
// Subquery currently only supports distinct or projection
return Ok(None);
}
}
.map_err(|e| context!("cannot optimize non-correlated subquery", e))?;

Expand All @@ -159,7 +182,8 @@ fn optimize_exists(
let (outer_cols, subqry_cols, join_filters) =
exprs_to_join_cols(&col_exprs, subqry_filter.input().schema(), false)?;
if subqry_cols.is_empty() || outer_cols.is_empty() {
plan_err!("cannot optimize non-correlated subquery")?;
// cannot optimize non-correlated subquery
return Ok(None);
}

// build subquery side of join - the thing the subquery was querying
Expand Down Expand Up @@ -188,7 +212,7 @@ fn optimize_exists(
}

let result = new_plan.build()?;
Ok(result)
Ok(Some(result))
}

struct SubqueryInfo {
Expand Down Expand Up @@ -318,9 +342,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = r#"cannot optimize non-correlated subquery"#;

assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}

Expand All @@ -339,9 +361,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = r#"cannot optimize non-correlated subquery"#;

assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}

Expand All @@ -360,9 +380,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = r#"cannot optimize non-correlated subquery"#;

assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}

Expand Down Expand Up @@ -426,9 +444,7 @@ mod tests {
.project(vec![col("customer.c_custkey")])?
.build()?;

let expected = r#"cannot optimize non-correlated subquery"#;

assert_optimizer_err(&DecorrelateWhereExists::new(), &plan, expected);
assert_optimization_skipped(&DecorrelateWhereExists::new(), &plan);
Ok(())
}

Expand Down
21 changes: 18 additions & 3 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,18 @@ use std::time::Instant;
/// way. If there are no suitable transformations for the input plan,
/// the optimizer can simply return it as is.
pub trait OptimizerRule {
/// Rewrite `plan` to an optimized form
/// Try to rewrite `plan` to an optimized form, returning `None` if the plan cannot be
/// optimized by this rule.
fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
self.optimize(plan, optimizer_config).map(Some)
}

/// Rewrite `plan` to an optimized form. This method will eventually be deprecated and
/// replaced by `try_optimize`.
fn optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -209,13 +220,17 @@ impl Optimizer {
log_plan(&format!("Optimizer input (pass {})", i), &new_plan);

for rule in &self.rules {
let result = rule.optimize(&new_plan, optimizer_config);
let result = rule.try_optimize(&new_plan, optimizer_config);
match result {
Ok(plan) => {
Ok(Some(plan)) => {
new_plan = plan;
observer(&new_plan, rule.as_ref());
log_plan(rule.name(), &new_plan);
}
Ok(None) => {
observer(&new_plan, rule.as_ref());
log_plan(rule.name(), &new_plan);
}
Err(ref e) => {
if optimizer_config.skip_failing_rules {
// Note to future readers: if you see this warning it signals a
Expand Down
8 changes: 8 additions & 0 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,11 @@ pub fn assert_optimizer_err(
}
}
}

/// Asserts that applying `rule` to `plan` leaves the plan unchanged.
///
/// The rule's `optimize` method is invoked with a freshly constructed
/// `OptimizerConfig`, and the `display_indent()` rendering of the input plan
/// is compared against the rendering of the plan that comes back.
///
/// # Panics
///
/// Panics if `optimize` returns an error, or if the two renderings differ.
pub fn assert_optimization_skipped(rule: &dyn OptimizerRule, plan: &LogicalPlan) {
    let mut config = OptimizerConfig::new();
    let optimized = rule.optimize(plan, &mut config).unwrap();

    // Compare textual renderings rather than the plan values themselves.
    let before = plan.display_indent().to_string();
    let after = optimized.display_indent().to_string();
    assert_eq!(before, after);
}
11 changes: 5 additions & 6 deletions datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ pub fn optimize_children(
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
let new_exprs = plan.expressions();
let new_inputs = plan
.inputs()
.into_iter()
.map(|plan| optimizer.optimize(plan, optimizer_config))
.collect::<Result<Vec<_>>>()?;

let mut new_inputs = Vec::with_capacity(plan.inputs().len());
for input in plan.inputs() {
let new_input = optimizer.try_optimize(input, optimizer_config)?;
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
}
from_plan(plan, &new_exprs, &new_inputs)
}

Expand Down

0 comments on commit 28ca3ee

Please sign in to comment.