From ffbc6896b0f4f1b417991d1a13266be10c3f3709 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Tue, 21 Nov 2023 22:41:53 +0300 Subject: [PATCH] [MINOR]: Remove unnecessary orderings from the final plan (#8289) * Remove lost orderings from the final plan * Improve comments --------- Co-authored-by: Mehmet Ozan Kabak --- .../src/physical_optimizer/enforce_sorting.rs | 4 +++- datafusion/physical-plan/src/insert.rs | 23 +++++++------------ datafusion/sqllogictest/test_files/select.slt | 23 +++++++++++++++++++ 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 2590948d3b3e..6fec74f608ae 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -476,7 +476,9 @@ fn ensure_sorting( update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; } } - (None, None) => {} + (None, None) => { + update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; + } } } // For window expressions, we can remove some sorts when we can diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 4eeb58974aba..81cdfd753fe6 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -219,24 +219,17 @@ impl ExecutionPlan for FileSinkExec { } fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> { - // The input order is either exlicitly set (such as by a ListingTable), - // or require that the [FileSinkExec] gets the data in the order the - // input produced it (otherwise the optimizer may chose to reorder - // the input which could result in unintended / poor UX) - // - // More rationale: - // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 - match &self.sort_order { - Some(requirements) => 
vec![Some(requirements.clone())], - None => vec![self - .input - .output_ordering() - .map(PhysicalSortRequirement::from_sort_exprs)], - } + // The required input ordering is set externally (e.g. by a `ListingTable`). + // Otherwise, there is no specific requirement (i.e. `sort_expr` is `None`). + vec![self.sort_order.as_ref().cloned()] } fn maintains_input_order(&self) -> Vec<bool> { - vec![false] + // Maintains ordering in the sense that the written file will reflect + // the ordering of the input. For more context, see: + // + // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 + vec![true] } fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index 98ea061c731b..bb81c5a9a138 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1013,6 +1013,29 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST] --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true +# When ordering is lost during projection, we shouldn't keep the SortExec +# in the final physical plan. 
+query TT +EXPLAIN SELECT c2, COUNT(*) +FROM (SELECT c2 +FROM aggregate_test_100 +ORDER BY c1, c2) +GROUP BY c2; +---- +logical_plan +Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] +--Projection: aggregate_test_100.c2 +----Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST +------Projection: aggregate_test_100.c2, aggregate_test_100.c1 +--------TableScan: aggregate_test_100 projection=[c1, c2] +physical_plan +AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[COUNT(*)] +--CoalesceBatchesExec: target_batch_size=8192 +----RepartitionExec: partitioning=Hash([c2@0], 2), input_partitions=2 +------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[COUNT(*)] +--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2], has_header=true + statement ok drop table annotated_data_finite2;