Skip to content

Commit

Permalink
[SPARK-30651][SQL] Add detailed information for Aggregate operators i…
Browse files Browse the repository at this point in the history
…n EXPLAIN FORMATTED

### What changes were proposed in this pull request?
Currently `EXPLAIN FORMATTED` only report input attributes of HashAggregate/ObjectHashAggregate/SortAggregate, while `EXPLAIN EXTENDED` provides more information of Keys, Functions, etc. This PR enhanced `EXPLAIN FORMATTED` to sync with original explain behavior.

### Why are the changes needed?
The newly added `EXPLAIN FORMATTED` got less information comparing to the original `EXPLAIN EXTENDED`

### Does this PR introduce any user-facing change?
Yes, taking HashAggregate explain result as example.

**SQL**
```
EXPLAIN FORMATTED
  SELECT
    COUNT(val) + SUM(key) as TOTAL,
    COUNT(key) FILTER (WHERE val > 1)
  FROM explain_temp1;
```

**EXPLAIN EXTENDED**
```
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(val#6), sum(cast(key#5 as bigint)), count(key#5)], output=[TOTAL#62L, count(key) FILTER (WHERE (val > 1))#71L])
+- Exchange SinglePartition, true, [id=apache#89]
   +- HashAggregate(keys=[], functions=[partial_count(val#6), partial_sum(cast(key#5 as bigint)), partial_count(key#5) FILTER (WHERE (val#6 > 1))], output=[count#75L, sum#76L, count#77L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.explain_temp1[key#5,val#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/XXX/spark-dev/spark/spark-warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,val:int>
```

**EXPLAIN FORMATTED - BEFORE**
```
== Physical Plan ==
* HashAggregate (5)
+- Exchange (4)
   +- HashAggregate (3)
      +- * ColumnarToRow (2)
         +- Scan parquet default.explain_temp1 (1)

...
...
(5) HashAggregate [codegen id : 2]
Input: [count#91L, sum#92L, count#93L]
...
...
```

**EXPLAIN FORMATTED - AFTER**
```
== Physical Plan ==
* HashAggregate (5)
+- Exchange (4)
   +- HashAggregate (3)
      +- * ColumnarToRow (2)
         +- Scan parquet default.explain_temp1 (1)

...
...
(5) HashAggregate [codegen id : 2]
Input: [count#91L, sum#92L, count#93L]
Keys: []
Functions: [count(val#6), sum(cast(key#5 as bigint)), count(key#5)]
Results: [(count(val#6)#84L + sum(cast(key#5 as bigint))#85L) AS TOTAL#78L, count(key#5)#86L AS count(key) FILTER (WHERE (val > 1))#87L]
Output: [TOTAL#78L, count(key) FILTER (WHERE (val > 1))#87L]
...
...
```

### How was this patch tested?
Three tests added in explain.sql for HashAggregate/ObjectHashAggregate/SortAggregate.

Closes apache#27368 from Eric5553/ExplainFormattedAgg.

Authored-by: Eric Wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
Eric5553 authored and Seongjin Cho committed Apr 14, 2020
1 parent 1a690bf commit 99a2e54
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.aggregate

import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.{ExplainUtils, UnaryExecNode}

/**
* Holds common logic for aggregate operators
*/
trait BaseAggregateExec extends UnaryExecNode {
def groupingExpressions: Seq[NamedExpression]
def aggregateExpressions: Seq[AggregateExpression]
def aggregateAttributes: Seq[Attribute]
def resultExpressions: Seq[NamedExpression]

override def verboseStringWithOperatorId(): String = {
val inputString = child.output.mkString("[", ", ", "]")
val keyString = groupingExpressions.mkString("[", ", ", "]")
val functionString = aggregateExpressions.mkString("[", ", ", "]")
val aggregateAttributeString = aggregateAttributes.mkString("[", ", ", "]")
val resultString = resultExpressions.mkString("[", ", ", "]")
s"""
|(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
|Input: $inputString
|Keys: $keyString
|Functions: $functionString
|Aggregate Attributes: $aggregateAttributeString
|Results: $resultString
""".stripMargin
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ case class HashAggregateExec(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends UnaryExecNode with BlockingOperatorWithCodegen with AliasAwareOutputPartitioning {
extends BaseAggregateExec with BlockingOperatorWithCodegen with AliasAwareOutputPartitioning {

private[this] val aggregateBufferAttributes = {
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ case class ObjectHashAggregateExec(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends UnaryExecNode with AliasAwareOutputPartitioning {
extends BaseAggregateExec with AliasAwareOutputPartitioning {

private[this] val aggregateBufferAttributes = {
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
Expand All @@ -38,7 +38,7 @@ case class SortAggregateExec(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends UnaryExecNode with AliasAwareOutputPartitioning {
extends BaseAggregateExec with AliasAwareOutputPartitioning {

private[this] val aggregateBufferAttributes = {
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
Expand Down
22 changes: 21 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/explain.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
CREATE table explain_temp1 (key int, val int) USING PARQUET;
CREATE table explain_temp2 (key int, val int) USING PARQUET;
CREATE table explain_temp3 (key int, val int) USING PARQUET;
CREATE table explain_temp4 (key int, val string) USING PARQUET;

SET spark.sql.codegen.wholeStage = true;

Expand Down Expand Up @@ -61,7 +62,7 @@ EXPLAIN FORMATTED
FROM explain_temp2
WHERE val > 0)
OR
key = (SELECT max(key)
key = (SELECT avg(key)
FROM explain_temp3
WHERE val > 0);

Expand Down Expand Up @@ -93,6 +94,25 @@ EXPLAIN FORMATTED
CREATE VIEW explain_view AS
SELECT key, val FROM explain_temp1;

-- HashAggregate
EXPLAIN FORMATTED
SELECT
COUNT(val) + SUM(key) as TOTAL,
COUNT(key) FILTER (WHERE val > 1)
FROM explain_temp1;

-- ObjectHashAggregate
EXPLAIN FORMATTED
SELECT key, sort_array(collect_set(val))[0]
FROM explain_temp4
GROUP BY key;

-- SortAggregate
EXPLAIN FORMATTED
SELECT key, MIN(val)
FROM explain_temp4
GROUP BY key;

-- cleanup
DROP TABLE explain_temp1;
DROP TABLE explain_temp2;
Expand Down
Loading

0 comments on commit 99a2e54

Please sign in to comment.