diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 3110061c4f2c..3da9d24773a7 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -31,6 +31,7 @@ use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::sorts::sort::SortOptions;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
@@ -835,6 +836,9 @@ fn new_join_conditions(
     new_join_on
 }
 
+/// This function checks whether we need to add additional plan operators for data
+/// exchange and data ordering to satisfy the required distribution and ordering.
+/// Plan operators for data exchange and data ordering should not be added manually anywhere else.
 fn ensure_distribution_and_ordering(
     plan: Arc<dyn ExecutionPlan>,
     target_partitions: usize,
@@ -842,6 +846,37 @@
     if plan.children().is_empty() {
         return Ok(plan);
     }
+    // This mainly rewrites the single-node global SortExec into a
+    // SortPreservingMergeExec over multiple partition-local SortExecs.
+    // Moreover, if a limit exists, it can also be pushed down to the local sorts.
+    let plan = plan
+        .as_any()
+        .downcast_ref::<SortExec>()
+        .and_then(|sort_exec| {
+            // There are three situations in which this optimization is not needed:
+            // - there is only one input partition;
+            // - the sort already preserves the partitioning, so it can be regarded as a local sort;
+            // - there is no limit to push down to the local sort (applying it without a limit is still controversial).
+            if sort_exec.input().output_partitioning().partition_count() > 1
+                && !sort_exec.preserve_partitioning()
+                && sort_exec.fetch().is_some()
+            {
+                let sort = SortExec::new_with_partitioning(
+                    sort_exec.expr().to_vec(),
+                    sort_exec.input().clone(),
+                    true,
+                    sort_exec.fetch(),
+                );
+                Some(Arc::new(SortPreservingMergeExec::new(
+                    sort_exec.expr().to_vec(),
+                    Arc::new(sort),
+                )))
+            } else {
+                None
+            }
+        })
+        .map_or(plan, |new_plan| new_plan);
+
     let required_input_distributions = plan.required_input_distribution();
     let required_input_orderings = plan.required_input_ordering();
     let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
@@ -874,7 +909,7 @@ fn ensure_distribution_and_ordering(
         }
     });
 
-    // Add SortExec to guarantee output ordering
+    // Add local SortExec to guarantee output ordering within each partition
     let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
         .zip(required_input_orderings.into_iter())
         .map(|(child_result, required)| {
@@ -885,14 +920,9 @@
                 Ok(child)
             } else {
                 let sort_expr = required.unwrap().to_vec();
-                if child.output_partitioning().partition_count() > 1 {
-                    Ok(Arc::new(SortExec::new_with_partitioning(
-                        sort_expr, child, true, None,
-                    )) as Arc<dyn ExecutionPlan>)
-                } else {
-                    Ok(Arc::new(SortExec::try_new(sort_expr, child, None)?)
-                        as Arc<dyn ExecutionPlan>)
-                }
+                Ok(Arc::new(SortExec::new_with_partitioning(
+                    sort_expr, child, true, None,
+                )) as Arc<dyn ExecutionPlan>)
             }
         })
         .collect();
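Taken out of context, the new rewrite inside `ensure_distribution_and_ordering` is easier to read as a standalone function. The sketch below is ours, not part of the patch: the helper name is invented, but every operator API it uses (`new_with_partitioning`, `preserve_partitioning`, `fetch`, `SortPreservingMergeExec::new`) is exactly the one exercised in the hunk above.

```rust
use std::sync::Arc;

use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::ExecutionPlan;

/// Illustrative standalone version of the rewrite: a global SortExec with a
/// limit over several input partitions becomes per-partition SortExecs (with
/// the limit pushed down) under a single SortPreservingMergeExec.
fn parallelize_sort_with_limit(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    plan.as_any()
        .downcast_ref::<SortExec>()
        .and_then(|sort_exec| {
            if sort_exec.input().output_partitioning().partition_count() > 1
                && !sort_exec.preserve_partitioning()
                && sort_exec.fetch().is_some()
            {
                // Each partition sorts locally and keeps at most `fetch` rows...
                let local_sort = SortExec::new_with_partitioning(
                    sort_exec.expr().to_vec(),
                    sort_exec.input().clone(),
                    true,
                    sort_exec.fetch(),
                );
                // ...and one merge on top restores the global order.
                Some(Arc::new(SortPreservingMergeExec::new(
                    sort_exec.expr().to_vec(),
                    Arc::new(local_sort),
                )) as Arc<dyn ExecutionPlan>)
            } else {
                None
            }
        })
        .map_or(plan, |new_plan| new_plan)
}
```

Each partition now emits at most `fetch` rows before the merge, so the merge reads far fewer rows than a single global sort over all partitions would.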
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 24f5b2daa655..5a247e16761f 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -110,8 +110,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
 
-    /// If the output of this operator is sorted, returns `Some(keys)`
-    /// with the description of how it was sorted.
+    /// If the output of this operator within each partition is sorted,
+    /// returns `Some(keys)` with the description of how it was sorted.
     ///
     /// For example, Sort, (obviously) produces sorted output as does
     /// SortPreservingMergeStream. Less obviously `Projection`
@@ -128,8 +128,8 @@
         vec![Distribution::UnspecifiedDistribution; self.children().len()]
     }
 
-    /// Specifies the ordering requirements for all the
-    /// children for this operator.
+    /// Specifies the ordering requirements for all of the children.
+    /// For each child, this is the required local ordering within each partition rather than a global ordering.
    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
         vec![None; self.children().len()]
     }
@@ -243,6 +243,34 @@
     fn statistics(&self) -> Statistics;
 }
 
+/// Indicates whether a data exchange is needed for the input of `plan`, which is especially
+/// helpful for a distributed engine to decide whether it needs to handle shuffling.
+/// Currently there are three kinds of execution plans that need data exchange:
+/// 1. RepartitionExec for changing the partition number between two operators
+/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without an ordering guarantee
+/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with an ordering guarantee
+pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
+    if let Some(repart) = plan.as_any().downcast_ref::<RepartitionExec>() {
+        !matches!(
+            repart.output_partitioning(),
+            Partitioning::RoundRobinBatch(_)
+        )
+    } else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
+    {
+        coalesce.input().output_partitioning().partition_count() > 1
+    } else if let Some(sort_preserving_merge) =
+        plan.as_any().downcast_ref::<SortPreservingMergeExec>()
+    {
+        sort_preserving_merge
+            .input()
+            .output_partitioning()
+            .partition_count()
+            > 1
+    } else {
+        false
+    }
+}
+
 /// Returns a copy of this plan if we change any child according to the pointer comparison.
 /// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
 /// Allow the vtable address comparisons for ExecutionPlan Trait Objects,it is harmless even
@@ -655,6 +683,8 @@ pub mod values;
 pub mod windows;
 
 use crate::execution::context::TaskContext;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 pub use datafusion_physical_expr::{
     expressions, functions, hash_utils, type_coercion, udf,
 };
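A distributed engine would typically call `need_data_exchange` while walking the plan tree to place shuffle-stage boundaries. A minimal sketch of such a walker, assuming only the function added above (the recursive helper itself is hypothetical, not part of the patch):

```rust
use std::sync::Arc;

use datafusion::physical_plan::{need_data_exchange, ExecutionPlan};

/// Counts how many operators in `plan` would require a data exchange,
/// i.e. how many potential shuffle-stage boundaries a distributed
/// scheduler would have to insert for this plan.
fn count_exchange_boundaries(plan: &Arc<dyn ExecutionPlan>) -> usize {
    let here = usize::from(need_data_exchange(plan.clone()));
    here + plan
        .children()
        .iter()
        .map(count_exchange_boundaries)
        .sum::<usize>()
}
```

Note that the function treats round-robin repartitioning as local rather than as an exchange, so it is not counted as a stage boundary here.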
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index d0d74c54f101..5ba8cabe9e52 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -18,7 +18,6 @@
 //! Physical query planner
 
 use super::analyze::AnalyzeExec;
-use super::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use super::{
     aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
     values::ValuesExec, windows,
@@ -838,22 +837,7 @@ impl DefaultPhysicalPlanner {
                         )),
                     })
                     .collect::<Result<Vec<_>>>()?;
-                // If we have a `LIMIT` can run sort/limts in parallel (similar to TopK)
-                Ok(if fetch.is_some() && session_state.config.target_partitions() > 1 {
-                    let sort = SortExec::new_with_partitioning(
-                        sort_expr,
-                        physical_input,
-                        true,
-                        *fetch,
-                    );
-                    let merge = SortPreservingMergeExec::new(
-                        sort.expr().to_vec(),
-                        Arc::new(sort),
-                    );
-                    Arc::new(merge)
-                } else {
-                    Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?)
-                })
+                Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
             }
             LogicalPlan::Join(Join {
                 left,
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 89112adae74a..b6bbb3cff16a 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -92,10 +92,11 @@ async fn explain_analyze_baseline_metrics() {
         "CoalesceBatchesExec: target_batch_size=4096",
         "metrics=[output_rows=5, elapsed_compute"
     );
+    // The number of output rows becomes smaller after changing the global sort into local sorts with limit push down
     assert_metrics!(
         &formatted,
         "CoalescePartitionsExec",
-        "metrics=[output_rows=5, elapsed_compute="
+        "metrics=[output_rows=3, elapsed_compute="
     );
     assert_metrics!(
         &formatted,
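End to end, the planner change means `ORDER BY ... LIMIT` queries now rely entirely on the enforcement pass to parallelize the sort. A hedged way to observe the resulting plan (the table name, file, and query are illustrative; `SessionContext::with_config`, `create_physical_plan`, and `displayable` are the stock DataFusion APIs of this era):

```rust
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Multiple target partitions so the parallel-sort rewrite can fire.
    let config = SessionConfig::new().with_target_partitions(8);
    let ctx = SessionContext::with_config(config);
    ctx.register_csv("t", "t.csv", CsvReadOptions::new()).await?;

    // The planner emits a single global SortExec; after the enforcement
    // pass the plan should instead contain a SortPreservingMergeExec fed
    // by partition-local SortExecs with the limit pushed down (the exact
    // display text varies by version).
    let df = ctx.sql("SELECT a FROM t ORDER BY a LIMIT 3").await?;
    let plan = df.create_physical_plan().await?;
    println!("{}", displayable(plan.as_ref()).indent());
    Ok(())
}
```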