Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31495][SQL] Support formatted explain for AQE #28271

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}

object ExplainUtils {
object ExplainUtils extends AdaptiveSparkPlanHelper {
/**
* Given a input physical plan, performs the following tasks.
* 1. Computes the operator id for current operator and records it in the operaror
Expand Down Expand Up @@ -144,15 +145,26 @@ object ExplainUtils {
case p: WholeStageCodegenExec =>
case p: InputAdapter =>
case other: QueryPlan[_] =>
if (!other.getTagValue(QueryPlan.OP_ID_TAG).isDefined) {

def setOpId(): Unit = if (other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
currentOperationID += 1
other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
operatorIDs += ((currentOperationID, other))
}
other.innerChildren.foreach { plan =>
currentOperationID = generateOperatorIDs(plan,
currentOperationID,
operatorIDs)

other match {
case p: AdaptiveSparkPlanExec =>
currentOperationID =
generateOperatorIDs(p.executedPlan, currentOperationID, operatorIDs)
setOpId()
case p: QueryStageExec =>
currentOperationID = generateOperatorIDs(p.plan, currentOperationID, operatorIDs)
setOpId()
case _ =>
setOpId()
other.innerChildren.foldLeft(currentOperationID) {
(curId, plan) => generateOperatorIDs(plan, curId, operatorIDs)
}
}
}
currentOperationID
Expand All @@ -163,21 +175,25 @@ object ExplainUtils {
* whole stage code gen id in the plan via setting a tag.
*/
private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = {
var currentCodegenId = -1

def setCodegenId(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
if (currentCodegenId != -1) {
p.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
}
children.foreach(generateWholeStageCodegenIds)
}

// Skip the subqueries as they are not printed as part of main query block.
if (plan.isInstanceOf[BaseSubqueryExec]) {
return
}
var currentCodegenId = -1
plan.foreach {
case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId
case _: InputAdapter => currentCodegenId = -1
case other: QueryPlan[_] =>
if (currentCodegenId != -1) {
other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
}
other.innerChildren.foreach { plan =>
generateWholeStageCodegenIds(plan)
}
case p: AdaptiveSparkPlanExec => setCodegenId(p, Seq(p.executedPlan))
case p: QueryStageExec => setCodegenId(p, Seq(p.plan))
case other: QueryPlan[_] => setCodegenId(other, other.innerChildren)
}
}

Expand Down Expand Up @@ -232,13 +248,16 @@ object ExplainUtils {
}

def removeTags(plan: QueryPlan[_]): Unit = {
def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
p.unsetTagValue(QueryPlan.OP_ID_TAG)
p.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
children.foreach(removeTags)
}

plan foreach {
case plan: QueryPlan[_] =>
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
plan.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
plan.innerChildren.foreach { p =>
removeTags(p)
}
case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan))
case p: QueryStageExec => remove(p, Seq(p.plan))
case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,7 @@ case class AdaptiveSparkPlanExec(
getFinalPhysicalPlan().execute()
}

override def verboseString(maxFields: Int): String = simpleString(maxFields)

override def simpleString(maxFields: Int): String =
s"AdaptiveSparkPlan(isFinalPlan=$isFinalPlan)"
protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")

override def generateTreeString(
depth: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ case class InsertAdaptiveSparkPlan(
if !subqueryMap.contains(exprId.id) =>
val executedPlan = compileSubquery(p)
verifyAdaptivePlan(executedPlan, p)
val subquery = SubqueryExec(s"subquery${exprId.id}", executedPlan)
val subquery = SubqueryExec(s"subquery#${exprId.id}", executedPlan)
subqueryMap.put(exprId.id, subquery)
case expressions.InSubquery(_, ListQuery(query, _, exprId, _))
if !subqueryMap.contains(exprId.id) =>
Expand Down
3 changes: 3 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--IMPORT explain.sql

--SET spark.sql.adaptive.enabled=true
1 change: 1 addition & 0 deletions sql/core/src/test/resources/sql-tests/inputs/explain.sql
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,4 @@ EXPLAIN FORMATTED
DROP TABLE explain_temp1;
DROP TABLE explain_temp2;
DROP TABLE explain_temp3;
DROP TABLE explain_temp4;
Loading