
[SPARK-34701][SQL] Remove analyzing temp view again in CreateViewCommand #31933

Closed
Changes from 5 commits
@@ -683,6 +683,18 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case _ => // Analysis successful!
}
}

// Check analysis on internal nodes.
plan match {
case c: Command =>
Contributor: shall we move it to L682?

c.innerChildren.foreach {
case l: LogicalPlan => checkAnalysis(l)
case _ =>
}

case _ => // Analysis successful!
}

checkCollectedMetrics(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
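The new check above recurses into a command's inner children, which an ordinary child traversal skips. A self-contained sketch of that pattern, using hypothetical minimal types rather than Spark's actual classes:

```scala
// Minimal, hypothetical stand-ins for Spark's LogicalPlan/Command hierarchy;
// not Spark's actual classes.
sealed trait LogicalPlan { def innerChildren: Seq[LogicalPlan] = Nil }
case class Relation(name: String, resolved: Boolean) extends LogicalPlan
case class Command(query: LogicalPlan) extends LogicalPlan {
  // Commands hold their query as an *inner* child, which a plain
  // child traversal does not visit.
  override def innerChildren: Seq[LogicalPlan] = Seq(query)
}

def checkAnalysis(plan: LogicalPlan): Unit = plan match {
  case Relation(name, resolved) =>
    if (!resolved) throw new IllegalStateException(s"unresolved relation: $name")
  case c: Command =>
    // Recurse into inner children explicitly, mirroring the diff above.
    c.innerChildren.foreach(checkAnalysis)
  case _ => // analysis successful
}

checkAnalysis(Command(Relation("t", resolved = true))) // passes silently
```

Without the explicit `innerChildren` recursion, an unresolved plan hidden inside a command would slip past the check.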
2 changes: 1 addition & 1 deletion in sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3370,7 +3370,7 @@ class Dataset[T] private[sql](
comment = None,
properties = Map.empty,
originalText = None,
-      child = logicalPlan,
+      analyzedPlan = logicalPlan,
allowExisting = false,
replace = replace,
viewType = viewType)
@@ -48,8 +48,8 @@ import org.apache.spark.sql.util.SchemaUtils
* @param properties the properties of this view.
* @param originalText the original SQL text of this view, can be None if this view is created via
* Dataset API.
- * @param child the logical plan that represents the view; this is used to generate the logical
- *              plan for temporary view and the view schema.
+ * @param analyzedPlan the logical plan that represents the view; this is used to generate the
+ *                     logical plan for temporary view and the view schema.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
@@ -62,15 +62,15 @@ case class CreateViewCommand(
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
-    child: LogicalPlan,
+    analyzedPlan: LogicalPlan,
cloud-fan marked this conversation as resolved.
allowExisting: Boolean,
replace: Boolean,
viewType: ViewType)
extends RunnableCommand {

import ViewHelper._

-  override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(analyzedPlan)

if (viewType == PersistedView) {
require(originalText.isDefined, "'originalText' must be provided to create permanent view")
@@ -96,11 +96,6 @@
}

override def run(sparkSession: SparkSession): Seq[Row] = {
-    // If the plan cannot be analyzed, throw an exception and don't proceed.
-    val qe = sparkSession.sessionState.executePlan(child)
-    qe.assertAnalyzed()
-    val analyzedPlan = qe.analyzed
Contributor: Actually this is a regression. CreateViewCommand can be created from CreateViewStatement in ResolveSessionCatalog. However, ResolveSessionCatalog is inside extendedResolutionRules, which means the analyzedPlan doesn't go through postHocResolutionRules.

I think we need to make CreateViewCommand.analyzedPlan a real child, so that it can go through all the analyzer rules.

Contributor Author: Good catch. One issue with "make CreateViewCommand.analyzedPlan a real child" is that it will become an optimized plan, which will affect caching. I can try to make certain commands' children skip the optimizer. WDYT?

Contributor: It would be great to have such a mechanism to skip the optimizer; it would be useful for CacheTableAsSelect as well.

Contributor Author: OK, I will give it a shot.

Contributor Author: @cloud-fan I created #32032 to handle this scenario. Please let me know what you think. TIA!


if (userSpecifiedColumns.nonEmpty &&
userSpecifiedColumns.length != analyzedPlan.output.length) {
throw new AnalysisException(s"The number of columns produced by the SELECT clause " +
@@ -94,14 +94,21 @@ case class CacheTableAsSelectExec(
override lazy val relationName: String = tempViewName

override lazy val planToCache: LogicalPlan = {
+    // CacheTableAsSelectExec.query is not resolved yet (e.g., not a child of CacheTableAsSelect)
+    // in order to skip optimizing it; note that we need to pass an analyzed plan to
+    // CreateViewCommand for the cache to work correctly. Thus, the query is analyzed below.
Contributor: Can we further clean this up by creating CreateViewStatement instead of CreateViewCommand below?

+    val qe = sparkSession.sessionState.executePlan(query)
+    qe.assertAnalyzed()
+    val analyzedPlan = qe.analyzed
imback82 marked this conversation as resolved.

Dataset.ofRows(sparkSession,
CreateViewCommand(
name = TableIdentifier(tempViewName),
userSpecifiedColumns = Nil,
comment = None,
properties = Map.empty,
originalText = Some(originalText),
-        child = query,
+        analyzedPlan = analyzedPlan,
allowExisting = false,
replace = false,
viewType = LocalTempView
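The shape of the change above is: analyze the query eagerly, then hand the already-analyzed plan to the command. A hypothetical self-contained sketch with toy stand-ins for QueryExecution and CreateViewCommand (not Spark's actual APIs), where resolution is modeled as a boolean flag:

```scala
// Toy stand-ins; resolution is modeled as a boolean flag.
case class Plan(description: String, resolved: Boolean = false)

class QueryExecution(plan: Plan) {
  lazy val analyzed: Plan = plan.copy(resolved = true) // "run the analyzer"
  def assertAnalyzed(): Unit = { analyzed; () }        // force analysis, surfacing errors early
}

case class CreateViewCommand(analyzedPlan: Plan) {
  require(analyzedPlan.resolved, "the command now expects an analyzed plan")
}

def planToCache(query: Plan): CreateViewCommand = {
  val qe = new QueryExecution(query)
  qe.assertAnalyzed()
  CreateViewCommand(analyzedPlan = qe.analyzed) // analyzed outside the optimizer
}

planToCache(Plan("SELECT * FROM t")) // succeeds: the plan was analyzed first
```

Passing an unresolved plan directly to the command would trip the `require`, mirroring why the caller analyzes the query before constructing CreateViewCommand.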