Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unnecessary SortExec removal rule from Physical Plan #4691

Merged
merged 22 commits into from
Dec 26, 2022
Merged

Unnecessary SortExec removal rule from Physical Plan #4691

merged 22 commits into from
Dec 26, 2022

Conversation

mustafasrepo
Copy link
Contributor

@mustafasrepo mustafasrepo commented Dec 21, 2022

Which issue does this PR close?

Closes #4686

Rationale for this change

After physical plan construction, we may end up with SortExecs that are unnecessary. This happens fairly often in realistic use cases, but let's start with a simple yet unrealistic example to illustrate the issue: Assume that we somehow end up with a physical plan fragment such as the one below:

"SortExec: [a@0 ASC]",
"  SortExec: [b@1 ASC]",

Since second SortExec overwrites the first one, the first SortExec is useless; we can safely remove it.

We propose a rule that analyzes the physical plan and removes unnecessary SortExecs (i.e. SortExecs followed by layers in the execution plan that don't require an input ordering and don't maintain their input ordering). Obviously, this rule would solve the toy problem above.

Let's discuss more realistic scenarios. Consider the following query from our tests:

SELECT count(*) as global_count FROM
                 (SELECT count(*), c1
                  FROM aggregate_test_100
                  WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
                  GROUP BY c1
                  ORDER BY c1 ) AS a 

Its physical plan is as follows:

    "ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]",
    "  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
    "    CoalescePartitionsExec",
    "      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]",
    "        RepartitionExec: partitioning=RoundRobinBatch(8)",
    "          SortExec: [c1@0 ASC NULLS LAST]",
    "            CoalescePartitionsExec",
    "              AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
    "                CoalesceBatchesExec: target_batch_size=4096",
    "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8)",
    "                    AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
    "                      CoalesceBatchesExec: target_batch_size=4096",
    "                        FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
    "                          RepartitionExec: partitioning=RoundRobinBatch(8)",

This plan contains a SortExec because of the ORDER BY clause in the subquery. However, the AggregateExec layer above the sort doesn't require any input ordering and doesn't maintain its input ordering either. Hence, the SortExec in this plan is unnecessary. This basic rule removes such unnecessary SortExecs from physical plans.

Furthermore, some executors enable further optimizations for removing SortExecs before them. As an example, consider WindowAggExec. Currently, we remove the SortExecs before WindowAggExecs if input sorting direction and required sorting direction are same. However, most of the window functions can also calculate their results with reverse order. Therefore, we can remove SortExecs if we slightly modify the WindowAggExec above it. Consider the following query as an example:

SELECT
    c9,
    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
    FROM aggregate_test_100
    LIMIT 5

This PR transforms the physical plan of the query above from

"ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
"  GlobalLimitExec: skip=0, fetch=5",
"    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
"      SortExec: [c9@1 ASC NULLS LAST]",
"        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
"          SortExec: [c9@0 DESC]",

into

"ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
"  GlobalLimitExec: skip=0, fetch=5",
"    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
"      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
"        SortExec: [c9@0 DESC]",

What changes are included in this PR?

In order to utilize this optimization, window functions should implement a reverse_expr method. We have added support for SUM, COUNT for aggregate functions (adding support for other window functions should be similar to these). For built-in window functions we added support FIRST_VALUE, LAST_VALUE, LEAD, LAG. Theoretically NTH_VALUE can produce its result in reverse order also. However, support for NTH_VALUE is left as future work. AFAICT, other built-in window functions cannot produce their result in reverse order and thus not can not partake in this optimization.

Are these changes tested?

We added tests asserting new optimized physical plans for various cases. Approximately 1000 line of the changes comes from the additional tests.

Are there any user-facing changes?

None.

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions labels Dec 21, 2022
@ozankabak
Copy link
Contributor

I am quite excited about this! It removes a lot of unnecessary pipeline-breaking sorts by (1) analyzing whether they are really necessary, and (2) transforming window queries to obviate the need for sorting. This PR not only optimizes Datafusion for existing use cases, but also propels Datafusion closer to being a great foundation for streaming use cases. It is a significant progress in the streaming roadmap we previously published.

The PR looks big, but the meat of the change is mostly in a new file (the new rule). The rest of the changes are either quite small, or test-related.

