-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-33183][SQL] Fix Optimizer rule EliminateSorts and add a physical rule to remove redundant sorts #30093
Conversation
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #130011 has finished for PR 30093 at commit
|
@@ -1056,8 +1058,14 @@ object EliminateSorts extends Rule[LogicalPlan] { | |||
case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) => | |||
val newOrders = orders.filterNot(_.child.foldable) | |||
if (newOrders.isEmpty) child else s.copy(order = newOrders) | |||
case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was added in 2.4.
cc @cloud-fan
if SortOrder.orderingSatisfies(child.outputOrdering, orders) => | ||
(global, child) match { | ||
case (false, _) => child | ||
case (true, r: Range) => r |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assumes we know Range's global ordering in advance. This seems to leak physical stuff into the Optimizer.
case (false, _) => child | ||
case (true, r: Range) => r | ||
case (true, s @ Sort(_, true, _)) => s | ||
case (true, _) => s.copy(child = recursiveRemoveSort(child)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you modify the code to handle this case in L1069?
val optimized = Optimize.execute(unnecessaryReordered.analyze) | ||
val correctAnswer = orderedPlan.limit(2).select('a).analyze | ||
comparePlans(Optimize.execute(optimized), correctAnswer) | ||
} | ||
|
||
test("should not remove global sort") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Add a prefix: SPARK-33183:
for all the added tests?
Shouldn't it be like this? diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c37dac90c5..79610b4ab2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1058,15 +1058,9 @@ object EliminateSorts extends Rule[LogicalPlan] {
case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) =>
val newOrders = orders.filterNot(_.child.foldable)
if (newOrders.isEmpty) child else s.copy(order = newOrders)
- case s @ Sort(orders, global, child)
- if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
- (global, child) match {
- case (false, _) => child
- case (true, r: Range) => r
- case (true, s @ Sort(_, true, _)) => s
- case (true, _) => s.copy(child = recursiveRemoveSort(child))
- }
- case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
+ case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+ child
+ case s @ Sort(_, true, child) => s.copy(child = recursiveRemoveSort(child))
case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) =>
j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) => Two simpler rules:
This fails two tests:
|
Taking it a bit further we can also remove local child sorts inside a local sort: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c37dac90c5..dee3e40f0d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1058,33 +1058,29 @@ object EliminateSorts extends Rule[LogicalPlan] {
case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) =>
val newOrders = orders.filterNot(_.child.foldable)
if (newOrders.isEmpty) child else s.copy(order = newOrders)
- case s @ Sort(orders, global, child)
- if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
- (global, child) match {
- case (false, _) => child
- case (true, r: Range) => r
- case (true, s @ Sort(_, true, _)) => s
- case (true, _) => s.copy(child = recursiveRemoveSort(child))
- }
- case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
+ case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+ child
+ case s @ Sort(_, global, child) => s.copy(child = recursiveRemoveSort(child, global))
case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) =>
j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
g.copy(child = recursiveRemoveSort(originChild))
}
- private def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
- case Sort(_, _, child) => recursiveRemoveSort(child)
- case other if canEliminateSort(other) =>
- other.withNewChildren(other.children.map(recursiveRemoveSort))
+ private def recursiveRemoveSort(
+ plan: LogicalPlan, global: Boolean = true): LogicalPlan = plan match {
+ case Sort(_, false, child) if !global => recursiveRemoveSort(child, global)
+ case Sort(_, _, child) if global => recursiveRemoveSort(child, global)
+ case other if canEliminateSort(other, global) =>
+ other.withNewChildren(other.children.map(recursiveRemoveSort(_, global)))
case _ => plan
}
- private def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
+ private def canEliminateSort(plan: LogicalPlan, global: Boolean): Boolean = plan match {
case p: Project => p.projectList.forall(_.deterministic)
case f: Filter => f.condition.deterministic
- case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic)
- case _: Repartition => true
+ case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic) && global
+ case _: Repartition => global
case _ => false
} |
I like the approach of using two simpler rules and removing the special case for the @tanelk I am not sure if I fully understand your proposed logic behind "remove local child sorts inside a local sort". Say we have this case
With the proposed logic, the initial Can you provide an example when your logic can eliminate local child sorts inside a local sort while the current logic cannot? |
Kubernetes integration test starting |
Kubernetes integration test starting |
Kubernetes integration test status success |
Kubernetes integration test status success |
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
Show resolved
Hide resolved
@@ -1052,11 +1052,11 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper { | |||
* function is order irrelevant | |||
*/ | |||
object EliminateSorts extends Rule[LogicalPlan] { | |||
def apply(plan: LogicalPlan): LogicalPlan = plan transform { | |||
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remember this comment: #21072 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@allisonwang-db does it have to be transformUp
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This case is tricky. Let's consider this example: df.sortBy('a.asc).sortBy('a.asc).sortBy('a.asc)
Sort(a, false, _) // A
Sort(a, false, _) // B
Sort(a, false, _) // C
...
If we don't use transformUp
here, the rule will keep both Sort B and C since it will not apply the rule to A's child (B) again. Plan after running this rule will be:
Sort(a, false, _) // B
Sort(a, false, _) // C
...
And the Once
strategy's idempotency will be broken since Sort B will be removed if we run this rule again.
Ah yes, It seems we wouln't have to change the |
Test build #130056 has finished for PR 30093 at commit
|
Test build #130057 has finished for PR 30093 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #130172 has finished for PR 30093 at commit
|
} | ||
|
||
test("remove redundant sorts with sort merge join") { | ||
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use join hint to specify join type? it's more robust than tuning configs, especially when testing broadcast join.
val innerJoinDesc1 = queryTemplate.format("JOIN", "t1.key DESC") | ||
checkSorts(innerJoinDesc1, 1, 1) | ||
|
||
val leftOuterJoinDesc = queryTemplate.format("LEFT JOIN", "t1.key DESC") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need to use LEFT JOIN if we use join hint to specify which side to broadcast.
Test build #130219 has finished for PR 30093 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status success |
def apply(plan: LogicalPlan): LogicalPlan = plan transform { | ||
def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally | ||
|
||
val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: private val
@@ -1242,6 +1242,13 @@ object SQLConf { | |||
.booleanConf | |||
.createWithDefault(true) | |||
|
|||
val REMOVE_REDUNDANT_SORTS_ENABLED = buildConf("spark.sql.execution.removeRedundantSorts") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need this config? cc: @cloud-fan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to backport this, it's safer to have a config for the new rule, in case it has bugs.
* if the child satisfies the required distribution so that it is safe to remove not only a | ||
* local sort but also a global sort when its child already satisfies required sort orders. | ||
*/ | ||
case class RemoveRedundantSorts(conf: SQLConf) extends Rule[SparkPlan] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I think we should not backport this to 2.4. This is a new rule and looks like an improvement.
We should only backport the fix of EliminateSorts
to 2.4.
But as it is late and this PR is close to merge and we have a config for it. I'm okay if others think it is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should only backport the fix of EliminateSorts to 2.4.
But, the partial backport can cause performance regressin in 2.4 ? (global sort -> global sort case) #30093 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, however, this is inevitable for such bug fix and correctness bug fix precedes performance. Since the physical rule is well separated and has a config, it looks okay.
59f9cd4
to
cdc7dbe
Compare
The current change is out of the PR title and description. Before merging this, can you update the title and description? Thanks. |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #130335 has finished for PR 30093 at commit
|
thanks, merging to master! |
I'm okay to backport it with new rule as it has config. |
@allisonwang-db can you help to create the backport PR? we need to change the config "since version" accordingly. |
+1 to backport this. (I think it'd be better to avoid the performance regression of existing user's queries as much as possible.) |
…al rule to remove redundant sorts This PR aims to fix a correctness bug in the optimizer rule `EliminateSorts`. It also adds a new physical rule to remove redundant sorts that cannot be eliminated in the Optimizer rule after the bugfix. A global sort should not be eliminated even if its child is ordered since we don't know if its child ordering is global or local. For example, in the following scenario, the first sort shouldn't be removed because it has a stronger guarantee than the second sort even if the sort orders are the same for both sorts. ``` Sort(orders, global = True, ...) Sort(orders, global = False, ...) ``` Since there is no straightforward way to identify whether a node's output ordering is local or global, we should not remove a global sort even if its child is already ordered. Yes Unit tests Closes apache#30093 from allisonwang-db/fix-sort. Authored-by: allisonwang-db <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 9fb4536) Signed-off-by: allisonwang-db <[email protected]>
…al rule to remove redundant sorts This PR aims to fix a correctness bug in the optimizer rule `EliminateSorts`. It also adds a new physical rule to remove redundant sorts that cannot be eliminated in the Optimizer rule after the bugfix. A global sort should not be eliminated even if its child is ordered since we don't know if its child ordering is global or local. For example, in the following scenario, the first sort shouldn't be removed because it has a stronger guarantee than the second sort even if the sort orders are the same for both sorts. ``` Sort(orders, global = True, ...) Sort(orders, global = False, ...) ``` Since there is no straightforward way to identify whether a node's output ordering is local or global, we should not remove a global sort even if its child is already ordered. Yes Unit tests Closes apache#30093 from allisonwang-db/fix-sort. Authored-by: allisonwang-db <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 9fb4536) Signed-off-by: allisonwang-db <[email protected]>
I will update the master branch config version to 2.4.8 once the backport PRs are merged. |
…hysical rule to remove redundant sorts Backport #30093 for branch-2.4. ### What changes were proposed in this pull request? This PR aims to fix a correctness bug in the optimizer rule EliminateSorts. It also adds a new physical rule to remove redundant sorts that cannot be eliminated in the Optimizer rule after the bugfix. ### Why are the changes needed? A global sort should not be eliminated even if its child is ordered since we don't know if its child ordering is global or local. For example, in the following scenario, the first sort shouldn't be removed because it has a stronger guarantee than the second sort even if the sort orders are the same for both sorts. ``` Sort(orders, global = True, ...) Sort(orders, global = False, ...) ``` Since there is no straightforward way to identify whether a node's output ordering is local or global, we should not remove a global sort even if its child is already ordered. ### Does this PR introduce any user-facing change? Yes ### How was this patch tested? Unit tests Closes #30194 from allisonwang-db/SPARK-33183-branch-2.4. Authored-by: allisonwang-db <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…hysical rule to remove redundant sorts Backport #30093 for branch-3.0. I've updated the configuration version to 2.4.8. ### What changes were proposed in this pull request? This PR aims to fix a correctness bug in the optimizer rule EliminateSorts. It also adds a new physical rule to remove redundant sorts that cannot be eliminated in the Optimizer rule after the bugfix. ### Why are the changes needed? A global sort should not be eliminated even if its child is ordered since we don't know if its child ordering is global or local. For example, in the following scenario, the first sort shouldn't be removed because it has a stronger guarantee than the second sort even if the sort orders are the same for both sorts. ``` Sort(orders, global = True, ...) Sort(orders, global = False, ...) ``` Since there is no straightforward way to identify whether a node's output ordering is local or global, we should not remove a global sort even if its child is already ordered. ### Does this PR introduce any user-facing change? Yes ### How was this patch tested? Unit tests Closes #30195 from allisonwang-db/SPARK-33183-branch-3.0. Authored-by: allisonwang-db <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
… version ### What changes were proposed in this pull request? This PR is a follow up for #30093 to updates the config `spark.sql.execution.removeRedundantSorts` version to 2.4.8. ### Why are the changes needed? To update the rule version it has been backported to 2.4. #30194 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? N/A Closes #30420 from allisonwang-db/spark-33183-follow-up. Authored-by: allisonwang-db <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
This PR aims to fix a correctness bug in the optimizer rule
EliminateSorts
. It also adds a new physical rule to remove redundant sorts that cannot be eliminated in the Optimizer rule after the bugfix.Why are the changes needed?
A global sort should not be eliminated even if its child is ordered since we don't know if its child ordering is global or local. For example, in the following scenario, the first sort shouldn't be removed because it has a stronger guarantee than the second sort even if the sort orders are the same for both sorts.
Since there is no straightforward way to identify whether a node's output ordering is local or global, we should not remove a global sort even if its child is already ordered.
Does this PR introduce any user-facing change?
Yes
How was this patch tested?
Unit tests