Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed. #32032

Closed
wants to merge 15 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ class Analyzer(override val catalogManager: CatalogManager)
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases)
CleanupAliases),
Batch("HandleAnalysisOnlyCommand", Once,
HandleAnalysisOnlyCommand)
)

/**
Expand Down Expand Up @@ -3501,6 +3503,18 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
}

/**
* A rule that marks a command as analyzed so that its children are removed to avoid
* being optimized. This rule should run after all other analysis rules are run.
*/
object HandleAnalysisOnlyCommand extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case c: AnalysisOnlyCommand if c.resolved =>
checkAnalysis(c)
c.markAsAnalyzed()
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,14 @@ trait Command extends LogicalPlan {
trait LeafCommand extends Command with LeafLike[LogicalPlan]
trait UnaryCommand extends Command with UnaryLike[LogicalPlan]
trait BinaryCommand extends Command with BinaryLike[LogicalPlan]

/**
* A logical node that can be used for a command that requires its children to be only analyzed,
* but not optimized.
*/
trait AnalysisOnlyCommand extends Command {
val isAnalyzed: Boolean
def childrenToAnalyze: Seq[LogicalPlan]
override final def children: Seq[LogicalPlan] = if (isAnalyzed) Nil else childrenToAnalyze
def markAsAnalyzed(): LogicalPlan
}
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,16 @@ case class CacheTableAsSelect(
plan: LogicalPlan,
originalText: String,
isLazy: Boolean,
options: Map[String, String]) extends LeafCommand
options: Map[String, String],
isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect =
copy(plan = newChildren.head)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible that newChildren is empty? Probably safer to add if (isAnalyzed) ... else ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be called only when there exist children. Maybe an assert is better assert(!isAnalyzed)? WDYT?

if (childrenIndexedSeq.isEmpty ||
childrenFastEquals(newChildrenIndexedSeq, childrenIndexedSeq)) {
this
} else {
CurrentOrigin.withOrigin(origin) {
val res = withNewChildrenInternal(newChildrenIndexedSeq)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert SGTM


override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil

override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
}

/**
* The logical plan of the UNCACHE TABLE command.
Expand Down
5 changes: 3 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3370,10 +3370,11 @@ class Dataset[T] private[sql](
comment = None,
properties = Map.empty,
originalText = None,
child = logicalPlan,
plan = logicalPlan,
allowExisting = false,
replace = replace,
viewType = viewType)
viewType = viewType,
isAnalyzed = true)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since logicalPlan is already analyzed, I am setting this to true here.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,15 +474,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)

case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
AlterViewAsCommand(
ident.asTableIdentifier,
originalText,
query)

case CreateViewStatement(
tbl, userSpecifiedColumns, comment, properties,
originalText, child, allowExisting, replace, viewType) if child.resolved =>
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
originalText, child, allowExisting, replace, viewType) =>

val v1TableName = if (viewType != PersistedView) {
// temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
Expand All @@ -491,15 +491,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
parseV1Table(tbl, "CREATE VIEW")
}
CreateViewCommand(
v1TableName.asTableIdentifier,
userSpecifiedColumns,
comment,
properties,
originalText,
child,
allowExisting,
replace,
viewType)
name = v1TableName.asTableIdentifier,
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
properties = properties,
originalText = originalText,
plan = child,
allowExisting = allowExisting,
replace = replace,
viewType = viewType)

case ShowViews(resolved: ResolvedNamespace, pattern, output) =>
resolved match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
Expand All @@ -48,29 +48,39 @@ import org.apache.spark.sql.util.SchemaUtils
* @param properties the properties of this view.
* @param originalText the original SQL text of this view, can be None if this view is created via
* Dataset API.
* @param child the logical plan that represents the view; this is used to generate the logical
* plan for temporary view and the view schema.
* @param plan the logical plan that represents the view; this is used to generate the logical
* plan for temporary view and the view schema.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
* already exists, throws analysis exception.
* @param viewType the expected view type to be created with this command.
* @param isAnalyzed whether this command is analyzed or not.
*/
case class CreateViewCommand(
name: TableIdentifier,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
child: LogicalPlan,
plan: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
viewType: ViewType)
extends LeafRunnableCommand {
viewType: ViewType,
isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {

import ViewHelper._

override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): CreateViewCommand =
copy(plan = newChildren.head)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


override def innerChildren: Seq[QueryPlan[_]] = Seq(plan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we only include it as inner children only when isAnalyzed = true? Maybe we can put it in the base class, as it's better to include the plan in inner children to show it in the UI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan which UI are you referring to? If you are referring to the Spark UI, I see the following and it shows the same even if I use override def innerChildren: Seq[QueryPlan[_]] = if (isAnalyzed) Seq(plan) else Nil:
Screen Shot 2021-04-23 at 9 30 57 PM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about the "optimized plan" in the EXPLAIN result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the following for the both cases. Were you expecting something different?

== Parsed Logical Plan ==
'CreateViewStatement [test], SELECT 1, false, false, PersistedView
+- 'Project [unresolvedalias(1, None)]
   +- OneRowRelation

== Analyzed Logical Plan ==

CreateViewCommand `default`.`test`, SELECT 1, false, false, PersistedView, true
   +- Project [1 AS 1#7]
      +- OneRowRelation

== Optimized Logical Plan ==
CreateViewCommand `default`.`test`, SELECT 1, false, false, PersistedView, true
   +- Project [1 AS 1#7]
      +- OneRowRelation

== Physical Plan ==
Execute CreateViewCommand
   +- CreateViewCommand `default`.`test`, SELECT 1, false, false, PersistedView, true
         +- Project [1 AS 1#7]
            +- OneRowRelation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting. I thought there will be problems if a plan is in both children and innerChildren, but seems we are fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, the reason is that when explain runs, CreateViewCommand already went thru the Analyzer and its children is removed (note the true above, which is isAnalyzed).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the "Parsed Logical Plan" should be unresolved and the plan is in both children and innerChildren.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Parsed Logical Plan" is CreateViewStatement, not CreateViewCommand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah now I get it. If there is a place that creates CreateViewCommand directly, we get a problem.

I think it's safer to avoid that to be future-proof. e.g.

override def innerChildren: Seq[QueryPlan[_]] = if (analyzed) Seq(plan) else Nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I will do a follow up PR.


// `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly.
override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil

def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)

if (viewType == PersistedView) {
require(originalText.isDefined, "'originalText' must be provided to create permanent view")
Expand All @@ -96,10 +106,10 @@ case class CreateViewCommand(
}

override def run(sparkSession: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = sparkSession.sessionState.executePlan(child)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
if (!isAnalyzed) {
throw new AnalysisException("The logical plan that represents the view is not analyzed.")
}
val analyzedPlan = plan

if (userSpecifiedColumns.nonEmpty &&
userSpecifiedColumns.length != analyzedPlan.output.length) {
Expand Down Expand Up @@ -233,12 +243,21 @@ case class CreateViewCommand(
case class AlterViewAsCommand(
name: TableIdentifier,
originalText: String,
query: LogicalPlan) extends LeafRunnableCommand {
query: LogicalPlan,
isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {

import ViewHelper._

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): AlterViewAsCommand =
copy(query = newChildren.head)

override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

override def childrenToAnalyze: Seq[LogicalPlan] = query :: Nil

def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)

override def run(session: SparkSession): Seq[Row] = {
if (session.sessionState.catalog.isTempView(name)) {
alterTemporaryView(session, query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,19 @@ case class CacheTableAsSelectExec(
override lazy val relationName: String = tempViewName

override lazy val planToCache: LogicalPlan = {
Dataset.ofRows(sparkSession,
CreateViewCommand(
name = TableIdentifier(tempViewName),
userSpecifiedColumns = Nil,
comment = None,
properties = Map.empty,
originalText = Some(originalText),
child = query,
allowExisting = false,
replace = false,
viewType = LocalTempView
)
)
CreateViewCommand(
name = TableIdentifier(tempViewName),
userSpecifiedColumns = Nil,
comment = None,
properties = Map.empty,
originalText = Some(originalText),
plan = query,
allowExisting = false,
replace = false,
viewType = LocalTempView,
isAnalyzed = true
).run(sparkSession)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just call run now that the plan is analyzed; no need to go thru Dataset.ofRows.


dataFrameForCachedPlan.logicalPlan
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ Execute CreateViewCommand (1)
Output: []

(2) CreateViewCommand
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true

(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ Execute CreateViewCommand (1)
Output: []

(2) CreateViewCommand
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true

(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(
Expand Down