Skip to content

Commit

Permalink
Change to only fix the behavior for an executed DataFrame
Browse files (browse the repository at this point in the history)
  • Loading branch information
c21 committed May 6, 2021
1 parent f6a5b85 commit 4683f0e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,7 @@ package object debug {
codegenSubtrees += s
case p: AdaptiveSparkPlanExec =>
// Find subtrees from current executed plan of AQE.
val executedPlan = p.executedPlan
if (executedPlan.find(_.isInstanceOf[WholeStageCodegenExec]).isEmpty) {
// Apply preparation rules if whole stage code-gen rule is not applied yet.
val preparedPlan = QueryExecution.prepareForExecution(
QueryExecution.preparations(SparkSession.getActiveSession.get, None), executedPlan)
findSubtrees(preparedPlan)
} else {
findSubtrees(executedPlan)
}
findSubtrees(p.executedPlan)
case s: QueryStageExec =>
findSubtrees(s.plan)
case s =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,20 +573,21 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
df.createTempView("df")

val sqlText = "EXPLAIN CODEGEN SELECT key, MAX(value) FROM df GROUP BY key"
val expectedText = "Found 2 WholeStageCodegen subtrees."
val expectedCodegenText = "Found 2 WholeStageCodegen subtrees."
val expectedNoCodegenText = "Found 0 WholeStageCodegen subtrees."
withNormalizedExplain(sqlText) { normalizedOutput =>
assert(normalizedOutput.contains(expectedText))
assert(normalizedOutput.contains(expectedNoCodegenText))
}

val aggDf = df.groupBy('key).agg(max('value))
withNormalizedExplain(aggDf, CodegenMode) { normalizedOutput =>
assert(normalizedOutput.contains(expectedText))
assert(normalizedOutput.contains(expectedNoCodegenText))
}

// trigger the final plan for AQE
aggDf.collect()
withNormalizedExplain(aggDf, CodegenMode) { normalizedOutput =>
assert(normalizedOutput.contains(expectedText))
assert(normalizedOutput.contains(expectedCodegenText))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,18 @@ abstract class DebuggingSuiteBase extends SharedSparkSession {
}

test("debugCodegen") {
val res = codegenString(spark.range(10).groupBy(col("id") * 2).count()
.queryExecution.executedPlan)
val df = spark.range(10).groupBy(col("id") * 2).count()
df.collect()
val res = codegenString(df.queryExecution.executedPlan)
assert(res.contains("Subtree 1 / 2"))
assert(res.contains("Subtree 2 / 2"))
assert(res.contains("Object[]"))
}

test("debugCodegenStringSeq") {
val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
.queryExecution.executedPlan)
val df = spark.range(10).groupBy(col("id") * 2).count()
df.collect()
val res = codegenStringSeq(df.queryExecution.executedPlan)
assert(res.length == 2)
assert(res.forall{ case (subtree, code, _) =>
subtree.contains("Range") && code.contains("Object[]")})
Expand Down

0 comments on commit 4683f0e

Please sign in to comment.