Skip to content

Commit

Permalink
refactor PipelineStatePropagator using `TreeNode.transform_up_with_…
Browse files Browse the repository at this point in the history
…payload()`
  • Loading branch information
peter-toth committed Jan 11, 2024
1 parent 24b4f00 commit ecf18e2
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 126 deletions.
103 changes: 43 additions & 60 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use std::sync::Arc;

use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::pipeline_checker::PipelineStatePropagator;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use crate::physical_plan::joins::{
Expand Down Expand Up @@ -228,7 +227,6 @@ impl PhysicalOptimizerRule for JoinSelection {
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let pipeline = PipelineStatePropagator::new(plan);
// First, we make pipeline-fixing modifications to joins so as to accommodate
// unbounded inputs. Each pipeline-fixing subrule, which is a function
// of type `PipelineFixerSubrule`, takes a single [`PipelineStatePropagator`]
Expand All @@ -238,7 +236,10 @@ impl PhysicalOptimizerRule for JoinSelection {
Box::new(hash_join_convert_symmetric_subrule),
Box::new(hash_join_swap_subrule),
];
let state = pipeline.transform_up(&|p| apply_subrules(p, &subrules, config))?;
let (plan, _) =
plan.transform_up_with_payload(&mut |p, children_unbounded| {
apply_subrules(p, children_unbounded, &subrules, config)
})?;
// Next, we apply another subrule that tries to optimize joins using any
// statistics their inputs might have.
// - For a hash join with partition mode [`PartitionMode::Auto`], we will
Expand All @@ -252,7 +253,7 @@ impl PhysicalOptimizerRule for JoinSelection {
// side is the small side.
let config = &config.optimizer;
let collect_left_threshold = config.hash_join_single_partition_threshold;
state.plan.transform_up(&|plan| {
plan.transform_up(&|plan| {
statistical_join_selection_subrule(plan, collect_left_threshold)
})
}
Expand Down Expand Up @@ -422,9 +423,10 @@ fn statistical_join_selection_subrule(

/// Pipeline-fixing join selection subrule.
pub type PipelineFixerSubrule = dyn Fn(
PipelineStatePropagator,
Arc<dyn ExecutionPlan>,
&[bool],
&ConfigOptions,
) -> Option<Result<PipelineStatePropagator>>;
) -> Option<Result<Arc<dyn ExecutionPlan>>>;

/// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides.
///
Expand All @@ -442,16 +444,15 @@ pub type PipelineFixerSubrule = dyn Fn(
/// it returns `None`. If applicable, it returns `Some(Ok(...))` with the modified pipeline state,
/// or `Some(Err(...))` if an error occurs during the transformation.
fn hash_join_convert_symmetric_subrule(
mut input: PipelineStatePropagator,
plan: Arc<dyn ExecutionPlan>,
children_unbounded: &[bool],
config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
) -> Option<Result<Arc<dyn ExecutionPlan>>> {
// Check if the current plan node is a HashJoinExec.
if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
// Determine if left and right children are unbounded.
let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
// Update the unbounded flag of the input.
input.unbounded = left_unbounded || right_unbounded;
let (left_unbounded, right_unbounded) =
(children_unbounded[0], children_unbounded[1]);
// Process only if both left and right sides are unbounded.
let result = if left_unbounded && right_unbounded {
// Determine the partition mode based on configuration.
Expand Down Expand Up @@ -528,12 +529,9 @@ fn hash_join_convert_symmetric_subrule(
right_order,
mode,
)
.map(|exec| {
input.plan = Arc::new(exec) as _;
input
})
.map(|exec| Arc::new(exec) as _)
} else {
Ok(input)
Ok(plan)
};
Some(result)
} else {
Expand Down Expand Up @@ -583,13 +581,13 @@ fn hash_join_convert_symmetric_subrule(
///
/// ```
fn hash_join_swap_subrule(
mut input: PipelineStatePropagator,
plan: Arc<dyn ExecutionPlan>,
children_unbounded: &[bool],
_config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
input.unbounded = left_unbounded || right_unbounded;
) -> Option<Result<Arc<dyn ExecutionPlan>>> {
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
let (left_unbounded, right_unbounded) =
(children_unbounded[0], children_unbounded[1]);
let result = if left_unbounded
&& !right_unbounded
&& matches!(
Expand All @@ -599,12 +597,9 @@ fn hash_join_swap_subrule(
| JoinType::LeftSemi
| JoinType::LeftAnti
) {
swap_join_according_to_unboundedness(hash_join).map(|plan| {
input.plan = plan;
input
})
swap_join_according_to_unboundedness(hash_join)
} else {
Ok(input)
Ok(plan)
};
Some(result)
} else {
Expand Down Expand Up @@ -642,27 +637,29 @@ fn swap_join_according_to_unboundedness(
/// Apply given `PipelineFixerSubrule`s to a given plan. This plan, along with
/// auxiliary boundedness information, is in the `PipelineStatePropagator` object.
fn apply_subrules(
mut input: PipelineStatePropagator,
mut plan: Arc<dyn ExecutionPlan>,
children_unbounded: Vec<bool>,
subrules: &Vec<Box<PipelineFixerSubrule>>,
config_options: &ConfigOptions,
) -> Result<Transformed<PipelineStatePropagator>> {
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, bool)> {
for subrule in subrules {
if let Some(value) = subrule(input.clone(), config_options).transpose()? {
input = value;
if let Some(new_plan) =
subrule(plan.clone(), children_unbounded.as_slice(), config_options)
.transpose()?
{
plan = new_plan;
}
}
let is_unbounded = input
.plan
.unbounded_output(&input.children_unbounded())
let is_unbounded = plan
.unbounded_output(children_unbounded.as_slice())
// Treat the case where an operator can not run on unbounded data as
// if it can and it outputs unbounded data. Do not raise an error yet.
// Such operators may be fixed, adjusted or replaced later on during
// optimization passes -- sorts may be removed, windows may be adjusted
// etc. If this doesn't happen, the final `PipelineChecker` rule will
// catch this and raise an error anyway.
.unwrap_or(true);
input.unbounded = is_unbounded;
Ok(Transformed::Yes(input))
Ok((Transformed::Yes(plan), is_unbounded))
}

#[cfg(test)]
Expand Down Expand Up @@ -1329,7 +1326,6 @@ mod hash_join_tests {
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::DataPtr;
use datafusion_common::JoinType;
use datafusion_physical_plan::empty::EmptyExec;
use std::sync::Arc;

struct TestCase {
Expand Down Expand Up @@ -1697,28 +1693,15 @@ mod hash_join_tests {
false,
)?;

let children = vec![
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: left_unbounded,
children: vec![],
},
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
unbounded: right_unbounded,
children: vec![],
},
];
let initial_hash_join_state = PipelineStatePropagator {
plan: Arc::new(join),
unbounded: false,
children,
};
let children_unbounded = vec![left_unbounded, right_unbounded];
let initial_hash_join_state = Arc::new(join);

let optimized_hash_join =
hash_join_swap_subrule(initial_hash_join_state, &ConfigOptions::new())
.unwrap()?;
let optimized_join_plan = optimized_hash_join.plan;
let (optimized_join_plan, _) = hash_join_swap_subrule(
initial_hash_join_state,
children_unbounded.as_slice(),
&ConfigOptions::new(),
)
.unwrap()?;

// If swap did happen
let projection_added = optimized_join_plan.as_any().is::<ProjectionExec>();
Expand Down
78 changes: 12 additions & 66 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
//! infinite sources, if there are any. It will reject non-runnable query plans
//! that use pipeline-breaking operators on infinite input(s).

use std::borrow::Cow;
use std::sync::Arc;

use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use crate::physical_plan::ExecutionPlan;

use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
Expand All @@ -51,10 +50,11 @@ impl PhysicalOptimizerRule for PipelineChecker {
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let pipeline = PipelineStatePropagator::new(plan);
let state = pipeline
.transform_up(&|p| check_finiteness_requirements(p, &config.optimizer))?;
Ok(state.plan)
let (plan, _) =
plan.transform_up_with_payload(&mut |p, children_unbounded| {
check_finiteness_requirements(p, children_unbounded, &config.optimizer)
})?;
Ok(plan)
}

fn name(&self) -> &str {
Expand All @@ -66,63 +66,14 @@ impl PhysicalOptimizerRule for PipelineChecker {
}
}

/// [PipelineStatePropagator] propagates the [ExecutionPlan] pipelining information.
#[derive(Clone, Debug)]
pub struct PipelineStatePropagator {
pub(crate) plan: Arc<dyn ExecutionPlan>,
pub(crate) unbounded: bool,
pub(crate) children: Vec<Self>,
}

impl PipelineStatePropagator {
/// Constructs a new, default pipelining state.
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let children = plan.children();
Self {
plan,
unbounded: false,
children: children.into_iter().map(Self::new).collect(),
}
}

/// Returns the children unboundedness information.
pub fn children_unbounded(&self) -> Vec<bool> {
self.children.iter().map(|c| c.unbounded).collect()
}
}

impl TreeNode for PipelineStatePropagator {
fn children_nodes(&self) -> Vec<Cow<Self>> {
self.children.iter().map(Cow::Borrowed).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
if !self.children.is_empty() {
self.children = self
.children
.into_iter()
.map(transform)
.collect::<Result<_>>()?;
self.plan = with_new_children_if_necessary(
self.plan,
self.children.iter().map(|c| c.plan.clone()).collect(),
)?
.into();
}
Ok(self)
}
}

/// This function propagates finiteness information and rejects any plan with
/// pipeline-breaking operators acting on infinite inputs.
pub fn check_finiteness_requirements(
mut input: PipelineStatePropagator,
plan: Arc<dyn ExecutionPlan>,
children_unbounded: Vec<bool>,
optimizer_options: &OptimizerOptions,
) -> Result<Transformed<PipelineStatePropagator>> {
if let Some(exec) = input.plan.as_any().downcast_ref::<SymmetricHashJoinExec>() {
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, bool)> {
if let Some(exec) = plan.as_any().downcast_ref::<SymmetricHashJoinExec>() {
if !(optimizer_options.allow_symmetric_joins_without_pruning
|| (exec.check_if_order_information_available()? && is_prunable(exec)))
{
Expand All @@ -131,13 +82,8 @@ pub fn check_finiteness_requirements(
return plan_err!("{}", MSG);
}
}
input
.plan
.unbounded_output(&input.children_unbounded())
.map(|value| {
input.unbounded = value;
Transformed::Yes(input)
})
plan.unbounded_output(children_unbounded.as_slice())
.map(|value| (Transformed::No(plan), value))
}

/// This function returns whether a given symmetric hash join is amenable to
Expand Down

0 comments on commit ecf18e2

Please sign in to comment.