Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange #4586

Merged
merged 7 commits into from
Dec 16, 2022
48 changes: 39 additions & 9 deletions datafusion/core/src/physical_optimizer/enforcement.rs
@@ -31,6 +31,7 @@ use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
@@ -835,13 +836,47 @@ fn new_join_conditions(
new_join_on
}

/// This function checks whether we need to add additional plan operators
Contributor

Thank you for these comments

/// of data exchanging and data ordering to satisfy the required distribution and ordering.
/// We should avoid manually adding plan operators for data exchanging and data ordering in other places.
fn ensure_distribution_and_ordering(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
target_partitions: usize,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
if plan.children().is_empty() {
return Ok(plan);
}
// It's mainly for changing the single node global SortExec to
// the SortPreservingMergeExec with multiple local SortExec.
// What's more, if limit exists, it can also be pushed down to the local sort
let plan = plan
.as_any()
.downcast_ref::<SortExec>()
.and_then(|sort_exec| {
// There are three situations in which this optimization is not needed:
// - There's only one input partition;
// - It's already preserving the partitioning, so it can be regarded as a local sort;
// - There's no limit pushed down to the local sort (this is still controversial).
Contributor

Might be worth a link to the ticket / PR for anyone who sees this comment and wants more context / backstory

if sort_exec.input().output_partitioning().partition_count() > 1
&& !sort_exec.preserve_partitioning()
&& sort_exec.fetch().is_some()
Contributor

FYI @Dandandan the check for local limit has been restored

Contributor

👍

{
let sort = SortExec::new_with_partitioning(
sort_exec.expr().to_vec(),
sort_exec.input().clone(),
true,
sort_exec.fetch(),
);
Some(Arc::new(SortPreservingMergeExec::new(
sort_exec.expr().to_vec(),
Arc::new(sort),
)))
} else {
None
}
})
.map_or(plan, |new_plan| new_plan);

let required_input_distributions = plan.required_input_distribution();
let required_input_orderings = plan.required_input_ordering();
let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
@@ -874,7 +909,7 @@ fn ensure_distribution_and_ordering(
}
});

- // Add SortExec to guarantee output ordering
+ // Add local SortExec to guarantee output ordering within each partition
let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
.zip(required_input_orderings.into_iter())
.map(|(child_result, required)| {
@@ -885,14 +920,9 @@ fn ensure_distribution_and_ordering(
Ok(child)
} else {
let sort_expr = required.unwrap().to_vec();
- if child.output_partitioning().partition_count() > 1 {
- Ok(Arc::new(SortExec::new_with_partitioning(
- sort_expr, child, true, None,
- )) as Arc<dyn ExecutionPlan>)
- } else {
- Ok(Arc::new(SortExec::try_new(sort_expr, child, None)?)
- as Arc<dyn ExecutionPlan>)
- }
+ Ok(Arc::new(SortExec::new_with_partitioning(
+ sort_expr, child, true, None,
+ )) as Arc<dyn ExecutionPlan>)
}
})
.collect();
38 changes: 34 additions & 4 deletions datafusion/core/src/physical_plan/mod.rs
@@ -110,8 +110,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;

- /// If the output of this operator is sorted, returns `Some(keys)`
- /// with the description of how it was sorted.
+ /// If the output of this operator within each partition is sorted,
+ /// returns `Some(keys)` with the description of how it was sorted.
///
/// For example, Sort, (obviously) produces sorted output as does
/// SortPreservingMergeStream. Less obviously `Projection`
@@ -128,8 +128,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
vec![Distribution::UnspecifiedDistribution; self.children().len()]
}

- /// Specifies the ordering requirements for all the
- /// children for this operator.
+ /// Specifies the ordering requirements for all of the children
+ /// For each child, it's the local ordering requirement within each partition rather than the global ordering
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![None; self.children().len()]
}
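
The distinction in the updated doc comment, local ordering within each partition versus a global ordering, can be illustrated with a small standalone sketch. It uses plain `Vec<i64>` partitions rather than DataFusion operators, and the helper names `sort_each_partition` and `is_sorted` are made up for this example.

```rust
/// Returns true if `rows` is in non-decreasing order.
fn is_sorted(rows: &[i64]) -> bool {
    rows.windows(2).all(|w| w[0] <= w[1])
}

/// Sorts every partition independently (a "local" sort).
/// No rows move between partitions.
fn sort_each_partition(partitions: Vec<Vec<i64>>) -> Vec<Vec<i64>> {
    partitions
        .into_iter()
        .map(|mut p| {
            p.sort();
            p
        })
        .collect()
}
```

After `sort_each_partition`, every partition satisfies `is_sorted`, but the concatenation of the partitions generally does not. A global ordering still needs a merge step on top of the local sorts.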
@@ -243,6 +243,34 @@ pub trait ExecutionPlan: Debug + Send + Sync {
fn statistics(&self) -> Statistics;
}

/// Indicates whether a data exchange is needed for the input of `plan`. This is especially helpful
/// for a distributed engine when deciding whether it needs to handle shuffling.
/// Currently there are 3 kinds of execution plan that need data exchange:
/// 1. RepartitionExec, for changing the partition number between two operators
/// 2. CoalescePartitionsExec, for collapsing all of the partitions into one without an ordering guarantee
/// 3. SortPreservingMergeExec, for collapsing all of the sorted partitions into one with an ordering guarantee
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
Contributor

I wonder if you could generalize this code so that it compared plan.child().output_partitioning() and plan.required_input_distribution()

Though it was not trivial when I was thinking about it

Contributor Author

I'm not sure we can decide whether a data exchange is needed based only on plan.child().output_partitioning() and plan.required_input_distribution(). For example, how would we make that decision for a Join, which has two children? The enforcement rule decides on and adds the necessary physical operators, such as RepartitionExec, CoalescePartitionsExec, and SortPreservingMergeExec. That should not be part of this function.

if let Some(repart) = plan.as_any().downcast_ref::<RepartitionExec>() {
!matches!(
repart.output_partitioning(),
Partitioning::RoundRobinBatch(_)
)
} else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
{
coalesce.input().output_partitioning().partition_count() > 1
} else if let Some(sort_preserving_merge) =
plan.as_any().downcast_ref::<SortPreservingMergeExec>()
{
sort_preserving_merge
.input()
.output_partitioning()
.partition_count()
> 1
} else {
false
}
}
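
The three cases listed in the doc comment can be sketched without DataFusion as a match over a toy plan type. Everything below is an assumption made for illustration: the `Plan` and `Partitioning` enums are simplified stand-ins (the real `Partitioning::Hash` also carries expressions, and real plans are trait objects that are downcast), and only the decision logic mirrors the function in this diff.

```rust
/// Toy stand-in for a partitioning scheme; each variant only stores
/// the number of output partitions.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(usize),
    Unknown(usize),
}

impl Partitioning {
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::RoundRobinBatch(n)
            | Partitioning::Hash(n)
            | Partitioning::Unknown(n) => *n,
        }
    }
}

/// Toy plan tree (hypothetical, not the DataFusion `ExecutionPlan` trait).
#[derive(Debug)]
enum Plan {
    Scan { partitions: usize },
    Repartition { input: Box<Plan>, scheme: Partitioning },
    CoalescePartitions { input: Box<Plan> },
    SortPreservingMerge { input: Box<Plan> },
}

impl Plan {
    fn output_partitioning(&self) -> Partitioning {
        match self {
            Plan::Scan { partitions } => Partitioning::Unknown(*partitions),
            Plan::Repartition { scheme, .. } => scheme.clone(),
            // Both operators collapse their input into a single partition.
            Plan::CoalescePartitions { .. } | Plan::SortPreservingMerge { .. } => {
                Partitioning::Unknown(1)
            }
        }
    }
}

/// Same decision as in the diff: round-robin repartitioning is not treated
/// as an exchange, and the two collapsing operators only need one when
/// their input has more than one partition.
fn need_data_exchange(plan: &Plan) -> bool {
    match plan {
        Plan::Repartition { scheme, .. } => {
            !matches!(scheme, Partitioning::RoundRobinBatch(_))
        }
        Plan::CoalescePartitions { input } | Plan::SortPreservingMerge { input } => {
            input.output_partitioning().partition_count() > 1
        }
        Plan::Scan { .. } => false,
    }
}
```

In this sketch a hash repartition reports `true`, a round-robin repartition reports `false`, and a coalesce or merge over a single-partition input reports `false`.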

/// Returns a copy of this plan if we change any child according to the pointer comparison.
/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
/// Allow the vtable address comparisons for ExecutionPlan Trait Objects,it is harmless even
@@ -655,6 +683,8 @@ pub mod values;
pub mod windows;

use crate::execution::context::TaskContext;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, type_coercion, udf,
};
18 changes: 1 addition & 17 deletions datafusion/core/src/physical_plan/planner.rs
@@ -18,7 +18,6 @@
//! Physical query planner

use super::analyze::AnalyzeExec;
use super::sorts::sort_preserving_merge::SortPreservingMergeExec;
use super::{
aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
values::ValuesExec, windows,
@@ -838,22 +837,7 @@ impl DefaultPhysicalPlanner {
)),
})
.collect::<Result<Vec<_>>>()?;
- // If we have a `LIMIT` can run sort/limits in parallel (similar to TopK)
- Ok(if fetch.is_some() && session_state.config.target_partitions() > 1 {
- let sort = SortExec::new_with_partitioning(
- sort_expr,
- physical_input,
- true,
- *fetch,
- );
- let merge = SortPreservingMergeExec::new(
- sort.expr().to_vec(),
- Arc::new(sort),
- );
- Arc::new(merge)
- } else {
- Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?)
- })
+ Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
Contributor

Is the rationale here that the enforcement phase now creates the pattern with sort and merge?

Contributor Author

Yes. It's better to avoid manually adding physical operators. That's the enforcement rule's job.
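
The pattern discussed in this thread, turning a single global sort with a limit into per-partition sorts under a sort-preserving merge, can be sketched as a rewrite on a toy plan tree. This is a minimal sketch under stated assumptions: the `Plan` enum and the `parallelize_sort` name are hypothetical, and only the three conditions (more than one input partition, not already preserving partitioning, a limit present) are taken from the enforcement code in this PR.

```rust
/// Toy plan tree (hypothetical, not the DataFusion API).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { partitions: usize },
    Sort {
        input: Box<Plan>,
        preserve_partitioning: bool,
        fetch: Option<usize>,
    },
    SortPreservingMerge { input: Box<Plan> },
}

impl Plan {
    /// Number of output partitions produced by this node.
    fn partition_count(&self) -> usize {
        match self {
            Plan::Scan { partitions } => *partitions,
            Plan::Sort { input, preserve_partitioning, .. } => {
                if *preserve_partitioning {
                    input.partition_count()
                } else {
                    1
                }
            }
            Plan::SortPreservingMerge { .. } => 1,
        }
    }
}

/// Rewrites a global sort with a limit over a multi-partition input into
/// local sorts (limit pushed down) followed by a merge. Any other plan is
/// returned unchanged.
fn parallelize_sort(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { input, preserve_partitioning: false, fetch: Some(k) }
            if input.partition_count() > 1 =>
        {
            Plan::SortPreservingMerge {
                input: Box::new(Plan::Sort {
                    input,
                    preserve_partitioning: true,
                    fetch: Some(k),
                }),
            }
        }
        other => other,
    }
}
```

Keeping this rewrite in one place means the planner only needs to emit the plain sort, and the rule decides when the sort-and-merge shape is worthwhile.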

}
LogicalPlan::Join(Join {
left,
3 changes: 2 additions & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
@@ -92,10 +92,11 @@ async fn explain_analyze_baseline_metrics() {
"CoalesceBatchesExec: target_batch_size=4096",
"metrics=[output_rows=5, elapsed_compute"
);
// The number of output rows becomes less after changing the global sort to the local sort with limit push down
assert_metrics!(
&formatted,
"CoalescePartitionsExec",
- "metrics=[output_rows=5, elapsed_compute="
+ "metrics=[output_rows=3, elapsed_compute="
);
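
The test comment attributes the lower row count to the limit being pushed into the local sort. A standalone sketch of that effect follows. It uses plain `Vec<i64>` partitions rather than DataFusion operators, the function names are hypothetical, and it does not reproduce the query in this test. It only shows why fewer rows flow upward once each partition applies the limit itself.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Sorts one partition and keeps at most `fetch` rows
/// (a local sort with the limit pushed down).
fn local_sort_with_fetch(mut partition: Vec<i64>, fetch: usize) -> Vec<i64> {
    partition.sort();
    partition.truncate(fetch);
    partition
}

/// Merges already-sorted partitions into one sorted output of at most
/// `fetch` rows, using a min-heap keyed on (value, partition, offset).
fn merge_sorted(partitions: &[Vec<i64>], fetch: usize) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (p, rows) in partitions.iter().enumerate() {
        if let Some(&first) = rows.first() {
            heap.push(Reverse((first, p, 0usize)));
        }
    }
    let mut out = Vec::new();
    while out.len() < fetch {
        match heap.pop() {
            Some(Reverse((value, p, i))) => {
                out.push(value);
                // Advance within the partition the smallest row came from.
                if let Some(&next) = partitions[p].get(i + 1) {
                    heap.push(Reverse((next, p, i + 1)));
                }
            }
            None => break,
        }
    }
    out
}
```

With three partitions and `fetch = 2`, at most six rows reach the merge no matter how many rows each partition holds, and the merged result matches sorting all rows globally and taking the first two.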
assert_metrics!(
&formatted,