Skip to content

Commit

Permalink
[SPARK-24369][SQL] Correct handling for multiple distinct aggregation…
Browse files Browse the repository at this point in the history
…s having the same argument set

## What changes were proposed in this pull request?

bring back apache#21443

This is a different approach: just change the check to count distinct columns with `toSet`

## How was this patch tested?

a new test to verify the planner behavior.

Author: Wenchen Fan <[email protected]>
Author: Takeshi Yamamuro <[email protected]>

Closes apache#21487 from cloud-fan/back.
  • Loading branch information
cloud-fan authored and gatorsmile committed Jun 4, 2018
1 parent a2166ec commit 416cd1f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

val (functionsWithDistinct, functionsWithoutDistinct) =
aggregateExpressions.partition(_.isDistinct)
if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
if (functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length > 1) {
// This is a sanity check. We should not reach here when we have multiple distinct
// column sets. Our MultipleDistinctRewriter should take care this case.
// column sets. Our `RewriteDistinctAggregates` should take care this case.
sys.error("You hit a query analyzer bug. Please report your query to " +
"Spark user mailing list.")
}
Expand Down
6 changes: 5 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/group-by.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,8 @@ SELECT 1 from (
FROM (select 1 as x) a
WHERE false
) b
where b.z != b.z
where b.z != b.z;

-- SPARK-24369 multiple distinct aggregations having the same argument set
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y);
11 changes: 10 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/group-by.sql.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 26
-- Number of queries: 27


-- !query 0
Expand Down Expand Up @@ -241,3 +241,12 @@ where b.z != b.z
struct<1:int>
-- !query 25 output



-- !query 26
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
-- !query 26 schema
struct<corr(DISTINCT CAST(x AS DOUBLE), CAST(y AS DOUBLE)):double,corr(DISTINCT CAST(y AS DOUBLE), CAST(x AS DOUBLE)):double,count(1):bigint>
-- !query 26 output
1.0 1.0 3
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,27 @@ class PlannerSuite extends SharedSQLContext {
testPartialAggregationPlan(query)
}

test("mixed aggregates with same distinct columns") {
def assertNoExpand(plan: SparkPlan): Unit = {
assert(plan.collect { case e: ExpandExec => e }.isEmpty)
}

withTempView("v") {
Seq((1, 1.0, 1.0), (1, 2.0, 2.0)).toDF("i", "j", "k").createTempView("v")
// one distinct column
val query1 = sql("SELECT sum(DISTINCT j), max(DISTINCT j) FROM v GROUP BY i")
assertNoExpand(query1.queryExecution.executedPlan)

// 2 distinct columns
val query2 = sql("SELECT corr(DISTINCT j, k), count(DISTINCT j, k) FROM v GROUP BY i")
assertNoExpand(query2.queryExecution.executedPlan)

// 2 distinct columns with different order
val query3 = sql("SELECT corr(DISTINCT j, k), count(DISTINCT k, j) FROM v GROUP BY i")
assertNoExpand(query3.queryExecution.executedPlan)
}
}

test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
def checkPlan(fieldTypes: Seq[DataType]): Unit = {
withTempView("testLimit") {
Expand Down

0 comments on commit 416cd1f

Please sign in to comment.