-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-33183][SQL] Fix Optimizer rule EliminateSorts and add a physical rule to remove redundant sorts #30093
Changes from all commits
a3519a6
52a191b
118ba4c
5997320
cec266f
290cb4d
cdc7dbe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1253,6 +1253,13 @@ object SQLConf { | |
.booleanConf | ||
.createWithDefault(true) | ||
|
||
val REMOVE_REDUNDANT_SORTS_ENABLED = buildConf("spark.sql.execution.removeRedundantSorts") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need this config? cc: @cloud-fan There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want to backport this, it's safer to have a config for the new rule, in case it has bugs. |
||
.internal() | ||
.doc("Whether to remove redundant physical sort node") | ||
.version("3.1.0") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we backport this into branch-2.4, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not exactly sure if it's better to change it in this PR or to change it when this PR is backported to 2.4.8 (in case the current change does not work in 2.4.8) |
||
.booleanConf | ||
.createWithDefault(true) | ||
|
||
val STATE_STORE_PROVIDER_CLASS = | ||
buildConf("spark.sql.streaming.stateStore.providerClass") | ||
.internal() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,12 +99,34 @@ class EliminateSortsSuite extends AnalysisTest { | |
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("remove redundant order by") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to modify the existing test There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test was actually changed from |
||
test("SPARK-33183: remove consecutive no-op sorts") { | ||
val plan = testRelation.orderBy().orderBy().orderBy() | ||
val optimized = Optimize.execute(plan.analyze) | ||
val correctAnswer = testRelation.analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("SPARK-33183: remove redundant sort by") { | ||
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) | ||
val unnecessaryReordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst) | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst) | ||
val optimized = Optimize.execute(unnecessaryReordered.analyze) | ||
val correctAnswer = orderedPlan.limit(2).select('a).analyze | ||
comparePlans(Optimize.execute(optimized), correctAnswer) | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("SPARK-33183: remove all redundant local sorts") { | ||
val orderedPlan = testRelation.sortBy('a.asc).orderBy('a.asc).sortBy('a.asc) | ||
val optimized = Optimize.execute(orderedPlan.analyze) | ||
val correctAnswer = testRelation.orderBy('a.asc).analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("SPARK-33183: should not remove global sort") { | ||
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst) | ||
val reordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst) | ||
val optimized = Optimize.execute(reordered.analyze) | ||
val correctAnswer = reordered.analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("do not remove sort if the order is different") { | ||
|
@@ -115,22 +137,39 @@ class EliminateSortsSuite extends AnalysisTest { | |
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("filters don't affect order") { | ||
test("SPARK-33183: remove top level local sort with filter operators") { | ||
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) | ||
val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc) | ||
val filteredAndReordered = orderedPlan.where('a > Literal(10)).sortBy('a.asc, 'b.desc) | ||
val optimized = Optimize.execute(filteredAndReordered.analyze) | ||
val correctAnswer = orderedPlan.where('a > Literal(10)).analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("limits don't affect order") { | ||
test("SPARK-33183: keep top level global sort with filter operators") { | ||
val projectPlan = testRelation.select('a, 'b) | ||
val orderedPlan = projectPlan.orderBy('a.asc, 'b.desc) | ||
val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc) | ||
val optimized = Optimize.execute(filteredAndReordered.analyze) | ||
val correctAnswer = projectPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc).analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("SPARK-33183: limits should not affect order for local sort") { | ||
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) | ||
val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc) | ||
val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy('a.asc, 'b.desc) | ||
val optimized = Optimize.execute(filteredAndReordered.analyze) | ||
val correctAnswer = orderedPlan.limit(Literal(10)).analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("SPARK-33183: should not remove global sort with limit operators") { | ||
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc) | ||
val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc) | ||
val optimized = Optimize.execute(filteredAndReordered.analyze) | ||
val correctAnswer = filteredAndReordered.analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("different sorts are not simplified if limit is in between") { | ||
val orderedPlan = testRelation.select('a, 'b).orderBy('b.desc).limit(Literal(10)) | ||
.orderBy('a.asc) | ||
|
@@ -139,11 +178,11 @@ class EliminateSortsSuite extends AnalysisTest { | |
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("range is already sorted") { | ||
test("SPARK-33183: should not remove global sort with range operator") { | ||
val inputPlan = Range(1L, 1000L, 1, 10) | ||
val orderedPlan = inputPlan.orderBy('id.asc) | ||
val optimized = Optimize.execute(orderedPlan.analyze) | ||
val correctAnswer = inputPlan.analyze | ||
val correctAnswer = orderedPlan.analyze | ||
comparePlans(optimized, correctAnswer) | ||
|
||
val reversedPlan = inputPlan.orderBy('id.desc) | ||
|
@@ -154,10 +193,18 @@ class EliminateSortsSuite extends AnalysisTest { | |
val negativeStepInputPlan = Range(10L, 1L, -1, 10) | ||
val negativeStepOrderedPlan = negativeStepInputPlan.orderBy('id.desc) | ||
val negativeStepOptimized = Optimize.execute(negativeStepOrderedPlan.analyze) | ||
val negativeStepCorrectAnswer = negativeStepInputPlan.analyze | ||
val negativeStepCorrectAnswer = negativeStepOrderedPlan.analyze | ||
comparePlans(negativeStepOptimized, negativeStepCorrectAnswer) | ||
} | ||
|
||
test("SPARK-33183: remove local sort with range operator") { | ||
val inputPlan = Range(1L, 1000L, 1, 10) | ||
val orderedPlan = inputPlan.sortBy('id.asc) | ||
val optimized = Optimize.execute(orderedPlan.analyze) | ||
val correctAnswer = inputPlan.analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("sort should not be removed when there is a node which doesn't guarantee any order") { | ||
val orderedPlan = testRelation.select('a, 'b) | ||
val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc) | ||
|
@@ -333,4 +380,39 @@ class EliminateSortsSuite extends AnalysisTest { | |
val correctAnswer = PushDownOptimizer.execute(noOrderByPlan.analyze) | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("SPARK-33183: remove consecutive global sorts with the same ordering") { | ||
Seq( | ||
(testRelation.orderBy('a.asc).orderBy('a.asc), testRelation.orderBy('a.asc)), | ||
(testRelation.orderBy('a.asc, 'b.desc).orderBy('a.asc), testRelation.orderBy('a.asc)) | ||
).foreach { case (ordered, answer) => | ||
val optimized = Optimize.execute(ordered.analyze) | ||
comparePlans(optimized, answer.analyze) | ||
} | ||
} | ||
|
||
test("SPARK-33183: remove consecutive local sorts with the same ordering") { | ||
val orderedPlan = testRelation.sortBy('a.asc).sortBy('a.asc).sortBy('a.asc) | ||
val optimized = Optimize.execute(orderedPlan.analyze) | ||
val correctAnswer = testRelation.sortBy('a.asc).analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("SPARK-33183: remove consecutive local sorts with different ordering") { | ||
val orderedPlan = testRelation.sortBy('b.asc).sortBy('a.desc).sortBy('a.asc) | ||
val optimized = Optimize.execute(orderedPlan.analyze) | ||
val correctAnswer = testRelation.sortBy('a.asc).analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("SPARK-33183: should keep global sort when child is a local sort with the same ordering") { | ||
val correctAnswer = testRelation.orderBy('a.asc).analyze | ||
Seq( | ||
testRelation.sortBy('a.asc).orderBy('a.asc), | ||
testRelation.orderBy('a.asc).sortBy('a.asc).orderBy('a.asc) | ||
).foreach { ordered => | ||
val optimized = Optimize.execute(ordered.analyze) | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.execution | ||
|
||
import org.apache.spark.sql.catalyst.expressions.SortOrder | ||
import org.apache.spark.sql.catalyst.rules.Rule | ||
import org.apache.spark.sql.internal.SQLConf | ||
|
||
/** | ||
* Remove redundant SortExec node from the spark plan. A sort node is redundant when | ||
* its child satisfies both its sort orders and its required child distribution. Note | ||
* this rule differs from the Optimizer rule EliminateSorts in that this rule also checks | ||
* if the child satisfies the required distribution so that it is safe to remove not only a | ||
* local sort but also a global sort when its child already satisfies required sort orders. | ||
*/ | ||
object RemoveRedundantSorts extends Rule[SparkPlan] { | ||
def apply(plan: SparkPlan): SparkPlan = { | ||
if (!conf.getConf(SQLConf.REMOVE_REDUNDANT_SORTS_ENABLED)) { | ||
plan | ||
} else { | ||
removeSorts(plan) | ||
} | ||
} | ||
|
||
private def removeSorts(plan: SparkPlan): SparkPlan = plan transform { | ||
case s @ SortExec(orders, _, child, _) | ||
if SortOrder.orderingSatisfies(child.outputOrdering, orders) && | ||
child.outputPartitioning.satisfies(s.requiredChildDistribution.head) => | ||
child | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was added in 2.4.
cc @cloud-fan