diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala index 5d4309357895b..aec1c93072410 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -23,8 +23,9 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec} -object ExplainUtils { +object ExplainUtils extends AdaptiveSparkPlanHelper { /** * Given a input physical plan, performs the following tasks. * 1. Computes the operator id for current operator and records it in the operaror @@ -144,15 +145,26 @@ object ExplainUtils { case p: WholeStageCodegenExec => case p: InputAdapter => case other: QueryPlan[_] => - if (!other.getTagValue(QueryPlan.OP_ID_TAG).isDefined) { + + def setOpId(): Unit = if (other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) { currentOperationID += 1 other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID) operatorIDs += ((currentOperationID, other)) } - other.innerChildren.foreach { plan => - currentOperationID = generateOperatorIDs(plan, - currentOperationID, - operatorIDs) + + other match { + case p: AdaptiveSparkPlanExec => + currentOperationID = + generateOperatorIDs(p.executedPlan, currentOperationID, operatorIDs) + setOpId() + case p: QueryStageExec => + currentOperationID = generateOperatorIDs(p.plan, currentOperationID, operatorIDs) + setOpId() + case _ => + setOpId() + currentOperationID = other.innerChildren.foldLeft(currentOperationID) { + (curId, plan) => generateOperatorIDs(plan, curId, operatorIDs) + } } } currentOperationID @@ -163,21 +175,25 @@ object ExplainUtils { * whole stage code gen id in the plan
via setting a tag. */ private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = { + var currentCodegenId = -1 + + def setCodegenId(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = { + if (currentCodegenId != -1) { + p.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId) + } + children.foreach(generateWholeStageCodegenIds) + } + // Skip the subqueries as they are not printed as part of main query block. if (plan.isInstanceOf[BaseSubqueryExec]) { return } - var currentCodegenId = -1 plan.foreach { case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId case _: InputAdapter => currentCodegenId = -1 - case other: QueryPlan[_] => - if (currentCodegenId != -1) { - other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId) - } - other.innerChildren.foreach { plan => - generateWholeStageCodegenIds(plan) - } + case p: AdaptiveSparkPlanExec => setCodegenId(p, Seq(p.executedPlan)) + case p: QueryStageExec => setCodegenId(p, Seq(p.plan)) + case other: QueryPlan[_] => setCodegenId(other, other.innerChildren) } } @@ -232,13 +248,16 @@ object ExplainUtils { } def removeTags(plan: QueryPlan[_]): Unit = { + def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = { + p.unsetTagValue(QueryPlan.OP_ID_TAG) + p.unsetTagValue(QueryPlan.CODEGEN_ID_TAG) + children.foreach(removeTags) + } + plan foreach { - case plan: QueryPlan[_] => - plan.unsetTagValue(QueryPlan.OP_ID_TAG) - plan.unsetTagValue(QueryPlan.CODEGEN_ID_TAG) - plan.innerChildren.foreach { p => - removeTags(p) - } + case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan)) + case p: QueryStageExec => remove(p, Seq(p.plan)) + case plan: QueryPlan[_] => remove(plan, plan.innerChildren) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 1e86e9cb77bf5..54d32c3f93a06 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -251,10 +251,7 @@ case class AdaptiveSparkPlanExec( getFinalPhysicalPlan().execute() } - override def verboseString(maxFields: Int): String = simpleString(maxFields) - - override def simpleString(maxFields: Int): String = - s"AdaptiveSparkPlan(isFinalPlan=$isFinalPlan)" + protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan") override def generateTreeString( depth: Int, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index ea586f0bb2561..754225dd3fe95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -122,7 +122,7 @@ case class InsertAdaptiveSparkPlan( if !subqueryMap.contains(exprId.id) => val executedPlan = compileSubquery(p) verifyAdaptivePlan(executedPlan, p) - val subquery = SubqueryExec(s"subquery${exprId.id}", executedPlan) + val subquery = SubqueryExec(s"subquery#${exprId.id}", executedPlan) subqueryMap.put(exprId.id, subquery) case expressions.InSubquery(_, ListQuery(query, _, exprId, _)) if !subqueryMap.contains(exprId.id) => diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql new file mode 100644 index 0000000000000..f4afa2b77a9d7 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql @@ -0,0 +1,3 @@ +--IMPORT explain.sql + +--SET spark.sql.adaptive.enabled=true diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql b/sql/core/src/test/resources/sql-tests/inputs/explain.sql index 
497b61c6134a2..80bf258704c70 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql @@ -117,3 +117,4 @@ EXPLAIN FORMATTED DROP TABLE explain_temp1; DROP TABLE explain_temp2; DROP TABLE explain_temp3; +DROP TABLE explain_temp4; diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out new file mode 100644 index 0000000000000..e1d4fa84c8f54 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -0,0 +1,860 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 23 + + +-- !query +CREATE table explain_temp1 (key int, val int) USING PARQUET +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE table explain_temp2 (key int, val int) USING PARQUET +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE table explain_temp3 (key int, val int) USING PARQUET +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE table explain_temp4 (key int, val string) USING PARQUET +-- !query schema +struct<> +-- !query output + + + +-- !query +SET spark.sql.codegen.wholeStage = true +-- !query schema +struct +-- !query output +spark.sql.codegen.wholeStage true + + +-- !query +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + ORDER BY key +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (9) ++- Sort (8) + +- Exchange (7) + +- HashAggregate (6) + +- Exchange (5) + +- HashAggregate (4) + +- Project (3) + +- Filter (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), GreaterThan(key,0)] +ReadSchema: struct + +(2) Filter +Input [2]: [key#x, val#x] +Condition : 
(isnotnull(key#x) AND (key#x > 0)) + +(3) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(4) HashAggregate +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_max(val#x)] +Aggregate Attributes [1]: [max#x] +Results [2]: [key#x, max#x] + +(5) Exchange +Input [2]: [key#x, max#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + +(6) HashAggregate +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] +Aggregate Attributes [1]: [max(val#x)#x] +Results [2]: [key#x, max(val#x)#x AS max(val)#x] + +(7) Exchange +Input [2]: [key#x, max(val)#x] +Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x] + +(8) Sort +Input [2]: [key#x, max(val)#x] +Arguments: [key#x ASC NULLS FIRST], true, 0 + +(9) AdaptiveSparkPlan +Output [2]: [key#x, max(val)#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 0 + GROUP BY key + HAVING max(val) > 0 +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (9) ++- Project (8) + +- Filter (7) + +- HashAggregate (6) + +- Exchange (5) + +- HashAggregate (4) + +- Project (3) + +- Filter (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), GreaterThan(key,0)] +ReadSchema: struct + +(2) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(3) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(4) HashAggregate +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_max(val#x)] +Aggregate Attributes [1]: [max#x] +Results [2]: [key#x, max#x] + +(5) Exchange +Input [2]: [key#x, max#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + +(6) HashAggregate +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] 
+Aggregate Attributes [1]: [max(val#x)#x] +Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x] + +(7) Filter +Input [3]: [key#x, max(val)#x, max(val#x)#x] +Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0)) + +(8) Project +Output [2]: [key#x, max(val)#x] +Input [3]: [key#x, max(val)#x, max(val#x)#x] + +(9) AdaptiveSparkPlan +Output [2]: [key#x, max(val)#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + SELECT key, val FROM explain_temp1 WHERE key > 0 + UNION + SELECT key, val FROM explain_temp1 WHERE key > 0 +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (11) ++- HashAggregate (10) + +- Exchange (9) + +- HashAggregate (8) + +- Union (7) + :- Project (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- Project (6) + +- Filter (5) + +- Scan parquet default.explain_temp1 (4) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), GreaterThan(key,0)] +ReadSchema: struct + +(2) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(3) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(4) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), GreaterThan(key,0)] +ReadSchema: struct + +(5) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 0)) + +(6) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(7) Union + +(8) HashAggregate +Input [2]: [key#x, val#x] +Keys [2]: [key#x, val#x] +Functions: [] +Aggregate Attributes: [] +Results [2]: [key#x, val#x] + +(9) Exchange +Input [2]: [key#x, val#x] +Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] + +(10) HashAggregate +Input [2]: [key#x, val#x] +Keys [2]: 
[key#x, val#x] +Functions: [] +Aggregate Attributes: [] +Results [2]: [key#x, val#x] + +(11) AdaptiveSparkPlan +Output [2]: [key#x, val#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 a, + explain_temp2 b + WHERE a.key = b.key +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (9) ++- BroadcastHashJoin Inner BuildRight (8) + :- Project (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (7) + +- Project (6) + +- Filter (5) + +- Scan parquet default.explain_temp2 (4) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key)] +ReadSchema: struct + +(2) Filter +Input [2]: [key#x, val#x] +Condition : isnotnull(key#x) + +(3) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(4) Scan parquet default.explain_temp2 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp2] +PushedFilters: [IsNotNull(key)] +ReadSchema: struct + +(5) Filter +Input [2]: [key#x, val#x] +Condition : isnotnull(key#x) + +(6) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(7) BroadcastExchange +Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] + +(8) BroadcastHashJoin +Left keys [1]: [key#x] +Right keys [1]: [key#x] +Join condition: None + +(9) AdaptiveSparkPlan +Output [4]: [key#x, val#x, key#x, val#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 a + LEFT OUTER JOIN explain_temp2 b + ON a.key = b.key +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (7) ++- BroadcastHashJoin LeftOuter BuildRight (6) + :- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (5) + +- Project (4) + +- Filter (3) + +- 
Scan parquet default.explain_temp2 (2) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +ReadSchema: struct + +(2) Scan parquet default.explain_temp2 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp2] +PushedFilters: [IsNotNull(key)] +ReadSchema: struct + +(3) Filter +Input [2]: [key#x, val#x] +Condition : isnotnull(key#x) + +(4) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(5) BroadcastExchange +Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] + +(6) BroadcastHashJoin +Left keys [1]: [key#x] +Right keys [1]: [key#x] +Join condition: None + +(7) AdaptiveSparkPlan +Output [4]: [key#x, val#x, key#x, val#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE key = (SELECT max(key) + FROM explain_temp3 + WHERE val > 0) + AND val = 2) + AND val > 3 +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (4) ++- Project (3) + +- Filter (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)] +ReadSchema: struct + +(2) Filter +Input [2]: [key#x, val#x] +Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subquery#x, [id=#x])) AND (val#x > 3)) + +(3) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(4) AdaptiveSparkPlan +Output [2]: [key#x, val#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + SELECT * + FROM explain_temp1 + WHERE key = (SELECT max(key) + FROM explain_temp2 + WHERE val > 0) + OR + key = (SELECT 
avg(key) + FROM explain_temp3 + WHERE val > 0) +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (3) ++- Filter (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +ReadSchema: struct + +(2) Filter +Input [2]: [key#x, val#x] +Condition : ((key#x = Subquery subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery subquery#x, [id=#x])) + +(3) AdaptiveSparkPlan +Output [2]: [key#x, val#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + SELECT (SELECT Avg(key) FROM explain_temp1) + (SELECT Avg(key) FROM explain_temp1) + FROM explain_temp1 +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (3) ++- Project (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output: [] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +ReadSchema: struct<> + +(2) Project +Output [1]: [(Subquery subquery#x, [id=#x] + Subquery subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] +Input: [] + +(3) AdaptiveSparkPlan +Output [1]: [(scalarsubquery() + scalarsubquery())#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT * + FROM explain_temp1 + WHERE key > 10 + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (9) ++- BroadcastHashJoin Inner BuildRight (8) + :- Project (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (7) + +- Project (6) + +- Filter (5) + +- Scan parquet default.explain_temp1 (4) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), GreaterThan(key,10)] 
+ReadSchema: struct + +(2) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(3) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(4) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), GreaterThan(key,10)] +ReadSchema: struct + +(5) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(6) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(7) BroadcastExchange +Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] + +(8) BroadcastHashJoin +Left keys [1]: [key#x] +Right keys [1]: [key#x] +Join condition: None + +(9) AdaptiveSparkPlan +Output [4]: [key#x, val#x, key#x, val#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + WITH cte1 AS ( + SELECT key, max(val) + FROM explain_temp1 + WHERE key > 10 + GROUP BY key + ) + SELECT * FROM cte1 a, cte1 b WHERE a.key = b.key +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (15) ++- BroadcastHashJoin Inner BuildRight (14) + :- HashAggregate (6) + : +- Exchange (5) + : +- HashAggregate (4) + : +- Project (3) + : +- Filter (2) + : +- Scan parquet default.explain_temp1 (1) + +- BroadcastExchange (13) + +- HashAggregate (12) + +- Exchange (11) + +- HashAggregate (10) + +- Project (9) + +- Filter (8) + +- Scan parquet default.explain_temp1 (7) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), GreaterThan(key,10)] +ReadSchema: struct + +(2) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(3) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(4) HashAggregate +Input [2]: [key#x, val#x] +Keys [1]: 
[key#x] +Functions [1]: [partial_max(val#x)] +Aggregate Attributes [1]: [max#x] +Results [2]: [key#x, max#x] + +(5) Exchange +Input [2]: [key#x, max#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + +(6) HashAggregate +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] +Aggregate Attributes [1]: [max(val#x)#x] +Results [2]: [key#x, max(val#x)#x AS max(val)#x] + +(7) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +PushedFilters: [IsNotNull(key), GreaterThan(key,10)] +ReadSchema: struct + +(8) Filter +Input [2]: [key#x, val#x] +Condition : (isnotnull(key#x) AND (key#x > 10)) + +(9) Project +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] + +(10) HashAggregate +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_max(val#x)] +Aggregate Attributes [1]: [max#x] +Results [2]: [key#x, max#x] + +(11) Exchange +Input [2]: [key#x, max#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + +(12) HashAggregate +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] +Aggregate Attributes [1]: [max(val#x)#x] +Results [2]: [key#x, max(val#x)#x AS max(val)#x] + +(13) BroadcastExchange +Input [2]: [key#x, max(val)#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] + +(14) BroadcastHashJoin +Left keys [1]: [key#x] +Right keys [1]: [key#x] +Join condition: None + +(15) AdaptiveSparkPlan +Output [4]: [key#x, max(val)#x, key#x, max(val)#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + CREATE VIEW explain_view AS + SELECT key, val FROM explain_temp1 +-- !query schema +struct +-- !query output +== Physical Plan == +Execute CreateViewCommand (1) + +- CreateViewCommand (2) + +- Project (4) + +- UnresolvedRelation (3) + + +(1) Execute CreateViewCommand +Output: [] + +(2) CreateViewCommand +Arguments: `default`.`explain_view`, SELECT key, val FROM 
explain_temp1, false, false, PersistedView + +(3) UnresolvedRelation +Arguments: [explain_temp1] + +(4) Project +Arguments: ['key, 'val] + + +-- !query +EXPLAIN FORMATTED + SELECT + COUNT(val) + SUM(key) as TOTAL, + COUNT(key) FILTER (WHERE val > 1) + FROM explain_temp1 +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (5) ++- HashAggregate (4) + +- Exchange (3) + +- HashAggregate (2) + +- Scan parquet default.explain_temp1 (1) + + +(1) Scan parquet default.explain_temp1 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp1] +ReadSchema: struct + +(2) HashAggregate +Input [2]: [key#x, val#x] +Keys: [] +Functions [3]: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))] +Aggregate Attributes [3]: [count#xL, sum#xL, count#xL] +Results [3]: [count#xL, sum#xL, count#xL] + +(3) Exchange +Input [3]: [count#xL, sum#xL, count#xL] +Arguments: SinglePartition, true, [id=#x] + +(4) HashAggregate +Input [3]: [count#xL, sum#xL, count#xL] +Keys: [] +Functions [3]: [count(val#x), sum(cast(key#x as bigint)), count(key#x)] +Aggregate Attributes [3]: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, count(key#x)#xL] +Results [2]: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL] + +(5) AdaptiveSparkPlan +Output [2]: [TOTAL#xL, count(key) FILTER (WHERE (val > 1))#xL] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + SELECT key, sort_array(collect_set(val))[0] + FROM explain_temp4 + GROUP BY key +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (5) ++- ObjectHashAggregate (4) + +- Exchange (3) + +- ObjectHashAggregate (2) + +- Scan parquet default.explain_temp4 (1) + + +(1) Scan parquet default.explain_temp4 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp4] 
+ReadSchema: struct + +(2) ObjectHashAggregate +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_collect_set(val#x, 0, 0)] +Aggregate Attributes [1]: [buf#x] +Results [2]: [key#x, buf#x] + +(3) Exchange +Input [2]: [key#x, buf#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + +(4) ObjectHashAggregate +Input [2]: [key#x, buf#x] +Keys [1]: [key#x] +Functions [1]: [collect_set(val#x, 0, 0)] +Aggregate Attributes [1]: [collect_set(val#x, 0, 0)#x] +Results [2]: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS sort_array(collect_set(val), true)[0]#x] + +(5) AdaptiveSparkPlan +Output [2]: [key#x, sort_array(collect_set(val), true)[0]#x] +Arguments: isFinalPlan=false + + +-- !query +EXPLAIN FORMATTED + SELECT key, MIN(val) + FROM explain_temp4 + GROUP BY key +-- !query schema +struct +-- !query output +== Physical Plan == +AdaptiveSparkPlan (7) ++- SortAggregate (6) + +- Sort (5) + +- Exchange (4) + +- SortAggregate (3) + +- Sort (2) + +- Scan parquet default.explain_temp4 (1) + + +(1) Scan parquet default.explain_temp4 +Output [2]: [key#x, val#x] +Batched: true +Location [not included in comparison]/{warehouse_dir}/explain_temp4] +ReadSchema: struct + +(2) Sort +Input [2]: [key#x, val#x] +Arguments: [key#x ASC NULLS FIRST], false, 0 + +(3) SortAggregate +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_min(val#x)] +Aggregate Attributes [1]: [min#x] +Results [2]: [key#x, min#x] + +(4) Exchange +Input [2]: [key#x, min#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + +(5) Sort +Input [2]: [key#x, min#x] +Arguments: [key#x ASC NULLS FIRST], false, 0 + +(6) SortAggregate +Input [2]: [key#x, min#x] +Keys [1]: [key#x] +Functions [1]: [min(val#x)] +Aggregate Attributes [1]: [min(val#x)#x] +Results [2]: [key#x, min(val#x)#x AS min(val)#x] + +(7) AdaptiveSparkPlan +Output [2]: [key#x, min(val)#x] +Arguments: isFinalPlan=false + + +-- !query +DROP TABLE explain_temp1 +-- !query schema +struct<> +-- !query output 
+ + + +-- !query +DROP TABLE explain_temp2 +-- !query schema +struct<> +-- !query output + + + +-- !query +DROP TABLE explain_temp3 +-- !query schema +struct<> +-- !query output + + + +-- !query +DROP TABLE explain_temp4 +-- !query schema +struct<> +-- !query output + diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 06226f1274863..1a18d563588c0 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 22 +-- Number of queries: 23 -- !query @@ -1053,3 +1053,11 @@ DROP TABLE explain_temp3 struct<> -- !query output + + +-- !query +DROP TABLE explain_temp4 +-- !query schema +struct<> +-- !query output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index b2047091b1576..a1b6d71fb3803 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -18,16 +18,15 @@ package org.apache.spark.sql import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite +import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType -class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiveExecutionSuite { - import testImplicits._ +trait ExplainSuiteHelper extends QueryTest with SharedSparkSession { - private def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = { + protected def getNormalizedExplain(df: DataFrame, mode: 
ExplainMode): String = { val output = new java.io.ByteArrayOutputStream() Console.withOut(output) { df.explain(mode.name) @@ -38,7 +37,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv /** * Get the explain from a DataFrame and run the specified action on it. */ - private def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: String => Unit) = { + protected def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: String => Unit) = { f(getNormalizedExplain(df, mode)) } @@ -46,7 +45,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv * Get the explain by running the sql. The explain mode should be part of the * sql text itself. */ - private def withNormalizedExplain(queryText: String)(f: String => Unit) = { + protected def withNormalizedExplain(queryText: String)(f: String => Unit) = { val output = new java.io.ByteArrayOutputStream() Console.withOut(output) { sql(queryText).show(false) @@ -58,7 +57,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv /** * Runs the plan and makes sure the plans contains all of the keywords. 
*/ - private def checkKeywordsExistsInExplain( + protected def checkKeywordsExistsInExplain( df: DataFrame, mode: ExplainMode, keywords: String*): Unit = { withNormalizedExplain(df, mode) { normalizedOutput => for (key <- keywords) { @@ -67,9 +66,13 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv } } - private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = { + protected def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = { checkKeywordsExistsInExplain(df, ExtendedMode, keywords: _*) } +} + +class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite { + import testImplicits._ test("SPARK-23034 show rdd names in RDD scan nodes (Dataset)") { val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd") @@ -342,4 +345,57 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv } } +class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite { + import testImplicits._ + + test("Explain formatted") { + val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1") + val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2") + val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2")) + // trigger the final plan for AQE + testDf.collect() + // whitespace + val ws = " " + // == Physical Plan == + // AdaptiveSparkPlan (14) + // +- * HashAggregate (13) + // +- CustomShuffleReader (12) + // +- ShuffleQueryStage (11) + // +- Exchange (10) + // +- * HashAggregate (9) + // +- * Project (8) + // +- * BroadcastHashJoin Inner BuildRight (7) + // :- * Project (2) + // : +- * LocalTableScan (1) + // +- BroadcastQueryStage (6) + // +- BroadcastExchange (5) + // +- * Project (4) + // +- * LocalTableScan (3) + checkKeywordsExistsInExplain( + testDf, + FormattedMode, + s""" + |(6) BroadcastQueryStage$ws + |Output [2]: [k#x, v2#x] + |Arguments: 0 + |""".stripMargin, + s""" + |(11) ShuffleQueryStage$ws + |Output 
[5]: [k#x, count#xL, sum#xL, sum#x, count#xL] + |Arguments: 1 + |""".stripMargin, + s""" + |(12) CustomShuffleReader$ws + |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL] + |Arguments: coalesced + |""".stripMargin, + s""" + |(14) AdaptiveSparkPlan$ws + |Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x] + |Arguments: isFinalPlan=true + |""".stripMargin + ) + } +} + case class ExplainSingleData(id: Int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 46d4278510c9b..028f9f38d6d3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -52,7 +52,7 @@ class AdaptiveQueryExecSuite event match { case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) => if (sparkPlanInfo.simpleString.startsWith( - "AdaptiveSparkPlan(isFinalPlan=true)")) { + "AdaptiveSparkPlan isFinalPlan=true")) { finalPlanCnt += 1 } case _ => // ignore other events @@ -63,14 +63,14 @@ class AdaptiveQueryExecSuite val dfAdaptive = sql(query) val planBefore = dfAdaptive.queryExecution.executedPlan - assert(planBefore.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=false)")) + assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false")) val result = dfAdaptive.collect() withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { val df = sql(query) QueryTest.sameRows(result.toSeq, df.collect().toSeq) } val planAfter = dfAdaptive.queryExecution.executedPlan - assert(planAfter.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=true)")) + assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true")) spark.sparkContext.listenerBus.waitUntilEmpty() assert(finalPlanCnt == 1)