
[SPARK-35133][SQL] Explain codegen works with AQE #32430

Closed
wants to merge 5 commits

Conversation

@c21 (Contributor) commented May 4, 2021

What changes were proposed in this pull request?

EXPLAIN CODEGEN <query> (and Dataset.explain("codegen")) prints out the generated code for each stage of the plan. The current implementation matches the WholeStageCodegenExec operator in the query plan and prints out its generated code (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala#L111-L118). This does not work with AQE, as we wrap the whole query plan inside AdaptiveSparkPlanExec and do not eagerly run the whole-stage codegen physical planning rule (CollapseCodegenStages). This introduces an unexpected behavior change for EXPLAIN queries (and Dataset.explain), as we now enable AQE by default.

The change is to explain code-gen for the current executed plan of AQE.
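A minimal sketch of the idea, using toy stand-in classes (the names mirror Spark's WholeStageCodegenExec and AdaptiveSparkPlanExec, but this is not the real Spark API): the traversal that collects codegen subtrees must explicitly descend into the AQE root's current executedPlan, which a plain child walk never reaches.

```scala
// Toy sketch of the fix; simplified stand-ins, not the real Spark classes.
object ExplainCodegenSketch extends App {
  sealed trait SparkPlan { def children: Seq[SparkPlan] }
  case class LeafExec(name: String) extends SparkPlan {
    def children: Seq[SparkPlan] = Nil
  }
  case class WholeStageCodegenExec(child: SparkPlan) extends SparkPlan {
    def children: Seq[SparkPlan] = Seq(child)
  }
  // AQE wraps the whole query; the current plan hangs off `executedPlan`,
  // which a plain child traversal never visits.
  case class AdaptiveSparkPlanExec(executedPlan: SparkPlan) extends SparkPlan {
    def children: Seq[SparkPlan] = Nil
  }

  // Collect codegen subtrees, descending into the AQE wrapper (the fix).
  def findCodegenSubtrees(plan: SparkPlan): Seq[WholeStageCodegenExec] = plan match {
    case s: WholeStageCodegenExec => s +: s.children.flatMap(findCodegenSubtrees)
    case p: AdaptiveSparkPlanExec => findCodegenSubtrees(p.executedPlan)
    case other                    => other.children.flatMap(findCodegenSubtrees)
  }

  val aqePlan = AdaptiveSparkPlanExec(WholeStageCodegenExec(LeafExec("Range")))
  // Without the AdaptiveSparkPlanExec case, the walk stops at the AQE root
  // and EXPLAIN CODEGEN prints no generated code at all.
  assert(findCodegenSubtrees(aqePlan).size == 1)
}
```

The same shape applies to df.explain("codegen"): with AQE on, the plan root is the adaptive wrapper, so any matcher that only recognizes WholeStageCodegenExec finds nothing unless it unwraps the AQE node first.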

Why are the changes needed?

Make EXPLAIN CODEGEN work the same as before AQE was enabled by default.

Does this PR introduce any user-facing change?

No (when comparing with latest Spark release 3.1.1).

How was this patch tested?

Added unit test in ExplainSuite.scala.

@github-actions github-actions bot added the SQL label May 4, 2021
@c21 (Contributor Author) commented May 4, 2021

cc @cloud-fan if you have time to take a look, thanks.

@@ -197,7 +198,14 @@ class QueryExecution(
         queryExecution.toString(maxFields, append)
       case CodegenMode =>
         try {
-          org.apache.spark.sql.execution.debug.writeCodegen(append, queryExecution.executedPlan)
+          queryExecution.executedPlan match {
Member:
How about handling the AQE node in the writeCodegen side instead? That way, debugCodegen can support it, too?

Contributor Author:

@maropu - updated, thanks.

@SparkQA commented May 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42669/

@SparkQA commented May 5, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42669/

@SparkQA commented May 5, 2021

Test build #138148 has finished for PR 32430 at commit 2b7b6da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42688/

@SparkQA commented May 5, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42688/

@SparkQA commented May 5, 2021

Test build #138167 has finished for PR 32430 at commit 788e714.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member) left a comment:

Looks fine otherwise.

case p: AdaptiveSparkPlanExec =>
  // Find subtrees from original input plan of AQE.
  val inputExecutedPlan = QueryExecution.prepareForExecution(
    QueryExecution.preparations(sparkSession, None), p.inputPlan)
Member:

Could we use SparkSession.getActiveSession to avoid the unnecessary changes?

Contributor Author:

@maropu - good call, it's much cleaner. Updated.

val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]()

def findSubtrees(plan: SparkPlan): Unit = {
  plan foreach {
    case s: WholeStageCodegenExec =>
      codegenSubtrees += s
    case p: AdaptiveSparkPlanExec =>
      // Find subtrees from original input plan of AQE.
Contributor:

This doesn't match the actual plan. Why can't we use AdaptiveSparkPlanExec.executedPlan?

Contributor Author:

Yes, this doesn't match the actual plan for df.explain("codegen") if df has already been executed. The problem is that the final plan AdaptiveSparkPlanExec.executedPlan has a ShuffleQueryStageExec wrapping the whole sub-plan under that shuffle.

Example:

spark.range(5).select(col("id").as("key"), col("id").as("value")).groupBy('key).agg(max('value))
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key#2L], functions=[max(value#3L)], output=[key#2L, max(value)#9L])
   +- CustomShuffleReader coalesced
      +- ShuffleQueryStage 0
         +- Exchange hashpartitioning(key#2L, 5), ENSURE_REQUIREMENTS, [id=#28]
            +- *(1) HashAggregate(keys=[key#2L], functions=[partial_max(value#3L)], output=[key#2L, max#13L])
               +- *(1) Project [id#0L AS key#2L, id#0L AS value#3L]
                  +- *(1) Range (0, 5, step=1, splits=2)

The partial aggregate HashAggregate is wrapped inside ShuffleQueryStage, so it cannot be pattern-matched for the explain. One way to work around this is to add pattern matching for ShuffleQueryStageExec as well. But in any case we need to re-run the physical preparation rules if AdaptiveSparkPlan.isFinalPlan=false.
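The hiding effect described here can be sketched with the same kind of toy stand-ins (class names mirror Spark's WholeStageCodegenExec and ShuffleQueryStageExec, but this is not the real Spark API): a codegen subtree sitting under a query-stage node is invisible to a traversal that only knows WholeStageCodegenExec, and becomes visible once the traversal also unwraps query stages.

```scala
// Toy sketch of the query-stage wrapping problem; simplified stand-ins.
object QueryStageSketch extends App {
  sealed trait SparkPlan { def children: Seq[SparkPlan] }
  case class LeafExec(name: String) extends SparkPlan {
    def children: Seq[SparkPlan] = Nil
  }
  case class WholeStageCodegenExec(child: SparkPlan) extends SparkPlan {
    def children: Seq[SparkPlan] = Seq(child)
  }
  // Mirrors ShuffleQueryStageExec: the materialized stage hangs off `plan`,
  // not off `children`, so a plain child walk never reaches it.
  case class ShuffleQueryStageExec(plan: SparkPlan) extends SparkPlan {
    def children: Seq[SparkPlan] = Nil
  }

  def findNaive(p: SparkPlan): Seq[WholeStageCodegenExec] = p match {
    case s: WholeStageCodegenExec => s +: s.children.flatMap(findNaive)
    case other                    => other.children.flatMap(findNaive)
  }

  // The workaround discussed here: also pattern-match the query stage.
  def findWithStages(p: SparkPlan): Seq[WholeStageCodegenExec] = p match {
    case s: WholeStageCodegenExec => s +: s.children.flatMap(findWithStages)
    case s: ShuffleQueryStageExec => findWithStages(s.plan)
    case other                    => other.children.flatMap(findWithStages)
  }

  // Shape of the final plan above: codegen stage 1 is buried under the shuffle stage.
  val finalPlan = WholeStageCodegenExec(
    ShuffleQueryStageExec(WholeStageCodegenExec(LeafExec("Range"))))

  assert(findNaive(finalPlan).size == 1)      // inner codegen stage is missed
  assert(findWithStages(finalPlan).size == 2) // both stages are found
}
```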

Contributor:

Yeah, we can strip QueryStageExec as well. Can we use executedPlan even if isFinalPlan=false?

Contributor Author:

@cloud-fan - updated.

case p: AdaptiveSparkPlanExec =>
  // Find subtrees from current executed plan of AQE.
  val executedPlan = p.executedPlan
  if (executedPlan.find(_.isInstanceOf[WholeStageCodegenExec]).isEmpty) {
Contributor:

Ah, now I see what you mean. For isFinalPlan=false, we may have query stages in the query plan that have not run queryStageOptimizerRules yet and therefore have no WholeStageCodegenExec.

I think this is at least consistent with normal EXPLAIN: if the df has not been run yet, you won't see the leading * that marks whole-stage codegen in plan nodes.

One idea is to trigger AQE during EXPLAIN, so that we can display the final plan. But this won't work for SQL EXPLAIN.

Contributor:

We need to think a bit more about how EXPLAIN should work with AQE.

Contributor Author:

@cloud-fan - I feel SQL EXPLAIN is still useful, at least for me and several developers on the team. It's kind of annoying that we need to disable AQE every time for this.

Contributor Author:

> One idea is to trigger AQE during EXPLAIN, so that we can display the final plan.

I think users and developers expect EXPLAIN to be a purely metadata operation, so it might be surprising, and inefficient, to execute the actual query during EXPLAIN.

Contributor:

After some plan changes (such as join elimination) in AQE, I don't think SQL EXPLAIN is useful as you can never see the final plan. How do you expect SQL EXPLAIN to work for AQE?

Contributor Author:

Because AQE does not change every query. As in the unit test case I added in ExplainSuite.scala, for an aggregation query AQE should not affect the generated code for the aggregation.

Contributor Author:

I am fine with changing the code to findSubtrees(aqePlan.executedPlan), so the explain codegen behavior is the same as explain, and it at least fixes the case where the DataFrame has already been executed (the plan is final).

I can add the change for SQL EXPLAIN in our internal fork and discuss it in the future. WDYT @cloud-fan?

Contributor:

SGTM. We can also look at how other systems do EXPLAIN with query plans that can change at runtime.

Contributor Author:

@cloud-fan - updated. I agree, and will check as well, thanks.

@SparkQA commented May 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42717/

@SparkQA commented May 6, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42717/

@SparkQA commented May 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42722/

@SparkQA commented May 6, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42722/

@SparkQA commented May 6, 2021

Test build #138196 has finished for PR 32430 at commit f6a5b85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 6, 2021

Test build #138201 has finished for PR 32430 at commit 4683f0e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -565,6 +565,33 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite
           |""".stripMargin
       )
   }

test("SPARK-35133: explain codegen should work with AQE") {
  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
@dongjoon-hyun (Member) commented May 6, 2021:

Hi, @c21.
Although SPARK-33679 enabled AQE by default for Apache Spark 3.2, we don't know what will happen during QA and the Apache Spark 3.2.0 RC period. In addition, we had better have an explicit configuration in the test case. Could you additionally add spark.sql.adaptive.enabled=true?

Contributor Author:

@dongjoon-hyun - sorry if it's not clear; this test is under ExplainSuiteAE, which extends EnableAdaptiveExecutionSuite to enforce that AQE is enabled already.

Member:

Oh, got it. Then, it's perfect.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.StructType

// Disable AQE because the WholeStageCodegenExec is added when running QueryStageExec
Member:

Instead of removal, shall we recover this for DebuggingSuite at line 100?

Contributor Author:

@dongjoon-hyun - sorry, can you elaborate on what you mean by recovering? Here I kept the AQE-runnable unit tests under DebuggingSuiteBase, and have a non-AQE DebuggingSuite and an AQE DebuggingSuiteAE.

Contributor Author:

Oh, do you mean to move the comment to line 100?

Contributor Author:

@dongjoon-hyun - updated to move the comment to line 100; let me know if my understanding is incorrect, thanks.

Member:

Thanks!

@dongjoon-hyun (Member) left a comment:

+1, LGTM (with minor comments). Thank you, @c21 and all.

@imback82 (Contributor) left a comment:

LGTM

@SparkQA commented May 7, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42749/

@SparkQA commented May 7, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42749/

@dongjoon-hyun (Member):

It seems that there is some delay in GitHub Actions; I checked that it has already passed.

Thank you, @c21 and all. Merged to master.

@c21 (Contributor Author) commented May 7, 2021

Thank you all for review!

@c21 c21 deleted the explain-aqe branch May 7, 2021 04:27
@SparkQA commented May 7, 2021

Test build #138227 has finished for PR 32430 at commit fae6c5d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
