From d0bd28eef34ca001d5e43d2732761a1b4cf09c71 Mon Sep 17 00:00:00 2001 From: jakevin Date: Mon, 6 Mar 2023 22:55:16 +0800 Subject: [PATCH] feat: eliminate the duplicated sort keys in Order By clause (#5462) --- .../tests/sqllogictests/test_files/order.slt | 117 +++++++++++++++++- .../src/eliminate_duplicated_expr.rs | 103 +++++++++++++++ datafusion/optimizer/src/lib.rs | 7 +- datafusion/optimizer/src/optimizer.rs | 2 + 4 files changed, 225 insertions(+), 4 deletions(-) create mode 100644 datafusion/optimizer/src/eliminate_duplicated_expr.rs diff --git a/datafusion/core/tests/sqllogictests/test_files/order.slt b/datafusion/core/tests/sqllogictests/test_files/order.slt index 6c2bb3abc7d5..d42d2cf62f1f 100644 --- a/datafusion/core/tests/sqllogictests/test_files/order.slt +++ b/datafusion/core/tests/sqllogictests/test_files/order.slt @@ -106,7 +106,6 @@ SELECT arrow_typeof(c1), arrow_typeof(c2), arrow_typeof(c3) FROM test LIMIT 1; ---- Int32 Int64 Boolean - query II SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC ---- @@ -155,6 +154,122 @@ SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC 0 9 0 10 +# eliminate duplicated sorted expr +query TT +explain SELECT c1, c2 FROM aggregate_test_100 ORDER BY c2, c3, c2 +---- +logical_plan +Projection: aggregate_test_100.c1, aggregate_test_100.c2 + Sort: aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST + TableScan: aggregate_test_100 projection=[c1, c2, c3] +physical_plan +ProjectionExec: expr=[c1@0 as c1, c2@1 as c2] + SortExec: expr=[c2@1 ASC NULLS LAST,c3@2 ASC NULLS LAST] + CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3] + +query II +SELECT c2, c3 FROM aggregate_test_100 ORDER BY c2, c3, c2 +---- +1 -99 +1 -98 +1 -85 +1 -72 +1 -56 +1 -25 +1 -24 +1 -8 +1 -5 +1 12 +1 29 +1 36 +1 38 +1 41 +1 54 +1 57 +1 70 +1 71 +1 83 +1 103 +1 120 +1 125 +2 -117 +2 -107 +2 -106 +2 -61 +2 -60 +2 -60 +2 -48 +2 -43 +2 -29 +2 1 +2 29 
+2 31 +2 45 +2 49 +2 52 +2 52 +2 63 +2 68 +2 93 +2 97 +2 113 +2 122 +3 -101 +3 -95 +3 -76 +3 -72 +3 -12 +3 -2 +3 13 +3 13 +3 14 +3 17 +3 17 +3 22 +3 71 +3 73 +3 77 +3 97 +3 104 +3 112 +3 123 +4 -117 +4 -111 +4 -101 +4 -90 +4 -79 +4 -59 +4 -56 +4 -54 +4 -53 +4 -38 +4 3 +4 5 +4 17 +4 30 +4 47 +4 55 +4 65 +4 73 +4 74 +4 96 +4 97 +4 102 +4 123 +5 -101 +5 -94 +5 -86 +5 -82 +5 -59 +5 -44 +5 -40 +5 -31 +5 -5 +5 36 +5 62 +5 64 +5 68 +5 118 # sort_empty diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs new file mode 100644 index 000000000000..5a882108e0fd --- /dev/null +++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::Sort; +use hashbrown::HashSet; + +/// Optimization rule that eliminate duplicated expr. 
+#[derive(Default)]
+pub struct EliminateDuplicatedExpr;
+
+impl EliminateDuplicatedExpr {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for EliminateDuplicatedExpr {
+    fn try_optimize(
+        &self,
+        plan: &LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
+        match plan {
+            LogicalPlan::Sort(sort) => {
+                // dedup sort.expr and keep order
+                let mut dedup_expr = Vec::new();
+                let mut dedup_set = HashSet::new();
+                for expr in &sort.expr {
+                    if !dedup_set.contains(expr) {
+                        dedup_expr.push(expr);
+                        dedup_set.insert(expr.clone());
+                    }
+                }
+                if dedup_expr.len() == sort.expr.len() {
+                    Ok(None)
+                } else {
+                    Ok(Some(LogicalPlan::Sort(Sort {
+                        expr: dedup_expr.into_iter().cloned().collect::<Vec<_>>(),
+                        input: sort.input.clone(),
+                        fetch: sort.fetch,
+                    })))
+                }
+            }
+            _ => Ok(None),
+        }
+    }
+
+    fn name(&self) -> &str {
+        "eliminate_duplicated_expr"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test::*;
+    use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder};
+    use std::sync::Arc;
+
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
+        crate::test::assert_optimized_plan_eq(
+            Arc::new(EliminateDuplicatedExpr::new()),
+            plan,
+            expected,
+        )
+    }
+
+    #[test]
+    fn eliminate_sort_expr() -> Result<()> {
+        let table_scan = test_table_scan().unwrap();
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort(vec![col("a"), col("a"), col("b"), col("c")])?
+            .limit(5, Some(10))?
+ .build()?; + let expected = "Limit: skip=5, fetch=10\ + \n Sort: test.a, test.b, test.c\ + \n TableScan: test"; + assert_optimized_plan_eq(&plan, expected) + } +} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index ca0611d4e17e..4bbbb4645af3 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -20,6 +20,7 @@ pub mod common_subexpr_eliminate; pub mod decorrelate_where_exists; pub mod decorrelate_where_in; pub mod eliminate_cross_join; +pub mod eliminate_duplicated_expr; pub mod eliminate_filter; pub mod eliminate_limit; pub mod eliminate_outer_join; @@ -33,17 +34,17 @@ pub mod propagate_empty_relation; pub mod push_down_filter; pub mod push_down_limit; pub mod push_down_projection; +pub mod replace_distinct_aggregate; +pub mod rewrite_disjunctive_predicate; pub mod scalar_subquery_to_join; pub mod simplify_expressions; pub mod single_distinct_to_groupby; pub mod type_coercion; +pub mod unwrap_cast_in_comparison; pub mod utils; -pub mod replace_distinct_aggregate; -pub mod rewrite_disjunctive_predicate; #[cfg(test)] pub mod test; -pub mod unwrap_cast_in_comparison; pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule}; pub use utils::optimize_children; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 5077bed9090d..c1baa25d4363 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -21,6 +21,7 @@ use crate::common_subexpr_eliminate::CommonSubexprEliminate; use crate::decorrelate_where_exists::DecorrelateWhereExists; use crate::decorrelate_where_in::DecorrelateWhereIn; use crate::eliminate_cross_join::EliminateCrossJoin; +use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr; use crate::eliminate_filter::EliminateFilter; use crate::eliminate_limit::EliminateLimit; use crate::eliminate_outer_join::EliminateOuterJoin; @@ -222,6 +223,7 @@ impl Optimizer { Arc::new(SimplifyExpressions::new()), 
Arc::new(MergeProjection::new()), Arc::new(RewriteDisjunctivePredicate::new()), + Arc::new(EliminateDuplicatedExpr::new()), Arc::new(EliminateFilter::new()), Arc::new(EliminateCrossJoin::new()), Arc::new(CommonSubexprEliminate::new()),