[SPARK-35133][SQL] Explain codegen works with AQE #32430

Closed · wants to merge 5 commits
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeFor
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery
@@ -112,6 +113,11 @@ package object debug {
plan foreach {
case s: WholeStageCodegenExec =>
codegenSubtrees += s
case p: AdaptiveSparkPlanExec =>
// Find subtrees from current executed plan of AQE.
findSubtrees(p.executedPlan)
case s: QueryStageExec =>
findSubtrees(s.plan)
case s =>
s.subqueries.foreach(findSubtrees)
}
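The two new match arms above make the subtree search descend through AQE's wrapper nodes instead of stopping at them. A minimal, self-contained sketch of the same traversal pattern, using a toy plan ADT in place of Spark's `SparkPlan` hierarchy (all names below are illustrative stand-ins, not the real Spark classes):

```scala
// Toy stand-ins for SparkPlan, WholeStageCodegenExec, AdaptiveSparkPlanExec
// and QueryStageExec (illustrative only, not Spark API).
sealed trait Plan { def children: Seq[Plan] }
case class WholeStageCodegen(children: Seq[Plan]) extends Plan
case class AdaptivePlan(executedPlan: Plan) extends Plan {
  def children: Seq[Plan] = Seq(executedPlan)
}
case class QueryStage(plan: Plan) extends Plan {
  def children: Seq[Plan] = Seq(plan)
}
case object Leaf extends Plan { def children: Seq[Plan] = Nil }

// Mirrors the patched findSubtrees: codegen subtrees are collected, and the
// AQE wrappers are unwrapped so the search reaches the currently executed plan.
def findSubtrees(plan: Plan): Seq[WholeStageCodegen] = plan match {
  case s: WholeStageCodegen => s +: s.children.flatMap(findSubtrees)
  case p: AdaptivePlan      => findSubtrees(p.executedPlan)
  case q: QueryStage        => findSubtrees(q.plan)
  case other                => other.children.flatMap(findSubtrees)
}
```

With the unwrapping arms in place, a plan shaped like `AdaptivePlan(QueryStage(WholeStageCodegen(Seq(Leaf))))` yields one codegen subtree; without them the search would stop at the adaptive wrapper and report zero.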
27 changes: 27 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -565,6 +565,33 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
|""".stripMargin
)
}

test("SPARK-35133: explain codegen should work with AQE") {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
@dongjoon-hyun (Member) commented on May 6, 2021:
Hi, @c21.
Although SPARK-33679 enabled AQE by default for Apache Spark 3.2, we don't know what may happen during the QA and Apache Spark 3.2.0 RC period. In addition, we had better add an explicit configuration in the test case. Could you add spark.sql.adaptive.enabled=true as well?

@c21 (Contributor, Author) replied:

@dongjoon-hyun - sorry if it's not clear: this test is under ExplainSuiteAE, which extends EnableAdaptiveExecutionSuite, so AQE is already enforced.

@dongjoon-hyun (Member) replied:
Oh, got it. Then, it's perfect.

withTempView("df") {
val df = spark.range(5).select(col("id").as("key"), col("id").as("value"))
df.createTempView("df")

val sqlText = "EXPLAIN CODEGEN SELECT key, MAX(value) FROM df GROUP BY key"
val expectedCodegenText = "Found 2 WholeStageCodegen subtrees."
val expectedNoCodegenText = "Found 0 WholeStageCodegen subtrees."
withNormalizedExplain(sqlText) { normalizedOutput =>
assert(normalizedOutput.contains(expectedNoCodegenText))
}

val aggDf = df.groupBy('key).agg(max('value))
withNormalizedExplain(aggDf, CodegenMode) { normalizedOutput =>
assert(normalizedOutput.contains(expectedNoCodegenText))
}

// trigger the final plan for AQE
aggDf.collect()
withNormalizedExplain(aggDf, CodegenMode) { normalizedOutput =>
assert(normalizedOutput.contains(expectedCodegenText))
}
}
}
}
}
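The `aggDf.collect()` call in the test above is the crux of the fix: under AQE the final physical plan, and hence its WholeStageCodegen subtrees, only materializes once the query has actually executed, so the codegen explain output changes from zero to two subtrees across that call. A toy model of this deferred planning, with all names below hypothetical rather than Spark API:

```scala
// Hypothetical model of AQE's deferred final plan (not Spark API): before
// execution the adaptive node has no finalized stages, so a codegen explain
// finds zero subtrees; after execution the final subtrees become visible.
class ToyAdaptivePlan {
  private var finalSubtrees: Seq[String] = Nil
  def execute(): Unit = finalSubtrees = Seq("Subtree 1 / 2", "Subtree 2 / 2")
  def explainCodegen: String =
    s"Found ${finalSubtrees.size} WholeStageCodegen subtrees."
}

val plan = new ToyAdaptivePlan
val before = plan.explainCodegen // before execution: 0 subtrees
plan.execute()                   // plays the role of aggDf.collect()
val after = plan.explainCodegen  // after execution: 2 subtrees
```

This mirrors why the test asserts `expectedNoCodegenText` before `collect()` and `expectedCodegenText` after it.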

case class ExplainSingleData(id: Int)
@@ -24,14 +24,13 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.execution.{CodegenSupport, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.StructType

// Disable AQE because the WholeStageCodegenExec is added when running QueryStageExec
@dongjoon-hyun (Member) commented:
Instead of removal, shall we recover this for DebuggingSuite at line 100?

@c21 (Contributor, Author) replied:
@dongjoon-hyun - sorry, can you elaborate on what you mean by recovering? Here I kept the AQE-runnable unit tests under DebuggingSuiteBase, and added a non-AQE DebuggingSuite and an AQE DebuggingSuiteAE.

@c21 (Contributor, Author) replied:
Oh, do you mean moving the comment to line 100?

@c21 (Contributor, Author) replied:
@dongjoon-hyun - updated to move the comment to line 100; let me know if my understanding is not correct, thanks.

@dongjoon-hyun (Member) replied:
Thanks!

class DebuggingSuite extends SharedSparkSession with DisableAdaptiveExecutionSuite {
abstract class DebuggingSuiteBase extends SharedSparkSession {

test("DataFrame.debug()") {
testData.debug()
@@ -43,63 +42,23 @@ class DebuggingSuite extends SharedSparkSession with DisableAdaptiveExecutionSui
}

test("debugCodegen") {
val res = codegenString(spark.range(10).groupBy(col("id") * 2).count()
.queryExecution.executedPlan)
val df = spark.range(10).groupBy(col("id") * 2).count()
df.collect()
val res = codegenString(df.queryExecution.executedPlan)
assert(res.contains("Subtree 1 / 2"))
assert(res.contains("Subtree 2 / 2"))
assert(res.contains("Object[]"))
}

test("debugCodegenStringSeq") {
val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
.queryExecution.executedPlan)
val df = spark.range(10).groupBy(col("id") * 2).count()
df.collect()
val res = codegenStringSeq(df.queryExecution.executedPlan)
assert(res.length == 2)
assert(res.forall{ case (subtree, code, _) =>
subtree.contains("Range") && code.contains("Object[]")})
}

test("SPARK-28537: DebugExec cannot debug broadcast related queries") {
val rightDF = spark.range(10)
val leftDF = spark.range(10)
val joinedDF = leftDF.join(rightDF, leftDF("id") === rightDF("id"))

val captured = new ByteArrayOutputStream()
Console.withOut(captured) {
joinedDF.debug()
}

val output = captured.toString()
val hashedModeString = "HashedRelationBroadcastMode(List(input[0, bigint, false]),false)"
assert(output.replaceAll("\\[id=#\\d+\\]", "[id=#x]").contains(
s"""== BroadcastExchange $hashedModeString, [id=#x] ==
|Tuples output: 0
| id LongType: {}
|== WholeStageCodegen (1) ==
|Tuples output: 10
| id LongType: {java.lang.Long}
|== Range (0, 10, step=1, splits=2) ==
|Tuples output: 0
| id LongType: {}""".stripMargin))
}

test("SPARK-28537: DebugExec cannot debug columnar related queries") {
val df = spark.range(5)
df.persist()

val captured = new ByteArrayOutputStream()
Console.withOut(captured) {
df.debug()
}
df.unpersist()

val output = captured.toString().replaceAll("#\\d+", "#x")
assert(output.contains(
"""== InMemoryTableScan [id#xL] ==
|Tuples output: 0
| id LongType: {}
|""".stripMargin))
}

case class DummyCodeGeneratorPlan(useInnerClass: Boolean)
extends CodegenSupport with LeafExecNode {
override def output: Seq[Attribute] = StructType.fromDDL("d int").toAttributes
@@ -137,3 +96,50 @@ class DebuggingSuite extends SharedSparkSession with DisableAdaptiveExecutionSui
}
}
}

class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSuite {

test("SPARK-28537: DebugExec cannot debug broadcast related queries") {
val rightDF = spark.range(10)
val leftDF = spark.range(10)
val joinedDF = leftDF.join(rightDF, leftDF("id") === rightDF("id"))

val captured = new ByteArrayOutputStream()
Console.withOut(captured) {
joinedDF.debug()
}

val output = captured.toString()
val hashedModeString = "HashedRelationBroadcastMode(List(input[0, bigint, false]),false)"
assert(output.replaceAll("\\[id=#\\d+\\]", "[id=#x]").contains(
s"""== BroadcastExchange $hashedModeString, [id=#x] ==
|Tuples output: 0
| id LongType: {}
|== WholeStageCodegen (1) ==
|Tuples output: 10
| id LongType: {java.lang.Long}
|== Range (0, 10, step=1, splits=2) ==
|Tuples output: 0
| id LongType: {}""".stripMargin))
}

test("SPARK-28537: DebugExec cannot debug columnar related queries") {
val df = spark.range(5)
df.persist()

val captured = new ByteArrayOutputStream()
Console.withOut(captured) {
df.debug()
}
df.unpersist()

val output = captured.toString().replaceAll("#\\d+", "#x")
assert(output.contains(
"""== InMemoryTableScan [id#xL] ==
|Tuples output: 0
| id LongType: {}
|""".stripMargin))
}
}

class DebuggingSuiteAE extends DebuggingSuiteBase with EnableAdaptiveExecutionSuite