@mustafasrepo and I will be happy to answer any questions and are looking forward to feedback!

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks very impressive @mustafasrepo -- thank you. I plan to review it carefully tomorrow

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks really nice @mustafasrepo 🏆 Thank you for this feature to removing unnecessary sorts 👍 this was an optimization we would also like in IOx but I had not had a chance to write it up.

The only thing that is needed prior to my approval is to double check the plan in window.rs that seems to have a new unnecessary sort. I may be mistake but we should resolve that discussion first.

I also suspect that the logic in datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs could be significantly simplified (I left my suggestion there about how) but given the test coverage I think we can also merge the implementation in this PR and refine it as a follow on if/when needed.

I had a few a bunch of other minor suggestions, none of them required in my opinion

I think @mingmwang / @liukun4515 / @yahoNanJing may also be interested in this PR and given its importance I plant to leave it open for a few days so they have a chance to comment if they wish

I also tried this out locally and it worked for my simple cases

❯ create table foo as values (1), (3), (2), (5);
0 rows in set. Query took 0.016 seconds.
❯ select * from foo;
+---------+
| column1 |
+---------+
| 1       |
| 3       |
| 2       |
| 5       |
+---------+

on the master branch (has two sorts)

❯ explain select * from (select * from foo order by column1) order by column1 desc;
+---------------+---------------------------------------------------------+
| plan_type     | plan                                                    |
+---------------+---------------------------------------------------------+
| logical_plan  | Sort: foo.column1 DESC NULLS FIRST                      |
|               |   Projection: foo.column1                               |
|               |     Sort: foo.column1 ASC NULLS LAST                    |
|               |       TableScan: foo projection=[column1]               |
| physical_plan | SortExec: [column1@0 DESC]                              |
|               |   CoalescePartitionsExec                                |
|               |     ProjectionExec: expr=[column1@0 as column1]         |
|               |       RepartitionExec: partitioning=RoundRobinBatch(16) |
|               |         SortExec: [column1@0 ASC NULLS LAST]            |
|               |           MemoryExec: partitions=1, partition_sizes=[1] |
|               |                                                         |
+---------------+---------------------------------------------------------+

On this branch there is only one sort

❯ explain select * from (select * from foo order by column1) order by column1 desc;

+---------------+---------------------------------------------------------+
| plan_type     | plan                                                    |
+---------------+---------------------------------------------------------+
| logical_plan  | Sort: foo.column1 DESC NULLS FIRST                      |
|               |   Projection: foo.column1                               |
|               |     Sort: foo.column1 ASC NULLS LAST                    |
|               |       TableScan: foo projection=[column1]               |
| physical_plan | SortExec: [column1@0 DESC]                              |
|               |   CoalescePartitionsExec                                |
|               |     ProjectionExec: expr=[column1@0 as column1]         |
|               |       RepartitionExec: partitioning=RoundRobinBatch(16) |
|               |         MemoryExec: partitions=1, partition_sizes=[1]   |
|               |                                                         |
+---------------+---------------------------------------------------------+

Note (we can add this as a follow on):

