Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
allisonwang-db committed Oct 19, 2020
1 parent ab0bad9 commit bf6e45a
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,9 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
* Note that changes in the final output ordering may affect the file size (SPARK-32318).
* This rule handles the following cases:
* 1) if the sort order is empty or the sort order does not have any reference
* 2) if the child is already sorted
* 2) if the Sort operator is a local sort and the child is already sorted, or
* the Sort operator is a global sort with the child being another global Sort operator or
* a Range operator that satisfies the parent sort orders.
* 3) if there is another Sort operator separated by 0...n Project, Filter, Repartition or
* RepartitionByExpression (with deterministic expressions) operators
* 4) if the Sort operator is within Join separated by 0...n Project, Filter, Repartition or
Expand All @@ -1056,8 +1058,14 @@ object EliminateSorts extends Rule[LogicalPlan] {
case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) =>
val newOrders = orders.filterNot(_.child.foldable)
if (newOrders.isEmpty) child else s.copy(order = newOrders)
case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
child
// The child's output ordering already satisfies the ordering requested by this Sort.
case s @ Sort(orders, global, child)
    if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
  child match {
    // Local sort over an already-sorted child: this Sort is redundant.
    case _ if !global => child
    // Global sort whose child is a Range or another global Sort: return the child alone.
    // The child is matched without rebinding a name, so `s` always refers to the outer Sort.
    case _: Range | Sort(_, true, _) => child
    // Any other child: keep this global Sort and hand the child to recursiveRemoveSort.
    case _ => s.copy(child = recursiveRemoveSort(child))
  }
case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) =>
j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,22 @@ class EliminateSortsSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("remove redundant order by") {
test("remove redundant local sort") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
val unnecessaryReordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst)
val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst)
val optimized = Optimize.execute(unnecessaryReordered.analyze)
val correctAnswer = orderedPlan.limit(2).select('a).analyze
comparePlans(Optimize.execute(optimized), correctAnswer)
}

test("should not remove global sort") {
  // A global orderBy is applied on top of limit + select over an already globally sorted
  // plan, using the same sort orders. The expected plan is the analyzed input itself,
  // i.e. the optimizer must leave the outer global Sort in place.
  val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
  val reordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst)
  val optimized = Optimize.execute(reordered.analyze)
  val correctAnswer = reordered.analyze
  // `optimized` is already the optimizer's output; compare it directly instead of running
  // the optimizer a second time, consistent with the other tests in this suite.
  comparePlans(optimized, correctAnswer)
}

test("do not remove sort if the order is different") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
val reorderedDifferently = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc)
Expand All @@ -113,22 +121,39 @@ class EliminateSortsSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("filters don't affect order") {
test("filters don't affect order for local sort") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc)
val filteredAndReordered = orderedPlan.where('a > Literal(10)).sortBy('a.asc, 'b.desc)
val optimized = Optimize.execute(filteredAndReordered.analyze)
val correctAnswer = orderedPlan.where('a > Literal(10)).analyze
comparePlans(optimized, correctAnswer)
}

test("limits don't affect order") {
test("should keep global sort when child is a filter operator with the same ordering") {
  // Input: project -> global sort -> filter -> global sort (same ordering both times).
  // Expected: project -> filter -> a single global sort on top.
  val ordering = Seq('a.asc, 'b.desc)
  val projected = testRelation.select('a, 'b)
  val input = projected
    .orderBy(ordering: _*)
    .where('a > Literal(10))
    .orderBy(ordering: _*)
  val expected = projected.where('a > Literal(10)).orderBy(ordering: _*)
  comparePlans(Optimize.execute(input.analyze), expected.analyze)
}

test("limits don't affect order for local sort") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc)
val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy('a.asc, 'b.desc)
val optimized = Optimize.execute(filteredAndReordered.analyze)
val correctAnswer = orderedPlan.limit(Literal(10)).analyze
comparePlans(optimized, correctAnswer)
}

test("should keep global sort when child is a limit operator with the same ordering") {
  // Input: global sort -> limit -> global sort with the same ordering.
  // Expected: the optimized plan equals the analyzed input, so nothing is removed.
  val input = testRelation
    .select('a, 'b)
    .orderBy('a.asc, 'b.desc)
    .limit(Literal(10))
    .orderBy('a.asc, 'b.desc)
  val actual = Optimize.execute(input.analyze)
  val expected = input.analyze
  comparePlans(actual, expected)
}

test("different sorts are not simplified if limit is in between") {
val orderedPlan = testRelation.select('a, 'b).orderBy('b.desc).limit(Literal(10))
.orderBy('a.asc)
Expand Down Expand Up @@ -331,4 +356,26 @@ class EliminateSortsSuite extends PlanTest {
val correctAnswer = PushDownOptimizer.execute(noOrderByPlan.analyze)
comparePlans(optimized, correctAnswer)
}

test("remove two consecutive global sorts with same ordering") {
  // Each case stacks a global sort on 'a.asc directly on top of another global sort;
  // the expected plan keeps only the inner sort.
  val sortedByA = testRelation.orderBy('a.asc)
  val sortedByAThenB = testRelation.orderBy('a.asc, 'b.desc)
  val cases = Seq(
    sortedByA.orderBy('a.asc) -> sortedByA,
    sortedByAThenB.orderBy('a.asc) -> sortedByAThenB)
  for ((input, expected) <- cases) {
    comparePlans(Optimize.execute(input.analyze), expected.analyze)
  }
}

test("should keep global sort when child is a local sort with the same ordering") {
  // Both inputs end in a global sort on 'a.asc with a local sort (sortBy) beneath it;
  // both are expected to optimize to a single global sort over the relation.
  val expected = testRelation.orderBy('a.asc).analyze
  val localThenGlobal = testRelation.sortBy('a.asc).orderBy('a.asc)
  val globalLocalGlobal = testRelation.orderBy('a.asc).sortBy('a.asc).orderBy('a.asc)
  for (input <- Seq(localThenGlobal, globalLocalGlobal)) {
    comparePlans(Optimize.execute(input.analyze), expected)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,6 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
}

test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") {
  // Cache a dataset sorted by key descending; its optimized plan must be an InMemoryRelation.
  val cachedDesc = testData.select('key, 'value).sort('key.desc).cache()
  assert(cachedDesc.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation])

  // Sorting again with the same order: no Sort node is expected in the optimized plan.
  val sameOrder = cachedDesc.sort('key.desc)
  val sortsForSameOrder = sameOrder.queryExecution.optimizedPlan.collect { case s: Sort => s}
  assert(sortsForSameOrder.isEmpty)
  val keysDesc = sameOrder.select('key).collect().map(_.getInt(0)).toSeq
  assert(keysDesc == (1 to 100).reverse)

  // with a different order, the sort is needed
  val oppositeOrder = cachedDesc.sort('key)
  val sortsForOppositeOrder =
    oppositeOrder.queryExecution.optimizedPlan.collect { case s: Sort => s}
  assert(sortsForOppositeOrder.size == 1)
  val keysAsc = oppositeOrder.select('key).collect().map(_.getInt(0)).toSeq
  assert(keysAsc == (1 to 100))
}

test("PartitioningCollection") {
withTempView("normal", "small", "tiny") {
testData.createOrReplaceTempView("normal")
Expand Down

0 comments on commit bf6e45a

Please sign in to comment.