Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed. #32032

Closed
wants to merge 15 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ class Analyzer(override val catalogManager: CatalogManager)
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases)
CleanupAliases),
Batch("TransformAfterAnalysis", Once,
TransformAfterAnalysis)
)

/**
Expand Down Expand Up @@ -3637,6 +3639,18 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
}

/**
* A rule that calls `TransformationAfterAnalysis.transform` for a plan that needs to be
* transformed after all the analysis rules are run.
*/
/**
 * Runs after every other analysis batch: invokes
 * `TransformationAfterAnalysis.transform` on each fully-resolved node that
 * requests a post-analysis rewrite (e.g. commands that detach their children
 * so the optimizer never touches them).
 */
object TransformAfterAnalysis extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.resolveOperators {
      case cmd: TransformationAfterAnalysis if cmd.resolved =>
        // Validate the resolved node first: the rewrite may remove children
        // from the tree, hiding them from any later analysis checks.
        checkAnalysis(cmd)
        cmd.transform
    }
  }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

/**
 * A trait that exposes `transform`, which is called by the `TransformAfterAnalysis`
 * rule after all other analysis rules have run. One scenario for this transformation
 * is to remove the children of a logical plan so that they are not optimized; this
 * is useful for commands that create a view or a cache, because such commands need
 * to work with analyzed (but not optimized) plans.
 */
trait TransformationAfterAnalysis {
  // Returns the post-analysis replacement for this node. Implementations seen in
  // this change return a copy of the command with its "plan analyzed" flag set,
  // which detaches the wrapped plan from `children`.
  def transform: LogicalPlan
}
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,14 @@ case class CacheTableAsSelect(
plan: LogicalPlan,
originalText: String,
isLazy: Boolean,
options: Map[String, String]) extends LeafCommand
options: Map[String, String],
isPlanAnalyzed: Boolean = false) extends LeafCommand with TransformationAfterAnalysis {
// `plan` must be analyzed but should not be optimized, so expose it as a child
// only until analysis finishes; once `isPlanAnalyzed` is true this node reports
// no children and the optimizer never visits `plan`.
override def children: Seq[LogicalPlan] = if (!isPlanAnalyzed) plan :: Nil else Nil

// Called by the `TransformAfterAnalysis` rule once analysis completes: marks the
// command as analyzed, which removes `plan` from `children`.
override def transform: LogicalPlan = copy(isPlanAnalyzed = true)
}

