Skip to content

Commit

Permalink
feat: propagate EmptyRelation for more join types (apache#10963)
Browse files Browse the repository at this point in the history
* feat: propagate empty for more join types

* feat: update subquery de-correlation test

* tests: simplify tests

* refactor: better name

* style: clippy

* refactor: update tests

* refactor: rename

* refactor: fix spellings

* add slt tests
  • Loading branch information
tshauck authored and xinlifoobar committed Jun 22, 2024
1 parent 0c3b068 commit bde54bd
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 38 deletions.
8 changes: 4 additions & 4 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! This file contains an end to end test of extracting statitics from parquet files.
//! This file contains an end to end test of extracting statistics from parquet files.
//! It writes data into a parquet file, reads statistics and verifies they are correct

use std::default::Default;
Expand Down Expand Up @@ -716,8 +716,8 @@ async fn test_timestamp() {
// "seconds_timezoned" --> TimestampSecondArray
// "names" --> StringArray
//
// The file is created by 4 record batches, each has 5 rowws.
// Since the row group isze is set to 5, those 4 batches will go into 4 row groups
// The file is created by 4 record batches, each has 5 rows.
// Since the row group size is set to 5, those 4 batches will go into 4 row groups
// This creates a parquet files of 4 columns named "nanos", "nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned", "seconds", "seconds_timezoned"
let reader = TestReader {
scenario: Scenario::Timestamps,
Expand Down Expand Up @@ -2039,7 +2039,7 @@ async fn test_missing_statistics() {
expected_min: Arc::new(Int64Array::from(vec![None])),
expected_max: Arc::new(Int64Array::from(vec![None])),
expected_null_counts: UInt64Array::from(vec![None]),
expected_row_counts: Some(UInt64Array::from(vec![3])), // stil has row count statistics
expected_row_counts: Some(UInt64Array::from(vec![3])), // still has row count statistics
column_name: "i64",
check: Check::RowGroup,
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/eliminate_one_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ mod tests {
}

/// Optimizes `plan` with `EliminateOneUnion` as the only rule and checks the
/// outcome against the `expected` plan text.
// NOTE(review): two call heads appear back to back below; they look like the
// removed and the added line of one statement in a rendered diff - confirm
// which one the real file keeps. The trailing `true` argument only fits the
// four-argument `assert_optimized_plan_with_rules` form.
fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_optimized_plan_eq_with_rules(
assert_optimized_plan_with_rules(
vec![Arc::new(EliminateOneUnion::new())],
plan,
expected,
true,
)
}

Expand Down
177 changes: 154 additions & 23 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::sync::Arc;

use datafusion_common::tree_node::Transformed;
use datafusion_common::JoinType::Inner;
use datafusion_common::JoinType;
use datafusion_common::{internal_err, plan_err, Result};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::LogicalPlan;
Expand Down Expand Up @@ -94,29 +94,62 @@ impl OptimizerRule for PropagateEmptyRelation {
Ok(Transformed::no(LogicalPlan::CrossJoin(join.clone())))
}

LogicalPlan::Join(ref join) if join.join_type == Inner => {
LogicalPlan::Join(ref join) => {
// TODO: For Join, the remaining join types need careful handling:
// For LeftOuter/LeftSemi/LeftAnti Join, if the left side is empty, the Join result is empty.
// For LeftSemi Join, if the right side is empty, the Join result is empty.
// For LeftAnti Join, if the right side is empty, the Join result is the left side (should exclude null ??).
// For RightOuter/RightSemi/RightAnti Join, if the right side is empty, the Join result is empty.
// For RightSemi Join, if the left side is empty, the Join result is empty.
// For RightAnti Join, if the left side is empty, the Join result is the right side (should exclude null ??).
// For Full Join, the Join result is empty only if both sides are empty.
// For LeftOuter/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
// columns + right side columns replaced with null values.
// For RightOuter/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
// columns + left side columns replaced with null values.
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
if left_empty || right_empty {
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {

match join.join_type {
JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
},
)));
}),
)),
JoinType::Left if left_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
}),
)),
JoinType::Right if right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
}),
)),
JoinType::LeftSemi if left_empty || right_empty => Ok(
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
})),
),
JoinType::RightSemi if left_empty || right_empty => Ok(
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
})),
),
JoinType::LeftAnti if left_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
}),
)),
JoinType::RightAnti if right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: join.schema.clone(),
}),
)),
_ => Ok(Transformed::no(LogicalPlan::Join(join.clone()))),
}
Ok(Transformed::no(LogicalPlan::Join(join.clone())))
}
LogicalPlan::Aggregate(ref agg) => {
if !agg.group_expr.is_empty() {
Expand Down Expand Up @@ -222,7 +255,7 @@ mod tests {
use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_nested_union::EliminateNestedUnion;
use crate::test::{
assert_optimized_plan_eq, assert_optimized_plan_eq_with_rules, test_table_scan,
assert_optimized_plan_eq, assert_optimized_plan_with_rules, test_table_scan,
test_table_scan_fields, test_table_scan_with_name,
};

Expand All @@ -232,18 +265,20 @@ mod tests {
assert_optimized_plan_eq(Arc::new(PropagateEmptyRelation::new()), plan, expected)
}

/// Optimizes `plan` with `EliminateFilter`, `EliminateNestedUnion` and
/// `PropagateEmptyRelation` (passed in that order) and checks the outcome
/// against the `expected` plan text.
// NOTE(review): two signature heads and two call heads appear back to back
// below; they look like the removed and the added lines of a rendered diff -
// confirm which pair the real file keeps. The `eq` parameter is only forwarded
// by the `assert_optimized_plan_with_rules` form.
fn assert_together_optimized_plan_eq(
fn assert_together_optimized_plan(
plan: LogicalPlan,
expected: &str,
eq: bool,
) -> Result<()> {
assert_optimized_plan_eq_with_rules(
assert_optimized_plan_with_rules(
vec![
Arc::new(EliminateFilter::new()),
Arc::new(EliminateNestedUnion::new()),
Arc::new(PropagateEmptyRelation::new()),
],
plan,
expected,
eq,
)
}

Expand Down Expand Up @@ -279,7 +314,7 @@ mod tests {
.build()?;

let expected = "EmptyRelation";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -292,7 +327,7 @@ mod tests {
let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;

let expected = "TableScan: test";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -317,7 +352,7 @@ mod tests {
let expected = "Union\
\n TableScan: test1\
\n TableScan: test4";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -342,7 +377,7 @@ mod tests {
.build()?;

let expected = "EmptyRelation";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -369,7 +404,7 @@ mod tests {
let expected = "Union\
\n TableScan: test2\
\n TableScan: test3";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -382,7 +417,7 @@ mod tests {
let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;

let expected = "TableScan: test";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

#[test]
Expand All @@ -397,7 +432,103 @@ mod tests {
.build()?;

let expected = "EmptyRelation";
assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}

fn assert_empty_left_empty_right_lp(
left_empty: bool,
right_empty: bool,
join_type: JoinType,
eq: bool,
) -> Result<()> {
let left_lp = if left_empty {
let left_table_scan = test_table_scan()?;

LogicalPlanBuilder::from(left_table_scan)
.filter(Expr::Literal(ScalarValue::Boolean(Some(false))))?
.build()
} else {
let scan = test_table_scan_with_name("left").unwrap();
LogicalPlanBuilder::from(scan).build()
}?;

let right_lp = if right_empty {
let right_table_scan = test_table_scan_with_name("right")?;

LogicalPlanBuilder::from(right_table_scan)
.filter(Expr::Literal(ScalarValue::Boolean(Some(false))))?
.build()
} else {
let scan = test_table_scan_with_name("right").unwrap();
LogicalPlanBuilder::from(scan).build()
}?;

let plan = LogicalPlanBuilder::from(left_lp)
.join_using(
right_lp,
join_type,
vec![Column::from_name("a".to_string())],
)?
.build()?;

let expected = "EmptyRelation";
assert_together_optimized_plan(plan, expected, eq)
}

/// Join shapes that must be rewritten to an `EmptyRelation`.
#[test]
fn test_join_empty_propagation_rules() -> Result<()> {
    // (left_empty, right_empty, join_type), checked in this order.
    let collapsing_cases = [
        // left join with empty left
        (true, false, JoinType::Left),
        // right join with empty right
        (false, true, JoinType::Right),
        // left semi join with empty left
        (true, false, JoinType::LeftSemi),
        // left semi join with empty right
        (false, true, JoinType::LeftSemi),
        // right semi join with empty left
        (true, false, JoinType::RightSemi),
        // right semi join with empty right
        (false, true, JoinType::RightSemi),
        // left anti join with empty left
        (true, false, JoinType::LeftAnti),
        // right anti join with empty right
        (false, true, JoinType::RightAnti),
    ];

    for (left_empty, right_empty, join_type) in collapsing_cases {
        assert_empty_left_empty_right_lp(left_empty, right_empty, join_type, true)?;
    }
    Ok(())
}

/// Join shapes that must NOT be rewritten to an `EmptyRelation`.
#[test]
fn test_join_empty_propagation_rules_noop() -> Result<()> {
    // (left_empty, right_empty, join_type), checked in this order.
    let surviving_cases = [
        // left join with empty right
        (false, true, JoinType::Left),
        // right join with empty left
        (true, false, JoinType::Right),
        // left semi with non-empty left and right
        (false, false, JoinType::LeftSemi),
        // right semi with non-empty left and right
        (false, false, JoinType::RightSemi),
        // left anti join with non-empty left and right
        (false, false, JoinType::LeftAnti),
        // left anti with non-empty left and empty right
        (false, true, JoinType::LeftAnti),
        // right anti join with non-empty left and right
        (false, false, JoinType::RightAnti),
        // right anti with empty left and non-empty right
        (true, false, JoinType::RightAnti),
    ];

    for (left_empty, right_empty, join_type) in surviving_cases {
        assert_empty_left_empty_right_lp(left_empty, right_empty, join_type, false)?;
    }
    Ok(())
}

#[test]
Expand Down Expand Up @@ -430,6 +561,6 @@ mod tests {
let expected = "Projection: a, b, c\
\n TableScan: test";

assert_together_optimized_plan_eq(plan, expected)
assert_together_optimized_plan(plan, expected, true)
}
}
39 changes: 33 additions & 6 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,21 @@ pub fn assert_analyzed_plan_eq(

Ok(())
}

/// Runs `rule` as the only analyzer rule over `plan`, using default config
/// options, and asserts that the `Debug` rendering of the analyzed plan
/// differs from `expected`.
///
/// Returns the analyzer's error if analysis fails; panics if the rendering
/// equals `expected`.
pub fn assert_analyzed_plan_ne(
    rule: Arc<dyn AnalyzerRule + Send + Sync>,
    plan: LogicalPlan,
    expected: &str,
) -> Result<()> {
    let analyzer = Analyzer::with_rules(vec![rule]);
    let config = ConfigOptions::default();
    // The observer callback is a no-op; only the final plan is of interest.
    let analyzed = analyzer.execute_and_check(plan, &config, |_, _| {})?;
    let rendered = format!("{analyzed:?}");

    assert_ne!(rendered, expected);
    Ok(())
}

pub fn assert_analyzed_plan_eq_display_indent(
rule: Arc<dyn AnalyzerRule + Send + Sync>,
plan: LogicalPlan,
Expand Down Expand Up @@ -169,21 +184,33 @@ pub fn assert_optimized_plan_eq(
Ok(())
}

/// Builds an `Optimizer` from `rules` and runs it over `plan` with a context
/// limited to one pass and with failing rules not skipped.
///
/// Panics with "failed to optimize plan" if the optimizer returns an error.
// NOTE(review): two signature heads, two return types and two tail expressions
// appear back to back below; they look like the removed and the added lines of
// a rendered diff - confirm which set the real file keeps.
pub fn assert_optimized_plan_eq_with_rules(
fn generate_optimized_plan_with_rules(
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
plan: LogicalPlan,
expected: &str,
) -> Result<()> {
) -> LogicalPlan {
// Observer callback that ignores every (plan, rule) notification.
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
let config = &mut OptimizerContext::new()
.with_max_passes(1)
.with_skip_failing_rules(false);
let optimizer = Optimizer::with_rules(rules);
let optimized_plan = optimizer
optimizer
.optimize(plan, config, observe)
.expect("failed to optimize plan");
.expect("failed to optimize plan")
}

/// Optimizes `plan` with `rules` and compares the `Debug` rendering of the
/// optimized plan with `expected`.
///
/// * `eq == true` - panics unless the rendering equals `expected`.
/// * `eq == false` - panics unless the rendering differs from `expected`.
///
/// Always returns `Ok(())` when the selected assertion holds.
pub fn assert_optimized_plan_with_rules(
    rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
    plan: LogicalPlan,
    expected: &str,
    eq: bool,
) -> Result<()> {
    let optimized_plan = generate_optimized_plan_with_rules(rules, plan);
    let formatted_plan = format!("{optimized_plan:?}");
    // Exactly one comparison runs. An unconditional `assert_eq!` ahead of this
    // branch would make the `eq == false` mode impossible to satisfy.
    if eq {
        assert_eq!(formatted_plan, expected);
    } else {
        assert_ne!(formatted_plan, expected);
    }
    Ok(())
}

Expand Down
Loading

0 comments on commit bde54bd

Please sign in to comment.