[SPARK-30651][SQL] Add detailed information for Aggregate operators in EXPLAIN FORMATTED

**What changes were proposed in this pull request?**

Currently `EXPLAIN FORMATTED` only reports the input attributes of HashAggregate/ObjectHashAggregate/SortAggregate, while `EXPLAIN EXTENDED` provides more information such as Keys, Functions, etc. This PR enhances `EXPLAIN FORMATTED` to match the original explain behavior.

**Why are the changes needed?**

The newly added `EXPLAIN FORMATTED` provides less information than the original `EXPLAIN EXTENDED`.

**Does this PR introduce any user-facing change?**

Yes. Taking the HashAggregate explain result as an example:

**SQL**
```
EXPLAIN FORMATTED
  SELECT
    COUNT(val) + SUM(key) as TOTAL,
    COUNT(key) FILTER (WHERE val > 1)
  FROM explain_temp1;
```

**EXPLAIN EXTENDED**
```
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(val#6), sum(cast(key#5 as bigint)), count(key#5)], output=[TOTAL#62L, count(key) FILTER (WHERE (val > 1))#71L])
+- Exchange SinglePartition, true, [id=#89]
   +- HashAggregate(keys=[], functions=[partial_count(val#6), partial_sum(cast(key#5 as bigint)), partial_count(key#5) FILTER (WHERE (val#6 > 1))], output=[count#75L, sum#76L, count#77L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.explain_temp1[key#5,val#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/XXX/spark-dev/spark/spark-warehouse/explain_temp1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,val:int>
```

**EXPLAIN FORMATTED - BEFORE**
```
== Physical Plan ==
* HashAggregate (5)
+- Exchange (4)
   +- HashAggregate (3)
      +- * ColumnarToRow (2)
         +- Scan parquet default.explain_temp1 (1)

...
...
(5) HashAggregate [codegen id : 2]
Input: [count#91L, sum#92L, count#93L]
...
...
```

**EXPLAIN FORMATTED - AFTER**
```
== Physical Plan ==
* HashAggregate (5)
+- Exchange (4)
   +- HashAggregate (3)
      +- * ColumnarToRow (2)
         +- Scan parquet default.explain_temp1 (1)

...
...
(5) HashAggregate [codegen id : 2]
Input: [count#91L, sum#92L, count#93L]
Keys: []
Functions: [count(val#6), sum(cast(key#5 as bigint)), count(key#5)]
Results: [(count(val#6)#84L + sum(cast(key#5 as bigint))#85L) AS TOTAL#78L, count(key#5)#86L AS count(key) FILTER (WHERE (val > 1))#87L]
Output: [TOTAL#78L, count(key) FILTER (WHERE (val > 1))#87L]
...
...
```
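For reference, the same two plans can also be produced through the Dataset API; a minimal sketch, assuming a running SparkSession `spark` and that the `explain_temp1` table from the SQL above already exists:

```scala
// Reproduction sketch -- assumes a SparkSession `spark` and the
// explain_temp1 table used in the SQL above.
val df = spark.sql("""
  SELECT
    COUNT(val) + SUM(key) AS TOTAL,
    COUNT(key) FILTER (WHERE val > 1)
  FROM explain_temp1""")

df.explain("extended")   // single-line operator strings, as in EXPLAIN EXTENDED
df.explain("formatted")  // numbered plan plus per-operator details, Spark 3.0+
```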

**How was this patch tested?**

Three tests were added in explain.sql, one each for HashAggregate, ObjectHashAggregate, and SortAggregate; an illustrative inline version of what they verify follows.
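The real coverage is golden-file output in explain.sql, but an inline check of the same behavior could look like the sketch below, assuming Spark 3.0's `FormattedMode` and `QueryExecution.explainString` internals:

```scala
// Illustrative check only -- the committed tests are golden files in
// explain.sql. FormattedMode / explainString are Spark 3.0 internals.
import org.apache.spark.sql.execution.FormattedMode

val formatted = spark.sql(
    "SELECT key, COUNT(val) FROM explain_temp1 GROUP BY key")
  .queryExecution
  .explainString(FormattedMode)

// Every aggregate node should now report all four new fields.
assert(formatted.contains("Keys: "))
assert(formatted.contains("Functions: "))
assert(formatted.contains("Aggregate Attributes: "))
assert(formatted.contains("Results: "))
```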

Closes apache#27368 from Eric5553/ExplainFormattedAgg.

Authored-by: Eric Wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Eric5553 authored and Nick Nicolini committed Jun 11, 2020
1 parent 1270813 commit 3c41f4b
Showing 4 changed files with 52 additions and 4 deletions.
**sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala** (new file, +48 lines)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.aggregate

import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.{ExplainUtils, UnaryExecNode}

/**
 * Holds common logic for aggregate operators
 */
trait BaseAggregateExec extends UnaryExecNode {
  def groupingExpressions: Seq[NamedExpression]
  def aggregateExpressions: Seq[AggregateExpression]
  def aggregateAttributes: Seq[Attribute]
  def resultExpressions: Seq[NamedExpression]

  override def verboseStringWithOperatorId(): String = {
    val inputString = child.output.mkString("[", ", ", "]")
    val keyString = groupingExpressions.mkString("[", ", ", "]")
    val functionString = aggregateExpressions.mkString("[", ", ", "]")
    val aggregateAttributeString = aggregateAttributes.mkString("[", ", ", "]")
    val resultString = resultExpressions.mkString("[", ", ", "]")
    s"""
       |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
       |Input: $inputString
       |Keys: $keyString
       |Functions: $functionString
       |Aggregate Attributes: $aggregateAttributeString
       |Results: $resultString
     """.stripMargin
  }
}
```
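The three concrete operators then pick this rendering up just by switching their parent type, as the diffs below show. As a toy, self-contained illustration of that pattern (stand-in types and string fields, not Spark's real APIs): shared explain rendering lives in one base trait, and each concrete operator only has to expose its expressions by mixing it in.

```scala
// Toy sketch of the refactoring pattern -- not Spark code.
trait BaseAggregate {
  def nodeName: String
  def keys: Seq[String]        // stands in for groupingExpressions
  def functions: Seq[String]   // stands in for aggregateExpressions

  // Shared verbose-string logic, written once for all aggregate operators.
  def verboseString: String =
    s"""|$nodeName
        |Keys: ${keys.mkString("[", ", ", "]")}
        |Functions: ${functions.mkString("[", ", ", "]")}""".stripMargin
}

case class HashAggregate(keys: Seq[String], functions: Seq[String])
    extends BaseAggregate {
  override val nodeName = "HashAggregate"
}

object Demo extends App {
  // Prints:
  // HashAggregate
  // Keys: []
  // Functions: [count(val#6), sum(cast(key#5 as bigint))]
  println(HashAggregate(Nil, Seq("count(val#6)", "sum(cast(key#5 as bigint))")).verboseString)
}
```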
**sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala**

```diff
@@ -47,7 +47,7 @@ case class HashAggregateExec(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
-  extends UnaryExecNode with BlockingOperatorWithCodegen {
+  extends BaseAggregateExec with BlockingOperatorWithCodegen {
 
   private[this] val aggregateBufferAttributes = {
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
```
**sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala**

```diff
@@ -65,7 +65,7 @@ case class ObjectHashAggregateExec(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
-  extends UnaryExecNode {
+  extends BaseAggregateExec {
 
   private[this] val aggregateBufferAttributes = {
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
```
**sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala**

```diff
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -38,7 +38,7 @@ case class SortAggregateExec(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
-  extends UnaryExecNode {
+  extends BaseAggregateExec {
 
   private[this] val aggregateBufferAttributes = {
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
```