/**
* The logical plan of the UNCACHE TABLE command.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}

trait LeafLike[T <: TreeNode[T]] { self: TreeNode[T] =>
override final def children: Seq[T] = Nil
override def children: Seq[T] = Nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I am removing this final temporarily. If the approach of this PR is OK, I will add this back and refactor.

}

trait UnaryLike[T <: TreeNode[T]] { self: TreeNode[T] =>
Expand Down
5 changes: 3 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3370,10 +3370,11 @@ class Dataset[T] private[sql](
comment = None,
properties = Map.empty,
originalText = None,
child = logicalPlan,
plan = logicalPlan,
allowExisting = false,
replace = replace,
viewType = viewType)
viewType = viewType,
isPlanAnalyzed = true)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

case CreateViewStatement(
tbl, userSpecifiedColumns, comment, properties,
originalText, child, allowExisting, replace, viewType) if child.resolved =>
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
originalText, child, allowExisting, replace, viewType) =>

val v1TableName = if (viewType != PersistedView) {
// temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
Expand All @@ -491,15 +491,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
parseV1Table(tbl, "CREATE VIEW")
}
CreateViewCommand(
v1TableName.asTableIdentifier,
userSpecifiedColumns,
comment,
properties,
originalText,
child,
allowExisting,
replace,
viewType)
name = v1TableName.asTableIdentifier,
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
properties = properties,
originalText = originalText,
plan = child,
allowExisting = allowExisting,
replace = replace,
viewType = viewType,
isPlanAnalyzed = false)

case ShowViews(resolved: ResolvedNamespace, pattern, output) =>
resolved match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, TransformationAfterAnalysis, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
Expand Down Expand Up @@ -62,15 +62,22 @@ case class CreateViewCommand(
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
child: LogicalPlan,
plan: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
viewType: ViewType)
extends RunnableCommand {
viewType: ViewType,
isPlanAnalyzed: Boolean)
extends RunnableCommand with TransformationAfterAnalysis {

import ViewHelper._

override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
override def innerChildren: Seq[QueryPlan[_]] = Seq(plan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we only include it as inner children only when isAnalyzed = true? Maybe we can put it in the base class, as it's better to include the plan in inner children to show it in the UI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan which UI are you referring to? If you are referring to the Spark UI, I see the following and it shows the same even if I use override def innerChildren: Seq[QueryPlan[_]] = if (isAnalyzed) Seq(plan) else Nil:
Screen Shot 2021-04-23 at 9 30 57 PM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about the "optimized plan" in the EXPLAIN result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the following for the both cases. Were you expecting something different?

== Parsed Logical Plan ==
'CreateViewStatement [test], SELECT 1, false, false, PersistedView
+- 'Project [unresolvedalias(1, None)]
   +- OneRowRelation

== Analyzed Logical Plan ==

CreateViewCommand `default`.`test`, SELECT 1, false, false, PersistedView, true
   +- Project [1 AS 1#7]
      +- OneRowRelation

== Optimized Logical Plan ==
CreateViewCommand `default`.`test`, SELECT 1, false, false, PersistedView, true
   +- Project [1 AS 1#7]
      +- OneRowRelation

== Physical Plan ==
Execute CreateViewCommand
   +- CreateViewCommand `default`.`test`, SELECT 1, false, false, PersistedView, true
         +- Project [1 AS 1#7]
            +- OneRowRelation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting. I thought there will be problems if a plan is in both children and innerChildren, but seems we are fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, the reason is that when explain runs, CreateViewCommand has already gone through the Analyzer and its children are removed (note the true above, which is isAnalyzed).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the "Parsed Logical Plan" should be unresolved and the plan is in both children and innerChildren.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Parsed Logical Plan" is CreateViewStatement, not CreateViewCommand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah now I get it. If there is a place that creates CreateViewCommand directly, we get a problem.

I think it's safer to avoid that to be future-proof. e.g.

override def innerChildren: Seq[QueryPlan[_]] = if (analyzed) Seq(plan) else Nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I will do a follow up PR.


// `plan` must be analyzed but should not be optimized, so expose it as a child
// only until analysis finishes; once `isPlanAnalyzed` is true this node reports
// no children and the optimizer never visits `plan`.
override def children: Seq[LogicalPlan] = if (!isPlanAnalyzed) plan :: Nil else Nil

// Called by the `TransformAfterAnalysis` rule once analysis completes: marks the
// command as analyzed, which removes `plan` from `children`.
override def transform: LogicalPlan = copy(isPlanAnalyzed = true)

if (viewType == PersistedView) {
require(originalText.isDefined, "'originalText' must be provided to create permanent view")
Expand All @@ -96,10 +103,8 @@ case class CreateViewCommand(
}

override def run(sparkSession: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = sparkSession.sessionState.executePlan(child)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
assert(isPlanAnalyzed)
val analyzedPlan = plan

if (userSpecifiedColumns.nonEmpty &&
userSpecifiedColumns.length != analyzedPlan.output.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ case class CacheTableAsSelectExec(
comment = None,
properties = Map.empty,
originalText = Some(originalText),
child = query,
plan = query,
allowExisting = false,
replace = false,
viewType = LocalTempView
viewType = LocalTempView,
isPlanAnalyzed = true
)
)
dataFrameForCachedPlan.logicalPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ Execute CreateViewCommand (1)
Output: []

(2) CreateViewCommand
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true

(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ Execute CreateViewCommand (1)
Output: []

(2) CreateViewCommand
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true

(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(
Expand Down