Skip to content

Commit

Permalink
feat: eliminate the duplicated sort keys in Order By clause (#5462)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener authored Mar 6, 2023
1 parent 21e33a3 commit d0bd28e
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 4 deletions.
117 changes: 116 additions & 1 deletion datafusion/core/tests/sqllogictests/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ SELECT arrow_typeof(c1), arrow_typeof(c2), arrow_typeof(c3) FROM test LIMIT 1;
----
Int32 Int64 Boolean


query II
SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC
----
Expand Down Expand Up @@ -155,6 +154,122 @@ SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC
0 9
0 10

# duplicated sort expressions in ORDER BY are eliminated (the repeated `c2` is dropped)
query TT
explain SELECT c1, c2 FROM aggregate_test_100 ORDER BY c2, c3, c2
----
logical_plan
Projection: aggregate_test_100.c1, aggregate_test_100.c2
Sort: aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST
TableScan: aggregate_test_100 projection=[c1, c2, c3]
physical_plan
ProjectionExec: expr=[c1@0 as c1, c2@1 as c2]
SortExec: expr=[c2@1 ASC NULLS LAST,c3@2 ASC NULLS LAST]
CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3]

query II
SELECT c2, c3 FROM aggregate_test_100 ORDER BY c2, c3, c2
----
1 -99
1 -98
1 -85
1 -72
1 -56
1 -25
1 -24
1 -8
1 -5
1 12
1 29
1 36
1 38
1 41
1 54
1 57
1 70
1 71
1 83
1 103
1 120
1 125
2 -117
2 -107
2 -106
2 -61
2 -60
2 -60
2 -48
2 -43
2 -29
2 1
2 29
2 31
2 45
2 49
2 52
2 52
2 63
2 68
2 93
2 97
2 113
2 122
3 -101
3 -95
3 -76
3 -72
3 -12
3 -2
3 13
3 13
3 14
3 17
3 17
3 22
3 71
3 73
3 77
3 97
3 104
3 112
3 123
4 -117
4 -111
4 -101
4 -90
4 -79
4 -59
4 -56
4 -54
4 -53
4 -38
4 3
4 5
4 17
4 30
4 47
4 55
4 65
4 73
4 74
4 96
4 97
4 102
4 123
5 -101
5 -94
5 -86
5 -82
5 -59
5 -44
5 -40
5 -31
5 -5
5 36
5 62
5 64
5 68
5 118


# sort_empty
Expand Down
103 changes: 103 additions & 0 deletions datafusion/optimizer/src/eliminate_duplicated_expr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::Sort;
use hashbrown::HashSet;

/// Optimizer rule that eliminates duplicated expressions.
///
/// The rule carries no state, so it is a unit struct; it can be built with
/// either [`EliminateDuplicatedExpr::new`] or `Default::default()`.
#[derive(Default)]
pub struct EliminateDuplicatedExpr;

impl EliminateDuplicatedExpr {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self
    }
}

