Skip to content

Commit

Permalink
[MINOR]: Remove unecessary orderings from the final plan (#8289)
Browse files Browse the repository at this point in the history
* Remove lost orderings from the final plan

* Improve comments

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
mustafasrepo and ozankabak authored Nov 21, 2023
1 parent e9b9645 commit ffbc689
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 16 deletions.
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,9 @@ fn ensure_sorting(
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
}
}
(None, None) => {}
(None, None) => {
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
}
}
}
// For window expressions, we can remove some sorts when we can
Expand Down
23 changes: 8 additions & 15 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,24 +219,17 @@ impl ExecutionPlan for FileSinkExec {
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
// The input order is either exlicitly set (such as by a ListingTable),
// or require that the [FileSinkExec] gets the data in the order the
// input produced it (otherwise the optimizer may chose to reorder
// the input which could result in unintended / poor UX)
//
// More rationale:
// https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
match &self.sort_order {
Some(requirements) => vec![Some(requirements.clone())],
None => vec![self
.input
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)],
}
// The required input ordering is set externally (e.g. by a `ListingTable`).
// Otherwise, there is no specific requirement (i.e. `sort_expr` is `None`).
vec![self.sort_order.as_ref().cloned()]
}

fn maintains_input_order(&self) -> Vec<bool> {
vec![false]
// Maintains ordering in the sense that the written file will reflect
// the ordering of the input. For more context, see:
//
// https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
vec![true]
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Expand Down
23 changes: 23 additions & 0 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,29 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true

# When ordering lost during projection, we shouldn't keep the SortExec.
# in the final physical plan.
query TT
EXPLAIN SELECT c2, COUNT(*)
FROM (SELECT c2
FROM aggregate_test_100
ORDER BY c1, c2)
GROUP BY c2;
----
logical_plan
Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
--Projection: aggregate_test_100.c2
----Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST
------Projection: aggregate_test_100.c2, aggregate_test_100.c1
--------TableScan: aggregate_test_100 projection=[c1, c2]
physical_plan
AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[COUNT(*)]
--CoalesceBatchesExec: target_batch_size=8192
----RepartitionExec: partitioning=Hash([c2@0], 2), input_partitions=2
------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[COUNT(*)]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2], has_header=true

statement ok
drop table annotated_data_finite2;

Expand Down

0 comments on commit ffbc689

Please sign in to comment.