Commit

refactoring
imback82 committed Mar 7, 2021
1 parent 12af3a7 commit eeb3712
Showing 2 changed files with 90 additions and 63 deletions.
@@ -22,8 +22,9 @@ import scala.collection.mutable
import org.json4s.JsonAST.{JArray, JString}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, ViewType}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
@@ -115,48 +116,27 @@ case class CreateViewCommand(

if (viewType == LocalTempView) {
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
if (replace && needsToUncache(catalog.getRawTempView(name.table), aliasedPlan)) {
logInfo(s"Try to uncache ${name.quotedString} before replacing.")
checkCyclicViewReference(analyzedPlan, Seq(name), name)
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
}
// If there is no sql text (e.g. from Dataset API), we will always store the analyzed plan
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
TemporaryViewRelation(
prepareTemporaryView(
name,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText))
} else {
TemporaryViewRelation(
prepareTemporaryViewFromDataFrame(name, aliasedPlan),
Some(aliasedPlan))
}
val tableDefinition = createTemporaryViewRelation(
name,
sparkSession,
replace,
catalog.getRawTempView(name.table),
originalText,
aliasedPlan,
analyzedPlan)
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val viewIdent = TableIdentifier(name.table, Option(db))
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
if (replace && needsToUncache(catalog.getRawGlobalTempView(name.table), aliasedPlan)) {
logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.")
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
}
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
TemporaryViewRelation(
prepareTemporaryView(
viewIdent,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText))
} else {
TemporaryViewRelation(
prepareTemporaryViewFromDataFrame(name, aliasedPlan),
Some(aliasedPlan))
}
val tableDefinition = createTemporaryViewRelation(
viewIdent,
sparkSession,
replace,
catalog.getRawGlobalTempView(name.table),
originalText,
aliasedPlan,
analyzedPlan)
catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
} else if (catalog.tableExists(name)) {
val tableMetadata = catalog.getTableMetadata(name)
@@ -192,20 +172,6 @@ case class CreateViewCommand(
Seq.empty[Row]
}

/**
* Checks if need to uncache the temp view being replaced.
*/
private def needsToUncache(
rawTempView: Option[LogicalPlan],
aliasedPlan: LogicalPlan): Boolean = rawTempView match {
// The temp view doesn't exist, no need to uncache.
case None => false
// Do not need to uncache if the to-be-replaced temp view plan and the new plan are the
// same-result plans.
case Some(TemporaryViewRelation(_, Some(p))) => !p.sameResult(aliasedPlan)
case Some(p) => !p.sameResult(aliasedPlan)
}

/**
* If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
* else return the analyzed plan directly.
@@ -284,14 +250,21 @@ case class AlterViewAsCommand(
}

private def alterTemporaryView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = {
val tableDefinition = if (conf.storeAnalyzedPlanForView) {
analyzedPlan
val catalog = session.sessionState.catalog
val rawTempView = if (name.database.isEmpty) {
catalog.getRawTempView(name.table)
} else {
checkCyclicViewReference(analyzedPlan, Seq(name), name)
TemporaryViewRelation(
prepareTemporaryView(
name, session, analyzedPlan, analyzedPlan.schema, Some(originalText)))
catalog.getRawGlobalTempView(name.table)
}
assert(rawTempView.isDefined)
val tableDefinition = createTemporaryViewRelation(
name,
session,
replace = true,  // ALTER VIEW always replaces an existing definition
rawTempView = rawTempView,
originalText = Some(originalText),
aliasedPlan = analyzedPlan,
analyzedPlan = analyzedPlan)
session.sessionState.catalog.alterTempViewDefinition(name, tableDefinition)
}

@@ -345,7 +318,7 @@ case class ShowViewsCommand(
}
}

object ViewHelper {
object ViewHelper extends SQLConfHelper with Logging {

private val configPrefixDenyList = Seq(
SQLConf.MAX_NESTED_VIEW_DEPTH.key,
@@ -592,19 +565,62 @@ object ViewHelper {
(collectTempViews(child), collectTempFunctions(child))
}

def createTemporaryViewRelation(
name: TableIdentifier,
session: SparkSession,
replace: Boolean,
rawTempView: Option[LogicalPlan],
originalText: Option[String],
aliasedPlan: LogicalPlan,
analyzedPlan: LogicalPlan): TemporaryViewRelation = {
val needsToUncache = needsToUncacheTempView(rawTempView, aliasedPlan)
if (replace && needsToUncache) {
logInfo(s"Try to uncache ${name.quotedString} before replacing.")
checkCyclicViewReference(analyzedPlan, Seq(name), name)
CommandUtils.uncacheTableOrView(session, name.quotedString)
}
if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
TemporaryViewRelation(
prepareTemporaryView(
name,
session,
analyzedPlan,
aliasedPlan.schema,
originalText.get))
} else {
TemporaryViewRelation(
prepareTemporaryViewFromDataFrame(name, aliasedPlan),
Some(aliasedPlan))
}
}

/**
* Checks whether the temp view being replaced needs to be uncached, i.e. whether the
* existing raw temp view would return a different result than the given aliased plan.
*/
private def needsToUncacheTempView(
rawTempView: Option[LogicalPlan],
aliasedPlan: LogicalPlan): Boolean = rawTempView match {
// The temp view doesn't exist, no need to uncache.
case None => false
// Do not need to uncache if the to-be-replaced temp view plan and the new plan are the
// same-result plans.
case Some(TemporaryViewRelation(_, Some(p))) => !p.sameResult(aliasedPlan)
case Some(p) => !p.sameResult(aliasedPlan)
}

/**
* Returns a [[CatalogTable]] that contains information for a temporary view.
* Generates the view-specific properties (e.g. view default database, view query output
* column names), stores them as properties in the CatalogTable, and also creates
* the proper schema for the view.
*/
def prepareTemporaryView(
private def prepareTemporaryView(
viewName: TableIdentifier,
session: SparkSession,
analyzedPlan: LogicalPlan,
viewSchema: StructType,
originalText: Option[String]): CatalogTable = {
originalText: String): CatalogTable = {

val catalog = session.sessionState.catalog
val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, analyzedPlan)
@@ -618,15 +634,15 @@ object ViewHelper {
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = viewSchema,
viewText = originalText,
viewText = Some(originalText),
properties = newProperties)
}

/**
* Returns a [[CatalogTable]] that contains information for the temporary view created
* from a DataFrame.
*/
def prepareTemporaryViewFromDataFrame(
private def prepareTemporaryViewFromDataFrame(
viewName: TableIdentifier,
analyzedPlan: LogicalPlan): CatalogTable = {
CatalogTable(
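Both temp-view branches of CreateViewCommand and AlterViewAsCommand above now delegate to ViewHelper.createTemporaryViewRelation, whose uncache decision rests on LogicalPlan.sameResult between the stored raw temp view and the new aliased plan. The following is a minimal sketch of that criterion, assuming a local SparkSession; the object name, master setting, and range plans are illustrative and not taken from the patch.

import org.apache.spark.sql.SparkSession

object SameResultSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sameResult-sketch").getOrCreate()

    // Two analyzed plans that compare as same-result: when the stored raw view plan and
    // the new plan compare equal this way, needsToUncacheTempView reports no uncache needed.
    val p1 = spark.range(5).queryExecution.analyzed
    val p2 = spark.range(5).queryExecution.analyzed
    assert(p1.sameResult(p2))

    // A plan with a different result: the to-be-replaced cached view is uncached first.
    val p3 = spark.range(6).queryExecution.analyzed
    assert(!p1.sameResult(p3))

    spark.stop()
  }
}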
@@ -909,4 +909,15 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
}
}
}

test("terry") {
withTempView("v1") {
sql("CREATE TEMPORARY VIEW tv1 AS SELECT 1")
sql("CACHE TABLE tv1")
assert(spark.catalog.isCached("tv1"))
sql("ALTER VIEW tv1 as SELECT 2")
assert(!spark.catalog.isCached("tv1"))
sql("select * from tv1").show
}
}
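
  // Hedged companion sketch, not part of this commit: an analogous check for the
  // CREATE OR REPLACE path, which now goes through the same createTemporaryViewRelation
  // helper. The test name and the "tv2" view are assumptions made for illustration.
  test("CREATE OR REPLACE TEMPORARY VIEW should uncache the replaced temp view") {
    withTempView("tv2") {
      sql("CREATE TEMPORARY VIEW tv2 AS SELECT 1")
      sql("CACHE TABLE tv2")
      assert(spark.catalog.isCached("tv2"))
      // Replacing the definition with a different query: the new plan has no cache entry.
      sql("CREATE OR REPLACE TEMPORARY VIEW tv2 AS SELECT 2")
      assert(!spark.catalog.isCached("tv2"))
      checkAnswer(sql("SELECT * FROM tv2"), Row(2))
    }
  }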
}
