Skip to content

Commit

Permalink
[SPARK-34152][SQL] Make CreateViewStatement.child to be LogicalPlan's…
Browse files Browse the repository at this point in the history
… children so that it's resolved in analyze phase

### What changes were proposed in this pull request?

This PR proposes to expose `CreateViewStatement.child` as one of the `LogicalPlan`'s `children` so that it is resolved in the analysis phase.

### Why are the changes needed?

Currently, the `CreateViewStatement.child` is resolved when the create view command runs, which is inconsistent with other plan resolutions. For example, you may see the following in the physical plan:
```
== Physical Plan ==
Execute CreateViewCommand (1)
   +- CreateViewCommand (2)
         +- Project (4)
            +- UnresolvedRelation (3)
```

### Does this PR introduce _any_ user-facing change?

Yes. For the example above, the physical plan now shows the resolved plan:
```
== Physical Plan ==
Execute CreateViewCommand (1)
   +- CreateViewCommand (2)
         +- Project (5)
            +- SubqueryAlias (4)
               +- LogicalRelation (3)
```

### How was this patch tested?

Updated existing tests.

Closes #31273 from imback82/spark-34152.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
imback82 authored and cloud-fan committed Feb 24, 2021
1 parent 5d9cfd7 commit 714ff73
Show file tree
Hide file tree
Showing 15 changed files with 230 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,20 @@ class Analyzer(override val catalogManager: CatalogManager)
}

/** True when the current [[AnalysisContext]] carries a non-empty `catalogAndNamespace`. */
private def isResolvingView: Boolean = {
  val context = AnalysisContext.get
  context.catalogAndNamespace.nonEmpty
}
private def referredTempViewNames: Seq[Seq[String]] = AnalysisContext.get.referredTempViewNames
/**
 * Checks whether `nameParts` matches one of the temp view names recorded in the current
 * [[AnalysisContext]]. Two names match when they have the same number of parts and every
 * pair of corresponding parts is accepted by `resolver`.
 */
private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
  val referredNames = AnalysisContext.get.referredTempViewNames
  // `corresponds` is true only if both sequences have equal length and the predicate holds
  // for each aligned pair, which is the same as the length check followed by zip + forall.
  referredNames.exists(_.corresponds(nameParts)(resolver))
}

/**
 * Removes subquery aliases from `plan`. If what remains is a [[View]] for which
 * `isDataFrameTempView` holds, the view's child is returned; otherwise the alias-free plan
 * is returned unchanged.
 */
private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
  val withoutAliases = EliminateSubqueryAliases(plan)
  withoutAliases match {
    case view: View if view.isDataFrameTempView => view.child
    case _ => withoutAliases
  }
}