I also tried this query (with in theory does not need any sort as the outer query does not specify a sort.

Sadly DataFusion still puts the sort in (probably because there is no way to mark that the top Projection doesn't require its output to be sorted):

❯ explain select * from (select * from foo order by column1);
+---------------+---------------------------------------------------+
| plan_type     | plan                                              |
+---------------+---------------------------------------------------+
| logical_plan  | Projection: foo.column1                           |
|               |   Sort: foo.column1 ASC NULLS LAST                |
|               |     TableScan: foo projection=[column1]           |
| physical_plan | ProjectionExec: expr=[column1@0 as column1]       |
|               |   SortExec: [column1@0 ASC NULLS LAST]            |
|               |     MemoryExec: partitions=1, partition_sizes=[1] |
|               |                                                   |
+---------------+---------------------------------------------------+

datafusion/common/src/lib.rs Outdated Show resolved Hide resolved
datafusion/core/src/execution/context.rs Show resolved Hide resolved
datafusion/core/src/physical_plan/repartition.rs Outdated Show resolved Hide resolved
pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
let mut result = vec![];
// All window exprs have the same partition by, so we just use the first one:
let partition_by = self.window_expr()[0].partition_by();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be self.partition_keys instead? Maybe it doesn't matter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.partition_keys stores the keys used during repartitioning. Hence when repartitioning is available they are indeed equivalent, however when it is disabled self.partition_keys is empty. It can be seen here

datafusion/core/tests/sql/window.rs Show resolved Hide resolved
}

/// This is a "data class" we use within the [RemoveUnnecessarySorts] rule
/// that tracks the closest `SortExec` descendant for every child of a plan.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could use ExecutionPlan::output_ordering() rather than tracking the state separately?

That way the logic of which nodes retain what ordering is kept within the execution plan nodes themselves rather than in this pass.

Though it looks strange to me to have both ExecutionPlan::maintains_input_order as well as ExecutionPlan::output_order (I think the maintains_input_order came before) -- maybe we should remove ExecutionPlan::maintains_input_order 🤔 -- I could try this if you think it worthwhile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason we use this struct is to be able to remove SortExecs down below the plan. By using output_ordering we can remove unnecessary SortExec, from the kind of plans where sorting and its usage are immediate like SortExec -> WindowAggExec. However, to remove unnecessary SortExec where sorting and its usage are linked with other order preserving executor(s) like SortExec -> ORDER_PRESERVING_EXEC -> WindowAggExec we need to keep track of linkage from current executor till to the corresponding SortExec. In the current implementation, Datafusion may be is not generating, these kind of plans. However, we didn't want to overfit to a strong assumption. As you say may be maintains_input_order is now unnecessary and this information can be derived by comparing output_ordering of consecutive executors. However, I think it is still helpful to have as a convenience method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The linkage should be tracked by the output_ordering() method also. If an operator is order preserving, it should override the method output_ordering() and maintains_input_order() and make it return self.input.output_ordering()

// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
let sort_expr = required_ordering.to_vec();
*child = add_sort_above_child(child, sort_expr)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is somewhat strange to me that a pass called "RemoveUnecessarySorts" is actually sometimes adding sorts 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code should work quite rarely. However, there are cases, where it is necessary. Assume we have a plan like following

`Window Requires a DESC (Non reversible)`
  `Window Requires a DESC (Reversible)` 
    `Sort a DESC`
      `Window Requires a ASC`
        `Sort a ASC`

during traversing the plan bottom up , we can remove second Sort since executor its above can calculate its result with reverse order. However, one SortExec may not always satisfies single executor. Hence removing a Sort may invalidate assumptions for executors above. This check handles these cases. Our rule turns above plan to the one below

`Window Requires a DESC (Non reversible)`
  `Sort a DESC`
    `Window Requires a DESC (Reversible)` 
      `Window Requires a ASC`
        `Sort a ASC`

without this addition we will generate plan below

`Window Requires a DESC (Non reversible)`
  `Window Requires a DESC (Reversible)` 
    `Window Requires a ASC`
      `Sort a ASC`

which doesn't satisfy the requirement for the executor at the top.

Copy link
Contributor

@ozankabak ozankabak Dec 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing we can do is to name the rule as OptimizeSorts instead of RemoveUnnecessarySorts so that others do not get thrown off by the sort adding code (which runs rarely, but needs to be there to cover corner cases @mustafasrepo explains).

if children.is_empty() {
Ok(self)
} else {
let children_requirements = children
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting to see a check like the following in this module

// if there is a sort node child
if node.child.is_sort() {
  let sort = node.child;
  // if the sort's input ordering already satisfies the output
  // required ordering, it can be removed
  if ordering_satisfy_concrete(
    sort.child.output_ordering()
    node.required_input_ordering()
  ) {
    // remove sort and connect child directly to node
    node.child = sort.child
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed we are doing this check. However, we are removing SortExecs from multi layer links, such as Sort -> OrderMaintainingExec-> AnotherOrderMaintainingExec ->OrderRequiringExec corresponding snippet is
here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case I would expect (hope?) that the top OrderMaintaingExec had the same output_ordering as the OrderRequiringExec

I wonder if for some types of OrderMaintaingExec nodes the "output_ordering" isn't properly computed / communicated 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if for some types of OrderMaintaingExec nodes the "output_ordering" isn't properly computed / communicated 🤔

This is right some Executors such as ProjectionExec, WindowAggExec modifies PhysicalExprs (when it is Column). Directly comparing required_input_ordering and output_ordering may not produce what is desired. We plan to file another PR to fix this. We added a TODO mentioning this observation.

}

#[tokio::test]
async fn test_add_required_sort() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think enforcement is the pass that is supposed to be adding required sorts -- as above I find it confusing that "remove unnecessary sorts" is also adding sorts 🤔

@ozankabak
Copy link
Contributor

Thank you @alamb for the thorough review. We will go through your comments and improve the PR accordingly. I will discuss both your second example and the one extra sort with @mustafasrepo tomorrow in detail. My gut feeling is that the second example you mention ought to be one of the cases this PR should handle -- we may be able to handle it by terminating the traversal "properly". I also have a theory about the extra sort thing, but I will not venture to speculate too much before I confer with @mustafasrepo.

@github-actions github-actions bot removed logical-expr Logical plan and expressions physical-expr Physical Expressions core Core DataFusion crate labels Dec 23, 2022
@ozankabak
Copy link
Contributor

We went through your comments and questions, @mustafasrepo will address them in detail today.

Interestingly, the following initial thought I had about your second example

My gut feeling is that the second example you mention ought to be one of the cases this PR should handle -- we may be able to handle it by terminating the traversal "properly".

seems to be wrong and there seems to be a reason why we are not removing that sort -- @mustafasrepo will discuss this too.

Thanks again for the in-depth review.

…a-ai/arrow-datafusion into feature/sort_removal_rule

# Conflicts:
#	datafusion/physical-expr/src/aggregate/count.rs
#	datafusion/physical-expr/src/aggregate/mod.rs
#	datafusion/physical-expr/src/aggregate/sum.rs
#	datafusion/physical-expr/src/window/aggregate.rs
@mustafasrepo mustafasrepo reopened this Dec 23, 2022
@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions labels Dec 23, 2022
@mustafasrepo
Copy link
Contributor Author

mustafasrepo commented Dec 23, 2022

select * from (select * from foo order by column1

Regarding your second example the plan of the query below

SELECT * FROM (SELECT c1 FROM aggregate_test_100 ORDER BY c9)

is that

    "ProjectionExec: expr=[c1@0 as c1]",
    "  SortExec: [c9@1 ASC NULLS LAST]",

which in principle doesn't require SortExec. However, some queries may produce similar physical plan like above, where SortExec is not removable. Consider another query below

SELECT c1 FROM aggregate_test_100 ORDER BY c9

its physical plan is that

    "ProjectionExec: expr=[c1@0 as c1]",
    "  SortExec: [c9@1 ASC NULLS LAST]",

In the physical plan level, I think there is no way to be sure whether SortExec below ProjectionExec is enforced or not.

@alamb
Copy link
Contributor

alamb commented Dec 23, 2022

In the physical plan level, I think there is no way to be sure whether SortExec below ProjectionExec is enforced or not.

I agree -- we would have to add some way to distinguish this case

For example

Projection {
   ...
  required_sort_order: Vec<SortExpr>
}

That could be added by the sql planer. I am not suggesting we we make this change for this PR. Maybe once we merge PR we can file it as a follow on ticket

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mustafasrepo @metesynnada

I'll plan to leave this PR open for a day or two in case anyone else wants to respond (especially @mingmwang / @yahoNanJing ) but from my perspective it looks good to merge

I may try and see if I can simplify the logic but given the current test coverage I think this one PR is fine to go in as is

@@ -332,6 +348,15 @@ mod tests {
assert_eq!(actual, expected);
Ok(())
}

#[test]
fn test_transpose() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

" RepartitionExec: partitioning=RoundRobinBatch(2)",
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 1 }], 2)",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
" SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok -- I do think the original plan is better, but I would also be happy to merge this PR as is and improve it (potentially) as a follow on.

@ozankabak
Copy link
Contributor

Hello @alamb, we renamed the rule to the less-confusing alternative OptimizeSorts and placed the two follow-ons we discussed in our work queue. We will open separate, small PRs for them in the near future.

Feel free to merge this whenever you like. Thanks again for the review!

@alamb alamb merged commit 8ec511e into apache:master Dec 26, 2022
@alamb
Copy link
Contributor

alamb commented Dec 26, 2022

Thanks again @mustafasrepo and @ozankabak

@ursabot
Copy link

ursabot commented Dec 26, 2022

Benchmark runs are scheduled for baseline = 34475bb and contender = 8ec511e. 8ec511e is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add a sort rule to remove unnecessary SortExecs from physical plan
5 participants