Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange #4586
Conversation
/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
pub trait DataExchangeExecutionPlan: ExecutionPlan {}
👍
Once this PR is merged into DataFusion, the Ballista side will be much cleaner. One example is as follows:
Hi @andygrove, @mingmwang, @alamb, could you help review this PR? By the way, regarding the last commit for the UT workaround: if we upgrade arrow-rs to 29.0.0, the issue can be avoided and the last commit in this PR can be reverted.
Thank you @yahoNanJing and @mingmwang, these changes look good to me.
It seems like this PR contains both need_data_exchange as well as some improvements to the enforcement pass to run more sorts in parallel -- perhaps the PR title could be modified to include the optimizer improvements as well.
Regarding the UT workaround in the last commit: I merged the arrow 29 upgrade #4587, so if you pick up the latest master changes the tests should now pass.
@@ -835,13 +836,42 @@ fn new_join_conditions(
    new_join_on
}

/// Within this function, it checks whether we need to add additional plan operators
Thank you for these comments
.and_then(|sort_exec| {
    // If it's already preserving the partitioning, it can be regarded as a local sort
    // and there's no need for this optimization
    if !sort_exec.preserve_partitioning() {
I wonder if this should also check if the SortExec's input has more than one partition -- otherwise the SortPreservingMerge will be a noop.
Agree. I think this check is still required to avoid unnecessary shuffle/data exchange. The reason is that the SortExec's input might be another CoalescePartitionsExec, in which case it is unnecessary to change SortExec to SortPreservingMerge + parallel SortExec.
Agree, although the current implementation will not introduce additional shuffling, since SortPreservingMerge also checks the input partition count. I'll add the check.
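For context, a minimal sketch of the rewrite being discussed here: a global SortExec over a multi-partition input becomes a per-partition (parallel) SortExec followed by a SortPreservingMergeExec. Constructor names and signatures are assumptions based on the DataFusion APIs of this period, not necessarily the merged code, and per the later discussion the final version also requires a pushed-down limit:

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::ExecutionPlan;

// Sketch: rewrite a global sort into parallel local sorts plus a merge.
fn parallelize_sort(sort_exec: &SortExec) -> Result<Arc<dyn ExecutionPlan>> {
    let input = sort_exec.input().clone();
    // Only worthwhile when there are multiple input partitions and the sort
    // is a global one (a partition-preserving sort is already "local").
    if input.output_partitioning().partition_count() > 1
        && !sort_exec.preserve_partitioning()
    {
        // Sort each input partition independently; these sorts run in parallel.
        let local_sort = Arc::new(SortExec::new_with_partitioning(
            sort_exec.expr().to_vec(),
            input,
            true, // preserve_partitioning: one sorted stream per partition
            sort_exec.fetch(),
        ));
        // Then merge the sorted partitions into a single sorted stream.
        Ok(Arc::new(SortPreservingMergeExec::new(
            sort_exec.expr().to_vec(),
            local_sort,
        )))
    } else {
        // Keep a single global (or already-local) sort.
        Ok(Arc::new(SortExec::try_new(
            sort_exec.expr().to_vec(),
            input,
            sort_exec.fetch(),
        )?))
    }
}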
@@ -243,6 +243,34 @@ pub trait ExecutionPlan: Debug + Send + Sync {
    fn statistics(&self) -> Statistics;
}

/// Indicate whether a data exchange is needed, which will be very helpful
Suggested change:
- /// Indicate whether a data exchange is needed, which will be very helpful
+ /// Indicate whether a data exchange is needed at the input of `plan`, which will be very helpful
/// 1. RepartitionExec for changing the partition number between two operators
/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
I wonder if you could generalize this code so that it compares plan.child().output_partitioning() and plan.required_input_distribution(), though it was not trivial when I was thinking about it.
I'm not sure whether we can decide whether a data exchange is needed based only on plan.child().output_partitioning() and plan.required_input_distribution(). For example, how can we make a decision for a Join, which has two children? The enforcement rule will decide and add the necessary physical operators, like RepartitionExec, CoalescePartitionsExec, and SortPreservingMergeExec; this should not be part of this function.
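To ground this discussion, here is a minimal sketch of what a downcast-based need_data_exchange could look like. The module paths and the round-robin special case are assumptions about the DataFusion code of this period, not necessarily the merged implementation:

use std::sync::Arc;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

// Sketch: true when `plan` moves rows across partition boundaries at its input.
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
    if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
        // Round-robin repartitioning only splits streams; hash repartitioning
        // redistributes rows based on key values.
        !matches!(
            repartition.output_partitioning(),
            Partitioning::RoundRobinBatch(_)
        )
    } else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>() {
        // Collapsing several partitions into one is a data exchange.
        coalesce.input().output_partitioning().partition_count() > 1
    } else if let Some(merge) = plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
        // Same, but the single output partition stays sorted.
        merge.input().output_partitioning().partition_count() > 1
    } else {
        false
    }
}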
-    } else {
-        Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?)
-    })
+    Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
Is the rationale here that the enforcement phase now creates the pattern with sort and merge?
Yes. It's better to avoid manually adding physical operators; that's the enforcement rule's job.
datafusion/core/tests/sql/joins.rs
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", | ||
" SortExec: [t1_id@0 ASC NULLS LAST]", |
This is a cool plan -- it is the first time I know of that DataFusion creates plans using SortPreservingMerge directly 👍
The rationale for this, as I understand it, is to support sorting in parallel. Internally the Sort operator does use a Merge if it has multiple input partitions. I wonder if we have any opinions about making the parallelism explicit in the plan (like this) or implicit (within the operator)?
I think explicit in the plan is consistent with other parts of DataFusion (e.g. CoalescePartitionsExec).
fyi @tustvold
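To make the two alternatives concrete, here are schematic plan shapes (not actual EXPLAIN output; sort expressions and inputs are placeholders):

Explicit parallelism in the plan (this PR):
SortPreservingMergeExec: [a@0 ASC]
  SortExec: [a@0 ASC]   (one local sort per input partition, run in parallel)
    ...

Implicit, inside the operator (previous behavior):
SortExec: [a@0 ASC]   (single global sort)
  CoalescePartitionsExec
    ...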
In a previous PR for TopK queries I added the check for a LIMIT to be present on purpose. The reason is that SortPreservingMergeExec + parallel SortExec was slower than CoalescePartitionsExec + SortExec in my tests, so it actually made queries run quite a bit slower. I think we have to use other means to parallelize sorts, such as range partitioning.
Why is SortPreservingMergeExec + parallel SortExec slower than CoalescePartitionsExec + SortExec? Maybe it depends on the volume of input data that needs to be sorted.
Do you have some test data and SQL?
I don't have a reproducible example ready unfortunately; the way I tested was by running some SELECT x, y FROM test ORDER BY z type of queries on the TPC-H order lines table (using 16 partitions).
This was before #4301 got merged though, so maybe things are different now 🤔
What about adding some similar queries to the parquet SQL benchmarks to confirm?
I think the core idea of divide and conquer for parallel sorting and merging is very general. I'm also curious why the performance is worse than the single-node global sort.
What's more, the rationale for depending on the limit to decide whether to switch to a parallel sort is also not strong. Sometimes the limit is very large and nothing can be pruned.
Both excellent points.
- Limit looked like a relatively good heuristic, as one often provides a value smaller than the size of the input, like 10, 100, 1000 or 10K, but not 10M, 100M, etc. One could fine-tune the heuristic by only applying it for a "sufficiently" small limit. The limit causes the number of rows fed as input to SortPreservingMergeExec to be only partitions * limit (besides also speeding up the parallel sort); see the worked example below the list.
- I am not sure whether we should expect SortPreservingMergeExec + parallel SortExec to be (much) faster than CoalescePartitionsExec + SortExec. SortPreservingMergeExec itself runs on one partition/thread, is a relatively heavy operation, and also needs to wait for all of the sorted input partitions to be ready. Maybe with some fine-tuning (or maybe already after #4301) it could be faster than a single-threaded SortExec in certain cases, but other approaches like range partitioning will be better for query parallelism.
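A quick worked instance of that partitions * limit bound (illustrative numbers, not from this PR): with 16 input partitions and LIMIT 100, each local SortExec only has to produce its top 100 rows, so SortPreservingMergeExec merges at most 16 * 100 = 1,600 rows regardless of the table size; with LIMIT 10M the bound becomes 160M rows and the heuristic buys little.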
I think we have to put back the condition that a limit be present in order to parallelize sorts with SortPreservingMergeExec for now, to recover the previous behavior.
Just rebased on the latest master branch.
Is this PR ready for another round of review?
Hi @alamb, yes. This PR is ready for another round of review, except for the concern about the removal of the limit check.
Maybe I can first add back the limit check.
LGTM -- I also think @Dandandan's comments have been addressed.
Thank you @yahoNanJing and @mingmwang
// - There's no limit pushed down to the local sort (It's still controversial)
if sort_exec.input().output_partitioning().partition_count() > 1
    && !sort_exec.preserve_partitioning()
    && sort_exec.fetch().is_some()
FYI @Dandandan the check for local limit has been restored
👍
// There are three situations where there's no need for this optimization:
// - There's only one input partition;
// - It's already preserving the partitioning, so it can be regarded as a local sort;
// - There's no limit pushed down to the local sort (It's still controversial)
Might be worth a link to the ticket / PR for anyone who sees this comment and wants more context / backstory.
Changes addressed. Please re-review
Nice! BTW I am not against the change to use …
Benchmark runs are scheduled for baseline = ca8985e and contender = 920f11a. 920f11a is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #4585.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?