Skip to content

Commit

Permalink
[SPARK-23957][SQL] Remove redundant sort operators from subqueries
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Subqueries (at least in SQL) have 'bag of tuples' semantics, so ordering
their output is redundant unless the sort is combined with a limit. This
patch adds a new optimizer rule that removes sort operators that sit
directly below a subquery, or below a subquery with only projection and
filter operators in between.

## How was this patch tested?

New unit tests. All sql unit tests pass.
  • Loading branch information
henryr committed Apr 12, 2018
1 parent e904dfa commit bb992c2
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
(Batch("Eliminate Distinct", Once, EliminateDistinct) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// However, because we also use the analyzer to canonicalize queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
// Must come before EliminateSubqueryAliases.
RemoveSubquerySorts,
EliminateSubqueryAliases,
EliminateView,
ReplaceExpressions,
Expand Down Expand Up @@ -307,6 +309,32 @@ object RemoveRedundantProject extends Rule[LogicalPlan] {
}
}

/**
 * Remove [[Sort]] operators in subqueries that do not affect the set of rows produced, only
 * their order. Subqueries produce unordered sets of rows so sorting their output is unnecessary.
 *
 * A [[Sort]] is only removed when every operator between it and the subquery root is
 * order-insensitive. A [[Project]] or [[Filter]] that evaluates a non-deterministic expression
 * may produce different values depending on the order in which rows arrive, so sorts beneath
 * such operators are left in place.
 */
object RemoveSubquerySorts extends Rule[LogicalPlan] {

  /**
   * Removes all [[Sort]] operators from a plan that are accessible from the root operator via
   * 0 or more deterministic [[Project]], deterministic [[Filter]] or [[View]] operators.
   *
   * @param plan the plan rooted directly below a subquery node
   * @return the same plan with the reachable [[Sort]] operators dropped; any operator that is
   *         not matched below is returned unchanged, which ends the descent
   */
  private def removeTopLevelSorts(plan: LogicalPlan): LogicalPlan = {
    plan match {
      // A sort is replaced by its (recursively cleaned) child, so stacked sorts all go away.
      case Sort(_, _, child) =>
        removeTopLevelSorts(child)
      // Only look through projections / filters whose expressions are deterministic: the
      // output of a non-deterministic expression can depend on the input row order.
      case Project(fields, child) if fields.forall(_.deterministic) =>
        Project(fields, removeTopLevelSorts(child))
      case Filter(condition, child) if condition.deterministic =>
        Filter(condition, removeTopLevelSorts(child))
      case View(tbl, output, child) =>
        View(tbl, output, removeTopLevelSorts(child))
      // Anything else (e.g. a limit or an aggregate) ends the search; sorts below it are kept.
      case _ =>
        plan
    }
  }

  /**
   * Applies the sort removal below every [[Subquery]] and [[SubqueryAlias]] node in `plan`.
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(child) => Subquery(removeTopLevelSorts(child))
    case SubqueryAlias(name, child) => SubqueryAlias(name, removeTopLevelSorts(child))
  }
}

/**
* Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, SimpleAnalyzer}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

/**
 * Tests for the [[RemoveSubquerySorts]] optimizer rule.
 *
 * Each test builds a plan with the Catalyst DSL, wraps part of it in a subquery alias, runs it
 * through the analyzer followed by the `Optimize` batch below, and compares the result with the
 * expected plan.
 */
class RemoveSubquerySortsSuite extends PlanTest {

  // RemoveSubquerySorts is listed before EliminateSubqueryAliases so that the subquery alias
  // nodes are still present when the rule under test runs.
  private object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("Subqueries", Once,
        RemoveSubquerySorts,
        EliminateSubqueryAliases) :: Nil
  }

  private val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

  /**
   * Analyzes and optimizes `plan`, then checks that it matches the analyzed form of `correct`.
   */
  private def analyzeAndCompare(plan: LogicalPlan, correct: LogicalPlan): Unit = {
    // We can't use the implicit analyze method, that tests usually use, for 'plan'
    // because it explicitly calls EliminateSubqueryAliases.
    comparePlans(Optimize.execute(SimpleAnalyzer.execute(plan)), correct.analyze)
  }

  test("Remove top-level sort") {
    val query = testRelation.orderBy('a.asc).subquery('x)
    analyzeAndCompare(query, testRelation)
  }

  test("Remove sort behind filter and project") {
    val query = testRelation.orderBy('a.asc).where('a.attr > 10).select('b).subquery('x)
    analyzeAndCompare(query, testRelation.where('a.attr > 10).select('b))
  }

  test("Remove sort below subquery that is not at root") {
    val query = testRelation.orderBy('a.asc).subquery('x).groupBy('a)(sum('b))
    analyzeAndCompare(query, testRelation.groupBy('a)(sum('b)))
  }

  test("Sorts with limits must not be removed from subqueries") {
    val query = testRelation.orderBy('a.asc).limit(10).subquery('x)
    analyzeAndCompare(query, testRelation.orderBy('a.asc).limit(10))
  }

  test("Remove more than one sort") {
    val query = testRelation.orderBy('a.asc).orderBy('b.desc).subquery('x)
    analyzeAndCompare(query, testRelation)
  }

  test("Nested subqueries") {
    val query = testRelation.orderBy('a.asc).subquery('x).orderBy('b.desc).subquery('y)
    analyzeAndCompare(query, testRelation)
  }

  test("Sorts below non-project / filter operators don't get removed") {
    val query = testRelation.orderBy('a.asc).groupBy('a)(sum('b)).subquery('x)
    analyzeAndCompare(query, testRelation.orderBy('a.asc).groupBy('a)(sum('b)))
  }
}

0 comments on commit bb992c2

Please sign in to comment.