/**
* Resolve relations to temp views. This is not an actual rule, and is called by
Expand All @@ -893,7 +906,7 @@ class Analyzer(override val catalogManager: CatalogManager)
case write: V2WriteCommand =>
write.table match {
case UnresolvedRelation(ident, _, false) =>
lookupTempView(ident, performCheck = true).map(EliminateSubqueryAliases(_)).map {
lookupTempView(ident, performCheck = true).map(unwrapRelationPlan).map {
case r: DataSourceV2Relation => write.withNewTable(r)
case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted)
}.getOrElse(write)
Expand Down Expand Up @@ -930,7 +943,7 @@ class Analyzer(override val catalogManager: CatalogManager)
isStreaming: Boolean = false,
performCheck: Boolean = false): Option[LogicalPlan] = {
// Permanent View can't refer to temp views, no need to lookup at all.
if (isResolvingView && !referredTempViewNames.contains(identifier)) return None
if (isResolvingView && !isReferredTempViewName(identifier)) return None

val tmpView = identifier match {
case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
Expand All @@ -948,7 +961,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// If we are resolving relations inside views, we need to expand single-part relation names with
// the catalog and namespace that were current when the view was created.
private def expandRelationName(nameParts: Seq[String]): Seq[String] = {
if (!isResolvingView || referredTempViewNames.contains(nameParts)) return nameParts
if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts

if (nameParts.length == 1) {
AnalysisContext.get.catalogAndNamespace :+ nameParts.head
Expand Down Expand Up @@ -1145,7 +1158,10 @@ class Analyzer(override val catalogManager: CatalogManager)
case other => other
}

EliminateSubqueryAliases(relation) match {
// Inserting into a file-based temporary view is allowed.
// (e.g., spark.read.parquet("path").createOrReplaceTempView("t")).
// Thus, we need to look at the raw plan if `relation` is a temporary view.
unwrapRelationPlan(relation) match {
case v: View =>
throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table)
case other => i.copy(table = other)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ object UnsupportedOperationChecker extends Logging {
case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
_: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
_: TypedFilter) =>
case v: View if v.isDataFrameTempView =>
case node if node.nodeName == "StreamingRelationV2" =>
case node =>
throwError(s"Continuous processing does not support ${node.nodeName} operations.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,7 @@ class SessionCatalog(
}

/**
* Generate a [[View]] operator from the view description if the view stores sql text,
* otherwise, it is same to `getRawTempView`
* Generate a [[View]] operator from the temporary view stored.
*/
def getTempView(name: String): Option[LogicalPlan] = synchronized {
getRawTempView(name).map(getTempViewPlan)
Expand All @@ -641,8 +640,7 @@ class SessionCatalog(
}

/**
* Generate a [[View]] operator from the view description if the view stores sql text,
* otherwise, it is same to `getRawGlobalTempView`
* Generate a [[View]] operator from the global temporary view stored.
*/
def getGlobalTempView(name: String): Option[LogicalPlan] = {
getRawGlobalTempView(name).map(getTempViewPlan)
Expand Down Expand Up @@ -683,7 +681,7 @@ class SessionCatalog(
val table = formatTableName(name.table)
if (name.database.isEmpty) {
tempViews.get(table).map {
case TemporaryViewRelation(metadata) => metadata
case TemporaryViewRelation(metadata, _) => metadata
case plan =>
CatalogTable(
identifier = TableIdentifier(table),
Expand All @@ -693,7 +691,7 @@ class SessionCatalog(
}.getOrElse(getTableMetadata(name))
} else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
globalTempViewManager.get(table).map {
case TemporaryViewRelation(metadata) => metadata
case TemporaryViewRelation(metadata, _) => metadata
case plan =>
CatalogTable(
identifier = TableIdentifier(table, Some(globalTempViewManager.database)),
Expand Down Expand Up @@ -838,9 +836,11 @@ class SessionCatalog(

private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = {
plan match {
case viewInfo: TemporaryViewRelation =>
fromCatalogTable(viewInfo.tableMeta, isTempView = true)
case v => v
case TemporaryViewRelation(tableMeta, None) =>
fromCatalogTable(tableMeta, isTempView = true)
case TemporaryViewRelation(tableMeta, Some(plan)) =>
View(desc = tableMeta, isTempView = true, child = plan)
case other => other
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
Expand Down Expand Up @@ -467,6 +468,8 @@ object CatalogTable {
val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"

val VIEW_CREATED_FROM_DATAFRAME = VIEW_PREFIX + "createdFromDataFrame"

def splitLargeTableProp(
key: String,
value: String,
Expand Down Expand Up @@ -779,9 +782,15 @@ case class UnresolvedCatalogRelation(

/**
* A wrapper to store the temporary view info, will be kept in `SessionCatalog`
* and will be transformed to `View` during analysis
* and will be transformed to `View` during analysis. If the temporary view was
* created from a dataframe, `plan` is set to the analyzed plan for the view.
*/
case class TemporaryViewRelation(tableMeta: CatalogTable) extends LeafNode {
case class TemporaryViewRelation(
    tableMeta: CatalogTable,
    plan: Option[LogicalPlan] = None) extends LeafNode {
  // When a plan is supplied it must already be resolved, and the view metadata must carry the
  // VIEW_CREATED_FROM_DATAFRAME property. `forall` is vacuously true when no plan is given.
  require(plan.forall { analyzed =>
    analyzed.resolved && tableMeta.properties.contains(VIEW_CREATED_FROM_DATAFRAME)
  })

  // This wrapper always reports itself as unresolved.
  override lazy val resolved: Boolean = false

  // The wrapper exposes no output attributes.
  override def output: Seq[Attribute] = Nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.AliasIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -443,21 +444,25 @@ case class InsertIntoDir(
}

/**
* A container for holding the view description(CatalogTable), and the output of the view. The
* child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error
* if the `viewText` is not defined.
* A container for holding the view description(CatalogTable) and info whether the view is temporary
* or not. If it's a SQL (temp) view, the child should be a logical plan parsed from the
* `CatalogTable.viewText`. Otherwise, the view is a temporary one created from a dataframe and the
* view description should contain a `VIEW_CREATED_FROM_DATAFRAME` property; in this case, the child
* must be already resolved.
*
* This operator will be removed at the end of analysis stage.
*
* @param desc A view description (CatalogTable) that provides the necessary information to
* resolve the view, so that we are able to decouple the output from the underlying
* structure.
* @param child The logical plan of a view operator, it should be a logical plan parsed from the
* `CatalogTable.viewText`, should throw an error if the `viewText` is not defined.
* @param isTempView A flag to indicate whether the view is temporary or not.
* @param child The logical plan of a view operator. If the view description is available, it should
* be a logical plan parsed from the `CatalogTable.viewText`.
*/
case class View(
desc: CatalogTable,
isTempView: Boolean,
child: LogicalPlan) extends UnaryNode {
require(!isDataFrameTempView || child.resolved)

override def output: Seq[Attribute] = child.output

Expand All @@ -470,6 +475,9 @@ case class View(
case _ => child.canonicalized
}

/**
 * Whether this view is temporary and its description carries the
 * `VIEW_CREATED_FROM_DATAFRAME` property.
 */
def isDataFrameTempView: Boolean = {
  if (isTempView) desc.properties.contains(VIEW_CREATED_FROM_DATAFRAME) else false
}

// When resolving a SQL view, we use an extra Project to add cast and alias to make sure the view
// output schema doesn't change even if the table referenced by the view is changed after view
// creation. We should remove this extra Project during canonicalize if it does nothing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ case class CreateViewStatement(
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
viewType: ViewType) extends ParsedStatement
viewType: ViewType) extends ParsedStatement {

override def children: Seq[LogicalPlan] = Seq(child)
}

/**
* A REPLACE TABLE command, as parsed from SQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
Project(Seq(UnresolvedAttribute("a")), testRelation),
Project(testRelation.output, testRelation))

checkAnalysis(
checkAnalysisWithoutViewWrapper(
Project(Seq(UnresolvedAttribute("TbL.a")),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
Project(testRelation.output, testRelation))
Expand All @@ -105,13 +105,13 @@ class AnalysisSuite extends AnalysisTest with Matchers {
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
Seq("cannot resolve"))

checkAnalysis(
checkAnalysisWithoutViewWrapper(
Project(Seq(UnresolvedAttribute("TbL.a")),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
Project(testRelation.output, testRelation),
caseSensitive = false)

checkAnalysis(
checkAnalysisWithoutViewWrapper(
Project(Seq(UnresolvedAttribute("tBl.a")),
SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
Project(testRelation.output, testRelation),
Expand Down Expand Up @@ -203,10 +203,10 @@ class AnalysisSuite extends AnalysisTest with Matchers {

test("resolve relations") {
assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe")), Seq())
checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE")), testRelation)
checkAnalysis(
checkAnalysisWithoutViewWrapper(UnresolvedRelation(TableIdentifier("TaBlE")), testRelation)
checkAnalysisWithoutViewWrapper(
UnresolvedRelation(TableIdentifier("tAbLe")), testRelation, caseSensitive = false)
checkAnalysis(
checkAnalysisWithoutViewWrapper(
UnresolvedRelation(TableIdentifier("TaBlE")), testRelation, caseSensitive = false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ trait AnalysisTest extends PlanTest {
}
}

/**
 * Analyzes `inputPlan` with `SQLConf.CASE_SENSITIVE` set to `caseSensitive`, replaces each
 * [[View]] node for which `isDataFrameTempView` holds by its child, and compares the result
 * against `expectedPlan`.
 */
protected def checkAnalysisWithoutViewWrapper(
    inputPlan: LogicalPlan,
    expectedPlan: LogicalPlan,
    caseSensitive: Boolean = true): Unit = {
  val caseSensitivity = SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString
  withSQLConf(caseSensitivity) {
    val analyzed = getAnalyzer.executeAndCheck(inputPlan, new QueryPlanningTracker)
    // Drop the View wrapper so the expected plan can be written without it.
    val withoutViewWrapper = analyzed.transformUp {
      case view: View if view.isDataFrameTempView => view.child
    }
    comparePlans(withoutViewWrapper, expectedPlan)
  }
}

protected override def comparePlans(
plan1: LogicalPlan,
plan2: LogicalPlan,
Expand Down
Loading

0 comments on commit 714ff73

Please sign in to comment.