
[SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer #20560

Closed
wants to merge 12 commits

Conversation

@mgaido91 (Contributor) commented Feb 9, 2018

What changes were proposed in this pull request?

Added a new rule to remove a Sort operation when its child is already sorted.
For instance, this simple code:

spark.sparkContext.parallelize(Seq(("a", "b"))).toDF("a", "b").registerTempTable("table1")
val df = sql(s"""SELECT b
                | FROM (
                |     SELECT a, b
                |     FROM table1
                |     ORDER BY a
                | ) t
                | ORDER BY a""".stripMargin)
df.explain(true)

Before the PR, this produces the following plan:

== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- Sort [a#6 ASC NULLS FIRST], true
         +- Project [_1#3 AS a#6, _2#4 AS b#7]
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
               +- ExternalRDD [obj#2]

== Physical Plan ==
*(3) Project [b#7]
+- *(3) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
      +- *(2) Project [b#7, a#6]
         +- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
               +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
                     +- Scan ExternalRDDScan[obj#2]

while after the PR it produces:

== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [_1#3 AS a#6, _2#4 AS b#7]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
         +- ExternalRDD [obj#2]

== Physical Plan ==
*(2) Project [b#7]
+- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 5)
      +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
         +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
            +- Scan ExternalRDDScan[obj#2]

This means that, after the PR, the unnecessary sort operation is no longer performed.
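As quoted in the review snippets below, the rule itself is small; a sketch of its merged shape (simplified, identifiers as in Catalyst):

object RemoveRedundantSorts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // a global Sort is redundant when the child already guarantees the requested ordering
    case Sort(orders, true, child)
        if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
      child
  }
}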

How was this patch tested?

added UT

@SparkQA commented Feb 9, 2018

Test build #87261 has finished for PR 20560 at commit 550ff99.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class KeepOrderUnaryNode extends UnaryNode
  • case class Subquery(child: LogicalPlan) extends KeepOrderUnaryNode
  • case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
  • case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrderUnaryNode

@gatorsmile (Member)

Thanks! This should be added as a separate rule. It actually resolves the comment in #11480 (comment).

I did not review it carefully, but it requires more test cases, including unit tests and end-to-end tests.

@mgaido91 (Contributor, Author)

@gatorsmile thanks for your comment. I moved it to a separate rule and added more tests.

As for the added value of this rule, I see three main points:

  1. Let's imagine that a user exposes a cached sorted relation which can be queried by other users via JDBC. The other users cannot know that the table is already sorted, so they may write queries that cause an unnecessary sort.
  2. Many tools that generate SQL code automatically are not very smart about it, so they can produce queries that cause unneeded sorts.
  3. I think this also enables more interesting use cases: we may have data sources which store sorted data, and if we can express this in the logical plan, then we may avoid unneeded sorts.

What do you think?
Thanks.

@SparkQA commented Feb 10, 2018

Test build #87288 has finished for PR 20560 at commit 81e4828.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends KeepOrderUnaryNode

@gatorsmile (Member)

@mgaido91 Yeah, we definitely should include this rule. We just need more careful review and comprehensive test cases. Thanks for your work!

@mgaido91 (Contributor, Author)

Thank you @gatorsmile for taking a look at this. Let me know if there is anything I can or should improve. Thanks.

@mgaido91 (Contributor, Author) commented Mar 1, 2018

@gatorsmile sorry, do you have time to take a look at this now? Or should I ping you again in a few days if you are busy? Thanks.

@gatorsmile (Member)

Will review this in the next few days.

@mgaido91 (Contributor, Author)

kindly ping @gatorsmile

/**
 * If the current plan contains sorted data, it contains the sorted order.
 */
def sortedOrder: Seq[SortOrder] = Nil
Review comment (Member):

def outputOrdering?

@@ -219,6 +219,11 @@ abstract class LogicalPlan
* Refreshes (or invalidates) any metadata/data cached in the plan recursively.
*/
def refresh(): Unit = children.foreach(_.refresh())

/**
* If the current plan contains sorted data, it contains the sorted order.
Review comment (Member):

Returns the output ordering that this plan generates.

object RemoveRedundantSorts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Sort(orders, true, child) if child.sortedOrder.nonEmpty
        && child.sortedOrder.zip(orders).forall { case (s1, s2) => s1.satisfies(s2) } =>
Review comment (Member):

Why not use SortOrder.orderingSatisfies?
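For reference, a sketch of the semantics of SortOrder.orderingSatisfies as assumed here (the required ordering must be no longer than the provided one, and satisfied position by position):

def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): Boolean = {
  if (ordering2.isEmpty) {
    true  // nothing is required
  } else if (ordering2.length > ordering1.length) {
    false  // more is required than the child provides
  } else {
    // each required SortOrder must be satisfied by the provided one at the same position
    ordering2.zip(ordering1).forall { case (required, provided) => provided.satisfies(required) }
  }
}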

@gatorsmile (Member)

cc @cloud-fan @hvanhovell @wzhfy

@mgaido91 (Contributor, Author) commented Apr 3, 2018

retest this please

@SparkQA commented Apr 3, 2018

Test build #88849 has finished for PR 20560 at commit 1c33263.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

object RemoveRedundantSorts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
        && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
Review comment from @cloud-fan (Contributor), Apr 4, 2018:

Shall we do it after planning, since we already have SparkPlan.outputOrdering?

Reply (Contributor):

Ah, they are different. This is the global ordering, while SparkPlan.outputOrdering only describes the ordering within each partition.

override def output: Seq[Attribute] = child.output
}

-case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
Review comment (Contributor):

Like ProjectExec.outputOrdering, we can propagate ordering for aliased attributes.

Reply from @mgaido91 (Author):

Sorry, I don't fully understand what you mean. In ProjectExec.outputOrdering we take child.outputOrdering, exactly as is done here.
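For illustration, propagating ordering through aliases could look like the following hypothetical sketch (not the PR's code; aliasAwareOrdering is an invented helper). The idea is that after SELECT a AS x ... ORDER BY a, the plan can still expose an ordering on x:

// hypothetical sketch: rewrite the child's ordering through the Project's aliases
private def aliasAwareOrdering(
    projectList: Seq[NamedExpression],
    childOrdering: Seq[SortOrder]): Seq[SortOrder] = {
  // map each aliased child expression to the attribute the Project exposes
  val aliasMap = projectList.collect {
    case a @ Alias(aliased, _) => aliased.canonicalized -> a.toAttribute
  }.toMap
  childOrdering.map { order =>
    aliasMap.get(order.child.canonicalized) match {
      case Some(attr) => order.copy(child = attr)  // the ordering survives under the new name
      case None => order
    }
  }
}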

@@ -867,6 +871,11 @@ case class RepartitionByExpression(

override def maxRows: Option[Long] = child.maxRows
override def shuffle: Boolean = true

override def outputOrdering: Seq[SortOrder] = partitioning match {
  case RangePartitioning(ordering, _) => ordering
Review comment (Contributor):

RangePartitioning doesn't guarantee ordering inside a partition, so we can't do this.

@SparkQA commented Apr 4, 2018

Test build #88888 has finished for PR 20560 at commit 60ea6fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

class RemoveRedundantSortsSuite extends PlanTest {
  override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, ORDER_BY_ORDINAL -> false)
  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
  val analyzer = new Analyzer(catalog, conf)
Review comment (Contributor):

If we don't use ordinal numbers, we can remove these.

test("remove redundant order by") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
val optimized = Optimize.execute(analyzer.execute(unnecessaryReordered))
Review comment (Contributor):

just use unnecessaryReordered.analyze?
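That is, a sketch of the shorter form (assuming the Catalyst test DSL and PlanTest's comparePlans helper):

val optimized = Optimize.execute(unnecessaryReordered.analyze)
comparePlans(optimized, orderedPlan.select('a).analyze)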

@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
}
}

/**
* Removes Sort operations on already sorted data
Review comment (Contributor):

How about "Removes Sort operation if the child is already sorted"?

@@ -522,6 +524,8 @@ case class Range(
override def computeStats(): Statistics = {
Statistics(sizeInBytes = LongType.defaultSize * numElements)
}

override def outputOrdering: Seq[SortOrder] = output.map(a => SortOrder(a, Descending))
Review comment (Contributor):

Is the ordering the same when the step in Range is positive or negative?

Reply from @mgaido91 (Author):

Nice catch, thanks! I missed it!
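A sketch of the fix, assuming it makes the claimed direction follow the sign of step:

// corrected Range.outputOrdering: the direction depends on the step sign
override def outputOrdering: Seq[SortOrder] = {
  val order = if (step > 0) Ascending else Descending
  output.map(a => SortOrder(a, order))
}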

val resorted = query.sort('key.desc)
assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s }.isEmpty)
assert(resorted.select('key).collect().map(_.getInt(0)).toSeq ==
  (1 to 100).sorted(Ordering[Int].reverse))
Review comment (Contributor):

(1 to 100).reverse?

(1 to 100).sorted(Ordering[Int].reverse))
// with a different order, the sort is needed
val sortedAsc = query.sort('key)
assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s }.nonEmpty)
Review comment (Contributor):

.nonEmpty -> .size == 1

@SparkQA commented Apr 10, 2018

Test build #89118 has finished for PR 20560 at commit 1c7cae6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 10, 2018

Test build #89135 has finished for PR 20560 at commit e376c19.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@henryr (Contributor) left a comment:

Just a couple suggestions, feel free to ignore.

@@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {

override final def children: Seq[LogicalPlan] = Seq(left, right)
}

abstract class KeepOrderUnaryNode extends UnaryNode {
Review comment (Contributor):

OrderPreservingUnaryNode? Or perhaps do you think this would be better modeled as a mixin trait?

Reply from @mgaido91 (Author):

Thanks for the suggestion. I'd also love to hear @cloud-fan's and @wzhfy's opinions, so that we can choose the best name together. What do you think?

Reply (Contributor):

OrderPreservingUnaryNode sounds better.

It only makes sense for a unary node, so I don't think a mixin trait is a good idea.
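With that naming, the base class becomes a one-liner; a sketch of the renamed node:

// unary nodes that simply pass the child's ordering through unchanged
abstract class OrderPreservingUnaryNode extends UnaryNode {
  override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
}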

import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange,
+  ShuffleExchangeExec}
Review comment (Contributor):

revert this?

Reply from @mgaido91 (Author):

why?

Reply (Contributor):

It's an unnecessary change. We don't have a length limit for imports.

case Sort(orders, true, child) if child.outputOrdering.nonEmpty
    && SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
  child
}
Review comment (Contributor):

You might not want to do it in this PR, but you could easily remove another simple kind of redundant sort, e.g.:

rel.orderBy('a.desc).orderBy('a.asc)

(and I think that orderBy is not stable, so when two orderBy operators are consecutive, the first one is redundant).

Reply from @mgaido91 (Author):

Yes, you're right. Probably we can do this in another PR. Could you open a JIRA for this? Thanks!

Reply (Contributor):

This is a good follow-up

Reply (Contributor):

Filed SPARK-23973 for this

@@ -169,4 +169,6 @@ case class InMemoryRelation(

override protected def otherCopyArgs: Seq[AnyRef] =
Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)

override def outputOrdering: Seq[SortOrder] = child.outputOrdering
Review comment (Contributor):

In SparkPlan we have:

/** Specifies how data is ordered in each partition. */
def outputOrdering: Seq[SortOrder] = Nil

So we can't do this

Reply (Contributor):

We should carry the logical ordering from the cached logical plan when building the InMemoryRelation

@SparkQA commented Apr 11, 2018

Test build #89195 has finished for PR 20560 at commit a1846ab.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 11, 2018

Test build #89197 has finished for PR 20560 at commit 4e441f8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 12, 2018

Test build #89249 has finished for PR 20560 at commit 6e95e37.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91 (Contributor, Author)

retest this please

@SparkQA commented Apr 12, 2018

Test build #89257 has finished for PR 20560 at commit 6e95e37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
object RemoveRedundantSorts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
Review comment (Contributor):

child.outputOrdering.nonEmpty looks unnecessary.

@SparkQA commented Apr 13, 2018

Test build #89330 has finished for PR 20560 at commit 6c5f04c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

@asfgit closed this in 25892f3 on Apr 13, 2018
ghost pushed a commit to dbtsai/spark that referenced this pull request Apr 24, 2018
## What changes were proposed in this pull request?

In SPARK-23375 we introduced the ability to remove a `Sort` operation during query optimization if the data is already sorted. In this follow-up we also remove a `Sort` that is followed by another `Sort`: in that case the first sort is not needed and can be safely removed.

The PR starts from henryr's comment: apache#20560 (comment). So credit should be given to him.

## How was this patch tested?

added UT

Author: Marco Gaido <[email protected]>

Closes apache#21072 from mgaido91/SPARK-23973.
@rxin (Contributor) commented Apr 25, 2018

Just saw this. This seems like a somewhat awkward way to do it, by just matching on Filter / Project. Is the main thing lacking a way to do back propagation for properties? (We can only do forward propagation on properties at the moment, so we can't eliminate a subtree's sort based on the parent's sort.)

@cloud-fan (Contributor)

@rxin It seems you are talking about the follow-up PR: #21072

I think this is the way we do back propagation in catalyst: match a specific node, traverse down the subtree with the properties.

For forward propagation, we also need to carefully handle some nodes that would stop the propagation. In RemoveRedundantSorts.canEliminateSort, we are doing the same thing: only listing the nodes that can retain the properties; e.g. Limit should stop propagating the sorting property. I think Project, Filter and Hint are good enough as an initial list; we can expand it later.
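For context, a sketch of how the follow-up's recursive removal is structured (simplified from #21072; only deterministic, order-preserving nodes let the traversal continue):

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
  // case 1: the child already guarantees the requested global ordering
  case Sort(orders, true, child)
      if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
    child
  // case 2: remove sorts in the subtree that this Sort makes redundant
  case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
}

private def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
  case Sort(_, _, child) => recursiveRemoveSort(child)
  case other if canEliminateSort(other) =>
    other.withNewChildren(other.children.map(recursiveRemoveSort))
  case _ => plan
}

private def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
  case p: Project => p.projectList.forall(_.deterministic)
  case f: Filter => f.condition.deterministic
  case _: ResolvedHint => true
  case _ => false  // e.g. Limit stops the propagation
}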

@@ -64,7 +64,8 @@ case class InMemoryRelation(
tableName: Option[String])(
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-statsOfPlanToCache: Statistics)
+statsOfPlanToCache: Statistics,
+override val outputOrdering: Seq[SortOrder])
Review comment (Member):

This should be added to otherCopyArgs; otherwise, we will lose it when doing the tree transformation. #22715 fixed it.
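Presumably along these lines (a sketch of the #22715 fix, extending the otherCopyArgs shown in the diff above):

// include outputOrdering in otherCopyArgs so copies made during tree transformation keep it
override protected def otherCopyArgs: Seq[AnyRef] =
  Seq(_cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache, outputOrdering)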

srowen pushed a commit that referenced this pull request Dec 31, 2018
…ssing

## What changes were proposed in this pull request?
#20560/[SPARK-23375](https://issues.apache.org/jira/browse/SPARK-23375) introduced an optimizer rule to eliminate redundant Sort. For a test case named "Sort metrics" in `SQLMetricsSuite`, because range is already sorted, sort is removed by the `RemoveRedundantSorts`, which makes this test case meaningless.

This PR modifies the query for testing Sort metrics and checks Sort exists in the plan.

## How was this patch tested?
Modify the existing test case.

Closes #23258 from seancxmao/sort-metrics.

Authored-by: seancxmao <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019