impl OptimizerRule for EliminateDuplicatedExpr {
    /// Removes repeated sort keys from a `LogicalPlan::Sort` node.
    ///
    /// Two keys are considered duplicates when the whole expressions compare
    /// equal (hash-set membership). The first occurrence of every key keeps
    /// its position; later occurrences are dropped.
    ///
    /// Returns `Ok(None)` when `plan` is not a sort or when its keys are
    /// already unique, and `Ok(Some(..))` holding a new `Sort` with the same
    /// `input` and `fetch` otherwise.
    ///
    /// NOTE(review): because equality covers the entire key expression, keys
    /// that differ only in attributes stored inside the expression (e.g. a
    /// sort direction, if it is encoded there) are treated as distinct —
    /// confirm that this is the intended behavior.
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        match plan {
            LogicalPlan::Sort(sort) => {
                // The set holds references, so no expression is cloned just to
                // test membership. `insert` returns `true` only for a key that
                // was not present yet, which gives "first occurrence wins"
                // with a single hash lookup per key.
                let mut seen = HashSet::with_capacity(sort.expr.len());
                let unique_keys: Vec<_> = sort
                    .expr
                    .iter()
                    .filter(|key| seen.insert(*key))
                    .collect();

                // Nothing was dropped: report "no change" without cloning.
                if unique_keys.len() == sort.expr.len() {
                    return Ok(None);
                }

                // Only the surviving keys are cloned into the rewritten node.
                Ok(Some(LogicalPlan::Sort(Sort {
                    expr: unique_keys.into_iter().cloned().collect(),
                    input: sort.input.clone(),
                    fetch: sort.fetch,
                })))
            }
            _ => Ok(None),
        }
    }

    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "eliminate_duplicated_expr"
    }

    /// Requests top-down application of the rule.
    fn apply_order(&self) -> Option<ApplyOrder> {
        Some(ApplyOrder::TopDown)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::*;
    use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder};
    use std::sync::Arc;

    /// Delegates to `crate::test::assert_optimized_plan_eq`, passing
    /// `EliminateDuplicatedExpr` as the rule under test.
    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
        crate::test::assert_optimized_plan_eq(
            Arc::new(EliminateDuplicatedExpr::new()),
            plan,
            expected,
        )
    }

    /// A sort on `a, a, b, c` must be rewritten to a sort on `a, b, c`,
    /// leaving the enclosing limit untouched.
    #[test]
    fn eliminate_sort_expr() -> Result<()> {
        // Propagate a setup failure through the test's `Result` instead of
        // panicking, consistent with the other fallible calls below.
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .sort(vec![col("a"), col("a"), col("b"), col("c")])?
            .limit(5, Some(10))?
            .build()?;
        // The indented plan display uses two spaces per nesting level.
        let expected = "Limit: skip=5, fetch=10\
        \n  Sort: test.a, test.b, test.c\
        \n    TableScan: test";
        assert_optimized_plan_eq(&plan, expected)
    }
}
7 changes: 4 additions & 3 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod common_subexpr_eliminate;
pub mod decorrelate_where_exists;
pub mod decorrelate_where_in;
pub mod eliminate_cross_join;
pub mod eliminate_duplicated_expr;
pub mod eliminate_filter;
pub mod eliminate_limit;
pub mod eliminate_outer_join;
Expand All @@ -33,17 +34,17 @@ pub mod propagate_empty_relation;
pub mod push_down_filter;
pub mod push_down_limit;
pub mod push_down_projection;
pub mod replace_distinct_aggregate;
pub mod rewrite_disjunctive_predicate;
pub mod scalar_subquery_to_join;
pub mod simplify_expressions;
pub mod single_distinct_to_groupby;
pub mod type_coercion;
pub mod unwrap_cast_in_comparison;
pub mod utils;

pub mod replace_distinct_aggregate;
pub mod rewrite_disjunctive_predicate;
#[cfg(test)]
pub mod test;
pub mod unwrap_cast_in_comparison;

pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
pub use utils::optimize_children;
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::decorrelate_where_exists::DecorrelateWhereExists;
use crate::decorrelate_where_in::DecorrelateWhereIn;
use crate::eliminate_cross_join::EliminateCrossJoin;
use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_limit::EliminateLimit;
use crate::eliminate_outer_join::EliminateOuterJoin;
Expand Down Expand Up @@ -222,6 +223,7 @@ impl Optimizer {
Arc::new(SimplifyExpressions::new()),
Arc::new(MergeProjection::new()),
Arc::new(RewriteDisjunctivePredicate::new()),
Arc::new(EliminateDuplicatedExpr::new()),
Arc::new(EliminateFilter::new()),
Arc::new(EliminateCrossJoin::new()),
Arc::new(CommonSubexprEliminate::new()),
Expand Down

0 comments on commit d0bd28e

Please sign in to comment.