From 714ff73d4aec317fddf32720d5a7a1c283921983 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 24 Feb 2021 06:50:11 +0000 Subject: [PATCH] [SPARK-34152][SQL] Make CreateViewStatement.child to be LogicalPlan's children so that it's resolved in analyze phase ### What changes were proposed in this pull request? This PR proposes to make `CreateViewStatement.child` to be `LogicalPlan`'s `children` so that it's resolved in the analyze phase. ### Why are the changes needed? Currently, the `CreateViewStatement.child` is resolved when the create view command runs, which is inconsistent with other plan resolutions. For example, you may see the following in the physical plan: ``` == Physical Plan == Execute CreateViewCommand (1) +- CreateViewCommand (2) +- Project (4) +- UnresolvedRelation (3) ``` ### Does this PR introduce _any_ user-facing change? Yes. For the example, you will now see the resolved plan: ``` == Physical Plan == Execute CreateViewCommand (1) +- CreateViewCommand (2) +- Project (5) +- SubqueryAlias (4) +- LogicalRelation (3) ``` ### How was this patch tested? Updated existing tests. Closes #31273 from imback82/spark-34152. Authored-by: Terry Kim Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/Analyzer.scala | 26 ++++-- .../UnsupportedOperationChecker.scala | 1 + .../sql/catalyst/catalog/SessionCatalog.scala | 18 ++-- .../sql/catalyst/catalog/interface.scala | 13 ++- .../plans/logical/basicLogicalOperators.scala | 20 +++-- .../catalyst/plans/logical/statements.scala | 5 +- .../sql/catalyst/analysis/AnalysisSuite.scala | 12 +-- .../sql/catalyst/analysis/AnalysisTest.scala | 13 +++ .../catalyst/analysis/ResolveHintsSuite.scala | 56 ++++++------ .../catalog/SessionCatalogSuite.scala | 35 +++++--- .../analysis/ResolveSessionCatalog.scala | 2 +- .../spark/sql/execution/command/views.scala | 90 +++++++++++-------- .../sql-tests/results/explain-aqe.sql.out | 30 +++++-- .../sql-tests/results/explain.sql.out | 30 +++++-- .../sql/connector/DataSourceV2SQLSuite.scala | 2 +- 15 files changed, 230 insertions(+), 123 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b351d76411ff2..3952cc063b73c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -867,7 +867,20 @@ class Analyzer(override val catalogManager: CatalogManager) } private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty - private def referredTempViewNames: Seq[Seq[String]] = AnalysisContext.get.referredTempViewNames + private def isReferredTempViewName(nameParts: Seq[String]): Boolean = { + AnalysisContext.get.referredTempViewNames.exists { n => + (n.length == nameParts.length) && n.zip(nameParts).forall { + case (a, b) => resolver(a, b) + } + } + } + + private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = { + EliminateSubqueryAliases(plan) match { + case v: View if v.isDataFrameTempView => v.child + case other => other + } + } /** * Resolve relations to temp views. This is not an actual rule, and is called by @@ -893,7 +906,7 @@ class Analyzer(override val catalogManager: CatalogManager) case write: V2WriteCommand => write.table match { case UnresolvedRelation(ident, _, false) => - lookupTempView(ident, performCheck = true).map(EliminateSubqueryAliases(_)).map { + lookupTempView(ident, performCheck = true).map(unwrapRelationPlan).map { case r: DataSourceV2Relation => write.withNewTable(r) case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted) }.getOrElse(write) @@ -930,7 +943,7 @@ class Analyzer(override val catalogManager: CatalogManager) isStreaming: Boolean = false, performCheck: Boolean = false): Option[LogicalPlan] = { // Permanent View can't refer to temp views, no need to lookup at all. - if (isResolvingView && !referredTempViewNames.contains(identifier)) return None + if (isResolvingView && !isReferredTempViewName(identifier)) return None val tmpView = identifier match { case Seq(part1) => v1SessionCatalog.lookupTempView(part1) @@ -948,7 +961,7 @@ class Analyzer(override val catalogManager: CatalogManager) // If we are resolving relations insides views, we need to expand single-part relation names with // the current catalog and namespace of when the view was created. private def expandRelationName(nameParts: Seq[String]): Seq[String] = { - if (!isResolvingView || referredTempViewNames.contains(nameParts)) return nameParts + if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts if (nameParts.length == 1) { AnalysisContext.get.catalogAndNamespace :+ nameParts.head @@ -1145,7 +1158,10 @@ class Analyzer(override val catalogManager: CatalogManager) case other => other } - EliminateSubqueryAliases(relation) match { + // Inserting into a file-based temporary view is allowed. + // (e.g., spark.read.parquet("path").createOrReplaceTempView("t"). + // Thus, we need to look at the raw plan if `relation` is a temporary view. + unwrapRelationPlan(relation) match { case v: View => throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table) case other => i.copy(table = other) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index ab7d90098bfd3..42ccc45cec62f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -393,6 +393,7 @@ object UnsupportedOperationChecker extends Logging { case (_: Project | _: Filter | _: MapElements | _: MapPartitions | _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => + case v: View if v.isDataFrameTempView => case node if node.nodeName == "StreamingRelationV2" => case node => throwError(s"Continuous processing does not support ${node.nodeName} operations.") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 689e7c3733180..74a80f566f94a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -622,8 +622,7 @@ class SessionCatalog( } /** - * Generate a [[View]] operator from the view description if the view stores sql text, - * otherwise, it is same to `getRawTempView` + * Generate a [[View]] operator from the temporary view stored. */ def getTempView(name: String): Option[LogicalPlan] = synchronized { getRawTempView(name).map(getTempViewPlan) @@ -641,8 +640,7 @@ class SessionCatalog( } /** - * Generate a [[View]] operator from the view description if the view stores sql text, - * otherwise, it is same to `getRawGlobalTempView` + * Generate a [[View]] operator from the global temporary view stored. */ def getGlobalTempView(name: String): Option[LogicalPlan] = { getRawGlobalTempView(name).map(getTempViewPlan) @@ -683,7 +681,7 @@ class SessionCatalog( val table = formatTableName(name.table) if (name.database.isEmpty) { tempViews.get(table).map { - case TemporaryViewRelation(metadata) => metadata + case TemporaryViewRelation(metadata, _) => metadata case plan => CatalogTable( identifier = TableIdentifier(table), @@ -693,7 +691,7 @@ class SessionCatalog( }.getOrElse(getTableMetadata(name)) } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) { globalTempViewManager.get(table).map { - case TemporaryViewRelation(metadata) => metadata + case TemporaryViewRelation(metadata, _) => metadata case plan => CatalogTable( identifier = TableIdentifier(table, Some(globalTempViewManager.database)), @@ -838,9 +836,11 @@ class SessionCatalog( private def getTempViewPlan(plan: LogicalPlan): LogicalPlan = { plan match { - case viewInfo: TemporaryViewRelation => - fromCatalogTable(viewInfo.tableMeta, isTempView = true) - case v => v + case TemporaryViewRelation(tableMeta, None) => + fromCatalogTable(tableMeta, isTempView = true) + case TemporaryViewRelation(tableMeta, Some(plan)) => + View(desc = tableMeta, isTempView = true, child = plan) + case other => other } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 89cb103a7bf73..b6a23214c9084 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -31,6 +31,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils @@ -467,6 +468,8 @@ object CatalogTable { val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames" val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames" + val VIEW_CREATED_FROM_DATAFRAME = VIEW_PREFIX + "createdFromDataFrame" + def splitLargeTableProp( key: String, value: String, @@ -779,9 +782,15 @@ case class UnresolvedCatalogRelation( /** * A wrapper to store the temporary view info, will be kept in `SessionCatalog` - * and will be transformed to `View` during analysis + * and will be transformed to `View` during analysis. If the temporary view was + * created from a dataframe, `plan` is set to the analyzed plan for the view. */ -case class TemporaryViewRelation(tableMeta: CatalogTable) extends LeafNode { +case class TemporaryViewRelation( + tableMeta: CatalogTable, + plan: Option[LogicalPlan] = None) extends LeafNode { + require(plan.isEmpty || + (plan.get.resolved && tableMeta.properties.contains(VIEW_CREATED_FROM_DATAFRAME))) + override lazy val resolved: Boolean = false override def output: Seq[Attribute] = Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index fad1457ac1403..3f20d8f67b44d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.AliasIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ @@ -443,21 +444,25 @@ case class InsertIntoDir( } /** - * A container for holding the view description(CatalogTable), and the output of the view. The - * child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error - * if the `viewText` is not defined. + * A container for holding the view description(CatalogTable) and info whether the view is temporary + * or not. If it's a SQL (temp) view, the child should be a logical plan parsed from the + * `CatalogTable.viewText`. Otherwise, the view is a temporary one created from a dataframe and the + * view description should contain a `VIEW_CREATED_FROM_DATAFRAME` property; in this case, the child + * must be already resolved. + * * This operator will be removed at the end of analysis stage. * * @param desc A view description(CatalogTable) that provides necessary information to resolve the * view. - * we are able to decouple the output from the underlying structure. - * @param child The logical plan of a view operator, it should be a logical plan parsed from the - * `CatalogTable.viewText`, should throw an error if the `viewText` is not defined. + * @param isTempView A flag to indicate whether the view is temporary or not. + * @param child The logical plan of a view operator. If the view description is available, it should + * be a logical plan parsed from the `CatalogTable.viewText`. */ case class View( desc: CatalogTable, isTempView: Boolean, child: LogicalPlan) extends UnaryNode { + require(!isDataFrameTempView || child.resolved) override def output: Seq[Attribute] = child.output @@ -470,6 +475,9 @@ case class View( case _ => child.canonicalized } + def isDataFrameTempView: Boolean = + isTempView && desc.properties.contains(VIEW_CREATED_FROM_DATAFRAME) + // When resolving a SQL view, we use an extra Project to add cast and alias to make sure the view // output schema doesn't change even if the table referenced by the view is changed after view // creation. We should remove this extra Project during canonicalize if it does nothing. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 26f36bfe9b970..cc6e387d0f600 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -179,7 +179,10 @@ case class CreateViewStatement( child: LogicalPlan, allowExisting: Boolean, replace: Boolean, - viewType: ViewType) extends ParsedStatement + viewType: ViewType) extends ParsedStatement { + + override def children: Seq[LogicalPlan] = Seq(child) +} /** * A REPLACE TABLE command, as parsed from SQL. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 0e0142eb76894..a3c26ecdaba2a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -95,7 +95,7 @@ class AnalysisSuite extends AnalysisTest with Matchers { Project(Seq(UnresolvedAttribute("a")), testRelation), Project(testRelation.output, testRelation)) - checkAnalysis( + checkAnalysisWithoutViewWrapper( Project(Seq(UnresolvedAttribute("TbL.a")), SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))), Project(testRelation.output, testRelation)) @@ -105,13 +105,13 @@ class AnalysisSuite extends AnalysisTest with Matchers { SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))), Seq("cannot resolve")) - checkAnalysis( + checkAnalysisWithoutViewWrapper( Project(Seq(UnresolvedAttribute("TbL.a")), SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))), Project(testRelation.output, testRelation), caseSensitive = false) - checkAnalysis( + checkAnalysisWithoutViewWrapper( Project(Seq(UnresolvedAttribute("tBl.a")), SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))), Project(testRelation.output, testRelation), @@ -203,10 +203,10 @@ class AnalysisSuite extends AnalysisTest with Matchers { test("resolve relations") { assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe")), Seq()) - checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE")), testRelation) - checkAnalysis( + checkAnalysisWithoutViewWrapper(UnresolvedRelation(TableIdentifier("TaBlE")), testRelation) + checkAnalysisWithoutViewWrapper( UnresolvedRelation(TableIdentifier("tAbLe")), testRelation, caseSensitive = false) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedRelation(TableIdentifier("TaBlE")), testRelation, caseSensitive = false) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 37db4be502a83..7248424a68ad9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -59,6 +59,19 @@ trait AnalysisTest extends PlanTest { } } + protected def checkAnalysisWithoutViewWrapper( + inputPlan: LogicalPlan, + expectedPlan: LogicalPlan, + caseSensitive: Boolean = true): Unit = { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { + val actualPlan = getAnalyzer.executeAndCheck(inputPlan, new QueryPlanningTracker) + val transformed = actualPlan transformUp { + case v: View if v.isDataFrameTempView => v.child + } + comparePlans(transformed, expectedPlan) + } + } + protected override def comparePlans( plan1: LogicalPlan, plan2: LogicalPlan, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index 513f1d001f757..9db64c684c40f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -31,36 +31,36 @@ class ResolveHintsSuite extends AnalysisTest { import org.apache.spark.sql.catalyst.analysis.TestRelations._ test("invalid hints should be ignored") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")), testRelation, caseSensitive = false) } test("case-sensitive or insensitive parameters") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = false) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")), ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = false) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = true) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")), testRelation, caseSensitive = true) } test("multiple broadcast hint aliases") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))), Join(ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), ResolvedHint(testRelation2, HintInfo(strategy = Some(BROADCAST))), @@ -69,7 +69,7 @@ class ResolveHintsSuite extends AnalysisTest { } test("do not traverse past existing broadcast hints") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("table"), ResolvedHint(table("table").where('a > 1), HintInfo(strategy = Some(BROADCAST)))), ResolvedHint(testRelation.where('a > 1), HintInfo(strategy = Some(BROADCAST))).analyze, @@ -77,32 +77,32 @@ class ResolveHintsSuite extends AnalysisTest { } test("should work for subqueries") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")), ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = false) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)), ResolvedHint(testRelation, HintInfo(strategy = Some(BROADCAST))), caseSensitive = false) // Negative case: if the alias doesn't match, don't match the original table name. - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("table"), table("table").as("tableAlias")), testRelation, caseSensitive = false) } test("do not traverse past subquery alias") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)), testRelation.where('a > 1).analyze, caseSensitive = false) } test("should work for CTE") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( CatalystSqlParser.parsePlan( """ |WITH ctetable AS (SELECT * FROM table WHERE a > 1) @@ -115,7 +115,7 @@ class ResolveHintsSuite extends AnalysisTest { } test("should not traverse down CTE") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( CatalystSqlParser.parsePlan( """ |WITH ctetable AS (SELECT * FROM table WHERE a > 1) @@ -127,16 +127,16 @@ class ResolveHintsSuite extends AnalysisTest { } test("coalesce and repartition hint") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("COALESCE", Seq(Literal(10)), table("TaBlE")), Repartition(numPartitions = 10, shuffle = false, child = testRelation)) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("coalesce", Seq(Literal(20)), table("TaBlE")), Repartition(numPartitions = 20, shuffle = false, child = testRelation)) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("REPARTITION", Seq(Literal(100)), table("TaBlE")), Repartition(numPartitions = 100, shuffle = true, child = testRelation)) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("RePARTITion", Seq(Literal(200)), table("TaBlE")), Repartition(numPartitions = 200, shuffle = true, child = testRelation)) @@ -152,15 +152,15 @@ class ResolveHintsSuite extends AnalysisTest { UnresolvedHint("COALESCE", Seq(Literal(1.0)), table("TaBlE")), Seq(errMsg)) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("RePartition", Seq(Literal(10), UnresolvedAttribute("a")), table("TaBlE")), RepartitionByExpression(Seq(AttributeReference("a", IntegerType)()), testRelation, 10)) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("REPARTITION", Seq(Literal(10), UnresolvedAttribute("a")), table("TaBlE")), RepartitionByExpression(Seq(AttributeReference("a", IntegerType)()), testRelation, 10)) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("REPARTITION", Seq(UnresolvedAttribute("a")), table("TaBlE")), RepartitionByExpression( Seq(AttributeReference("a", IntegerType)()), testRelation, None)) @@ -176,13 +176,13 @@ class ResolveHintsSuite extends AnalysisTest { } e.getMessage.contains("For range partitioning use REPARTITION_BY_RANGE instead") - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint( "REPARTITION_BY_RANGE", Seq(Literal(10), UnresolvedAttribute("a")), table("TaBlE")), RepartitionByExpression( Seq(SortOrder(AttributeReference("a", IntegerType)(), Ascending)), testRelation, 10)) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint( "REPARTITION_BY_RANGE", Seq(UnresolvedAttribute("a")), table("TaBlE")), RepartitionByExpression( @@ -225,7 +225,7 @@ class ResolveHintsSuite extends AnalysisTest { test("log warnings for invalid hints") { val logAppender = new LogAppender("invalid hints") withLogAppender(logAppender) { - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint("unknown_hint", Seq("TaBlE"), table("TaBlE")), testRelation, caseSensitive = false) @@ -236,7 +236,7 @@ class ResolveHintsSuite extends AnalysisTest { } test("SPARK-30003: Do not throw stack overflow exception in non-root unknown hint resolution") { - checkAnalysis( + checkAnalysisWithoutViewWrapper( Project(testRelation.output, UnresolvedHint("unknown_hint", Seq("TaBlE"), table("TaBlE"))), Project(testRelation.output, testRelation), caseSensitive = false) @@ -248,7 +248,7 @@ class ResolveHintsSuite extends AnalysisTest { ("SHUFFLE_HASH", SHUFFLE_HASH), ("SHUFFLE_REPLICATE_NL", SHUFFLE_REPLICATE_NL)).foreach { case (hintName, st) => // local temp table (single-part identifier case) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint(hintName, Seq("table", "table2"), table("TaBlE").join(table("TaBlE2"))), Join( @@ -259,7 +259,7 @@ class ResolveHintsSuite extends AnalysisTest { JoinHint.NONE), caseSensitive = false) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint(hintName, Seq("TaBlE", "table2"), table("TaBlE").join(table("TaBlE2"))), Join( @@ -271,7 +271,7 @@ class ResolveHintsSuite extends AnalysisTest { caseSensitive = true) // global temp table (multi-part identifier case) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint(hintName, Seq("GlOBal_TeMP.table4", "table5"), table("global_temp", "table4").join(table("global_temp", "table5"))), Join( @@ -282,7 +282,7 @@ class ResolveHintsSuite extends AnalysisTest { JoinHint.NONE), caseSensitive = false) - checkAnalysis( + checkAnalysisWithoutViewWrapper( UnresolvedHint(hintName, Seq("global_temp.TaBlE4", "table5"), table("global_temp", "TaBlE4").join(table("global_temp", "TaBlE5"))), Join( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 57b728aa5eb95..635fea9114434 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -22,7 +22,7 @@ import scala.concurrent.duration._ import org.scalatest.concurrent.Eventually import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier} +import org.apache.spark.sql.catalyst.{AliasIdentifier, FunctionIdentifier, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser @@ -84,6 +84,12 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { catalog.reset() } } + + private def getTempViewRawPlan(plan: Option[LogicalPlan]): Option[LogicalPlan] = plan match { + case Some(v: View) if v.isDataFrameTempView => Some(v.child) + case other => other + } + // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- @@ -301,16 +307,16 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { val tempTable2 = Range(1, 20, 2, 10) catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) catalog.createTempView("tbl2", tempTable2, overrideIfExists = false) - assert(catalog.getTempView("tbl1") == Option(tempTable1)) - assert(catalog.getTempView("tbl2") == Option(tempTable2)) - assert(catalog.getTempView("tbl3").isEmpty) + assert(getTempViewRawPlan(catalog.getTempView("tbl1")) == Option(tempTable1)) + assert(getTempViewRawPlan(catalog.getTempView("tbl2")) == Option(tempTable2)) + assert(getTempViewRawPlan(catalog.getTempView("tbl3")).isEmpty) // Temporary view already exists intercept[TempTableAlreadyExistsException] { catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) } // Temporary view already exists but we override it catalog.createTempView("tbl1", tempTable2, overrideIfExists = true) - assert(catalog.getTempView("tbl1") == Option(tempTable2)) + assert(getTempViewRawPlan(catalog.getTempView("tbl1")) == Option(tempTable2)) } } @@ -352,7 +358,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { val tempTable = Range(1, 10, 2, 10) catalog.createTempView("tbl1", tempTable, overrideIfExists = false) catalog.setCurrentDatabase("db2") - assert(catalog.getTempView("tbl1") == Some(tempTable)) + assert(getTempViewRawPlan(catalog.getTempView("tbl1")) == Some(tempTable)) assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is not specified, temp table should be dropped first catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) @@ -366,7 +372,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) catalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false, purge = false) - assert(catalog.getTempView("tbl1") == Some(tempTable)) + assert(getTempViewRawPlan(catalog.getTempView("tbl1")) == Some(tempTable)) assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl2")) } } @@ -419,16 +425,16 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { val tempTable = Range(1, 10, 2, 10) catalog.createTempView("tbl1", tempTable, overrideIfExists = false) catalog.setCurrentDatabase("db2") - assert(catalog.getTempView("tbl1") == Option(tempTable)) + assert(getTempViewRawPlan(catalog.getTempView("tbl1")) == Option(tempTable)) assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is not specified, temp table should be renamed first catalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3")) assert(catalog.getTempView("tbl1").isEmpty) - assert(catalog.getTempView("tbl3") == Option(tempTable)) + assert(getTempViewRawPlan(catalog.getTempView("tbl3")) == Option(tempTable)) assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is specified, temp tables are never renamed catalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4")) - assert(catalog.getTempView("tbl3") == Option(tempTable)) + assert(getTempViewRawPlan(catalog.getTempView("tbl3")) == Option(tempTable)) assert(catalog.getTempView("tbl4").isEmpty) assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4")) } @@ -625,8 +631,9 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1) // Otherwise, we'll first look up a temporary table with the same name - assert(catalog.lookupRelation(TableIdentifier("tbl1")) - == SubqueryAlias("tbl1", tempTable1)) + val tbl1 = catalog.lookupRelation(TableIdentifier("tbl1")).asInstanceOf[SubqueryAlias] + assert(tbl1.identifier == AliasIdentifier("tbl1")) + assert(getTempViewRawPlan(Some(tbl1.child)).get == tempTable1) // Then, if that does not exist, look up the relation in the current database catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head @@ -1581,11 +1588,11 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { original.copyStateTo(clone) assert(original ne clone) - assert(clone.getTempView("copytest1") == Some(tempTable1)) + assert(getTempViewRawPlan(clone.getTempView("copytest1")) == Some(tempTable1)) // check if clone and original independent clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = false, purge = false) - assert(original.getTempView("copytest1") == Some(tempTable1)) + assert(getTempViewRawPlan(original.getTempView("copytest1")) == Some(tempTable1)) val tempTable2 = Range(1, 20, 2, 10) original.createTempView("copytest2", tempTable2, overrideIfExists = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 7ddd2ab6d913c..232e8a16cdd76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -471,7 +471,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case CreateViewStatement( tbl, userSpecifiedColumns, comment, properties, - originalText, child, allowExisting, replace, viewType) => + originalText, child, allowExisting, replace, viewType) if child.resolved => val v1TableName = if (viewType != PersistedView) { // temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index a14f247515773..bae7c54f0b99d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -23,12 +23,12 @@ import org.json4s.JsonAST.{JArray, JString} import org.json4s.jackson.JsonMethods._ import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunction, UnresolvedRelation, ViewType} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, ViewType} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation} -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View, With} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -111,12 +111,11 @@ case class CreateViewCommand( // When creating a permanent view, not allowed to reference temporary objects. // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved) - verifyTemporaryObjectsNotExists(catalog, isTemporary, name, child) + verifyTemporaryObjectsNotExists(catalog, isTemporary, name, analyzedPlan) if (viewType == LocalTempView) { val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) - if (replace && catalog.getRawTempView(name.table).isDefined && - !catalog.getRawTempView(name.table).get.sameResult(aliasedPlan)) { + if (replace && !isSamePlan(catalog.getRawTempView(name.table), aliasedPlan)) { logInfo(s"Try to uncache ${name.quotedString} before replacing.") checkCyclicViewReference(analyzedPlan, Seq(name), name) CommandUtils.uncacheTableOrView(sparkSession, name.quotedString) @@ -129,18 +128,18 @@ case class CreateViewCommand( sparkSession, analyzedPlan, aliasedPlan.schema, - originalText, - child)) + originalText)) } else { - aliasedPlan + TemporaryViewRelation( + prepareTemporaryViewFromDataFrame(name, aliasedPlan), + Some(aliasedPlan)) } catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace) } else if (viewType == GlobalTempView) { val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) val viewIdent = TableIdentifier(name.table, Option(db)) val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) - if (replace && catalog.getRawGlobalTempView(name.table).isDefined && - !catalog.getRawGlobalTempView(name.table).get.sameResult(aliasedPlan)) { + if (replace && !isSamePlan(catalog.getRawGlobalTempView(name.table), aliasedPlan)) { logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.") checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent) CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString) @@ -152,10 +151,11 @@ case class CreateViewCommand( sparkSession, analyzedPlan, aliasedPlan.schema, - originalText, - child)) + originalText)) } else { - aliasedPlan + TemporaryViewRelation( + prepareTemporaryViewFromDataFrame(name, aliasedPlan), + Some(aliasedPlan)) } catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace) } else if (catalog.tableExists(name)) { @@ -192,6 +192,18 @@ case class CreateViewCommand( Seq.empty[Row] } + /** + * Checks if the temp view (the result of getTempViewRawPlan or getRawGlobalTempView) is storing + * the same plan as the given aliased plan. + */ + private def isSamePlan( + rawTempView: Option[LogicalPlan], + aliasedPlan: LogicalPlan): Boolean = rawTempView match { + case Some(TemporaryViewRelation(_, Some(p))) => p.sameResult(aliasedPlan) + case Some(p) => p.sameResult(aliasedPlan) + case _ => false + } + /** * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns, * else return the analyzed plan directly. @@ -280,7 +292,7 @@ case class AlterViewAsCommand( checkCyclicViewReference(analyzedPlan, Seq(name), name) TemporaryViewRelation( prepareTemporaryView( - name, session, analyzedPlan, analyzedPlan.schema, Some(originalText), query)) + name, session, analyzedPlan, analyzedPlan.schema, Some(originalText))) } session.sessionState.catalog.alterTempViewDefinition(name, tableDefinition) } @@ -541,40 +553,32 @@ object ViewHelper { } /** - * Collect all temporary views and functions and return the identifiers separately - * This func traverses the unresolved plan `child`. Below are the reasons: - * 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding - * logical plan. After replacement, it is impossible to detect whether the SubqueryAlias is - * added/generated from a temporary view. - * 2) The temp functions are represented by multiple classes. Most are inaccessible from this - * package (e.g., HiveGenericUDF). + * Collect all temporary views and functions and return the identifiers separately. */ private def collectTemporaryObjects( catalog: SessionCatalog, child: LogicalPlan): (Seq[Seq[String]], Seq[String]) = { def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = { child.flatMap { - case UnresolvedRelation(nameParts, _, _) if catalog.isTempView(nameParts) => - Seq(nameParts) - case w: With if !w.resolved => w.innerChildren.flatMap(collectTempViews) - case plan if !plan.resolved => plan.expressions.flatMap(_.flatMap { + case view: View if view.isTempView => + val ident = view.desc.identifier + Seq(ident.database.toSeq :+ ident.table) + case plan => plan.expressions.flatMap(_.flatMap { case e: SubqueryExpression => collectTempViews(e.plan) case _ => Seq.empty }) - case _ => Seq.empty }.distinct } def collectTempFunctions(child: LogicalPlan): Seq[String] = { child.flatMap { - case w: With if !w.resolved => w.innerChildren.flatMap(collectTempFunctions) - case plan if !plan.resolved => + case plan => plan.expressions.flatMap(_.flatMap { case e: SubqueryExpression => collectTempFunctions(e.plan) - case e: UnresolvedFunction if catalog.isTemporaryFunction(e.name) => - Seq(e.name.funcName) + case e: UserDefinedExpression + if catalog.isTemporaryFunction(FunctionIdentifier(e.name)) => + Seq(e.name) case _ => Seq.empty }) - case _ => Seq.empty }.distinct } (collectTempViews(child), collectTempFunctions(child)) @@ -592,11 +596,10 @@ object ViewHelper { session: SparkSession, analyzedPlan: LogicalPlan, viewSchema: StructType, - originalText: Option[String], - child: LogicalPlan): CatalogTable = { + originalText: Option[String]): CatalogTable = { val catalog = session.sessionState.catalog - val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, child) + val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, analyzedPlan) // TBLPROPERTIES is not allowed for temporary view, so we don't use it for // generating temporary view properties val newProperties = generateViewProperties( @@ -610,4 +613,19 @@ object ViewHelper { viewText = originalText, properties = newProperties) } + + /** + * Returns a [[CatalogTable]] that contains information for the temporary view created + * from a dataframe. + */ + def prepareTemporaryViewFromDataFrame( + viewName: TableIdentifier, + analyzedPlan: LogicalPlan): CatalogTable = { + CatalogTable( + identifier = viewName, + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = analyzedPlan.schema, + properties = Map((VIEW_CREATED_FROM_DATAFRAME, "true"))) + } } diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index 26c85a0241e57..bcb98396b3028 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -904,8 +904,9 @@ struct == Physical Plan == Execute CreateViewCommand (1) +- CreateViewCommand (2) - +- Project (4) - +- UnresolvedRelation (3) + +- Project (5) + +- SubqueryAlias (4) + +- LogicalRelation (3) (1) Execute CreateViewCommand @@ -914,11 +915,26 @@ Output: [] (2) CreateViewCommand Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView -(3) UnresolvedRelation -Arguments: [explain_temp1], [], false - -(4) Project -Arguments: ['key, 'val] +(3) LogicalRelation +Arguments: parquet, [key#x, val#x], CatalogTable( +Database: default +Table: explain_temp1 +Created Time [not included in comparison] +Last Access [not included in comparison] +Created By [not included in comparison] +Type: MANAGED +Provider: PARQUET +Location [not included in comparison]/{warehouse_dir}/explain_temp1 +Schema: root +-- key: integer (nullable = true) +-- val: integer (nullable = true) +), false + +(4) SubqueryAlias +Arguments: spark_catalog.default.explain_temp1 + +(5) Project +Arguments: [key#x, val#x] -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index fb43da8b5b604..a72a5f0a2aa86 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -849,8 +849,9 @@ struct == Physical Plan == Execute CreateViewCommand (1) +- CreateViewCommand (2) - +- Project (4) - +- UnresolvedRelation (3) + +- Project (5) + +- SubqueryAlias (4) + +- LogicalRelation (3) (1) Execute CreateViewCommand @@ -859,11 +860,26 @@ Output: [] (2) CreateViewCommand Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView -(3) UnresolvedRelation -Arguments: [explain_temp1], [], false - -(4) Project -Arguments: ['key, 'val] +(3) LogicalRelation +Arguments: parquet, [key#x, val#x], CatalogTable( +Database: default +Table: explain_temp1 +Created Time [not included in comparison] +Last Access [not included in comparison] +Created By [not included in comparison] +Type: MANAGED +Provider: PARQUET +Location [not included in comparison]/{warehouse_dir}/explain_temp1 +Schema: root +-- key: integer (nullable = true) +-- val: integer (nullable = true) +), false + +(4) SubqueryAlias +Arguments: spark_catalog.default.explain_temp1 + +(5) Project +Arguments: [key#x, val#x] -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 533428f9504b1..2f57298856fb5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1992,7 +1992,7 @@ class DataSourceV2SQLSuite test("CREATE VIEW") { val v = "testcat.ns1.ns2.v" val e = intercept[AnalysisException] { - sql(s"CREATE VIEW $v AS SELECT * FROM tab1") + sql(s"CREATE VIEW $v AS SELECT 1") } assert(e.message.contains("CREATE VIEW is only supported with v1 tables")) }