Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34546][SQL] AlterViewAs.query should be analyzed during the analysis phase, and AlterViewAs should invalidate the cache #31652

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ class Analyzer(override val catalogManager: CatalogManager)

private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
EliminateSubqueryAliases(plan) match {
case v: View if v.isDataFrameTempView => v.child
case v: View if v.isTempViewStoringAnalyzedPlan => v.child
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed because this type of view can also be created not from a dataframe - e.g., ALTER VIEW ... AS with spark.sql.legacy.storeAnalyzedPlanForView set to true.

case other => other
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ object UnsupportedOperationChecker extends Logging {
case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
_: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
_: TypedFilter) =>
case v: View if v.isDataFrameTempView =>
case v: View if v.isTempViewStoringAnalyzedPlan =>
case node if node.nodeName == "StreamingRelationV2" =>
case node =>
throwError(s"Continuous processing does not support ${node.nodeName} operations.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
Expand Down Expand Up @@ -468,7 +468,7 @@ object CatalogTable {
val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"

val VIEW_CREATED_FROM_DATAFRAME = VIEW_PREFIX + "createdFromDataFrame"
val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"

def splitLargeTableProp(
key: String,
Expand Down Expand Up @@ -782,14 +782,14 @@ case class UnresolvedCatalogRelation(

/**
* A wrapper to store the temporary view info, will be kept in `SessionCatalog`
* and will be transformed to `View` during analysis. If the temporary view was
* created from a dataframe, `plan` is set to the analyzed plan for the view.
* and will be transformed to `View` during analysis. If the temporary view is
* storing an analyzed plan, `plan` is set to the analyzed plan for the view.
*/
case class TemporaryViewRelation(
tableMeta: CatalogTable,
plan: Option[LogicalPlan] = None) extends LeafNode {
require(plan.isEmpty ||
(plan.get.resolved && tableMeta.properties.contains(VIEW_CREATED_FROM_DATAFRAME)))
(plan.get.resolved && tableMeta.properties.contains(VIEW_STORING_ANALYZED_PLAN)))

override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.AliasIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_CREATED_FROM_DATAFRAME
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -462,7 +462,7 @@ case class View(
desc: CatalogTable,
isTempView: Boolean,
child: LogicalPlan) extends UnaryNode {
require(!isDataFrameTempView || child.resolved)
require(!isTempViewStoringAnalyzedPlan || child.resolved)

override def output: Seq[Attribute] = child.output

Expand All @@ -475,8 +475,8 @@ case class View(
case _ => child.canonicalized
}

def isDataFrameTempView: Boolean =
isTempView && desc.properties.contains(VIEW_CREATED_FROM_DATAFRAME)
def isTempViewStoringAnalyzedPlan: Boolean =
isTempView && desc.properties.contains(VIEW_STORING_ANALYZED_PLAN)

// When resolving a SQL view, we use an extra Project to add cast and alias to make sure the view
// output schema doesn't change even if the table referenced by the view is changed after view
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ case class AlterViewAs(
child: LogicalPlan,
originalText: String,
query: LogicalPlan) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
override def children: Seq[LogicalPlan] = child :: query :: Nil
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ trait AnalysisTest extends PlanTest {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
val actualPlan = getAnalyzer.executeAndCheck(inputPlan, new QueryPlanningTracker)
val transformed = actualPlan transformUp {
case v: View if v.isDataFrameTempView => v.child
case v: View if v.isTempViewStoringAnalyzedPlan => v.child
}
comparePlans(transformed, expectedPlan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
}

private def getTempViewRawPlan(plan: Option[LogicalPlan]): Option[LogicalPlan] = plan match {
case Some(v: View) if v.isDataFrameTempView => Some(v.child)
case Some(v: View) if v.isTempViewStoringAnalyzedPlan => Some(v.child)
case other => other
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)

case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
AlterViewAsCommand(
ident.asTableIdentifier,
originalText,
Expand Down
Loading