Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
imback82 committed Mar 14, 2021
1 parent 86baa36 commit ba22dd0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.execution.command.ViewHelper.createTemporaryViewRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types._

/**
Expand Down Expand Up @@ -90,12 +91,30 @@ case class CreateTempViewUsing(
options = options)

val catalog = sparkSession.sessionState.catalog
val viewDefinition = Dataset.ofRows(
val analyzedPlan = Dataset.ofRows(
sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan

if (global) {
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val viewIdent = TableIdentifier(tableIdent.table, Option(db))
val viewDefinition = createTemporaryViewRelation(
viewIdent,
sparkSession,
replace,
catalog.getRawGlobalTempView,
originalText = None,
analyzedPlan,
aliasedPlan = analyzedPlan)
catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
} else {
val viewDefinition = createTemporaryViewRelation(
tableIdent,
sparkSession,
replace,
catalog.getRawTempView,
originalText = None,
analyzedPlan,
aliasedPlan = analyzedPlan)
catalog.createTempView(tableIdent.table, viewDefinition, replace)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1509,4 +1509,48 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).isEmpty)
}
}

test("SPARK-34699: CREATE TEMP VIEW USING should uncache correctly") {
withTempView("tv") {
testCreateTemporaryViewUsingWithCache(TableIdentifier("tv"))
}
}

test("SPARK-34699: CREATE GLOBAL TEMP VIEW USING should uncache correctly") {
withGlobalTempView("global_tv") {
val db = spark.sharedState.globalTempViewManager.database
testCreateTemporaryViewUsingWithCache(TableIdentifier("global_tv", Some(db)))
}
}

private def testCreateTemporaryViewUsingWithCache(ident: TableIdentifier): Unit = {
withTempDir { dir =>
val path1 = new File(dir, "t1").getCanonicalPath
val path2 = new File(dir, "t2").getCanonicalPath
Seq(1).toDF.write.parquet(path1)
Seq(1).toDF.write.parquet(path2)

val (tempViewStr, viewName) = if (ident.database.nonEmpty) {
("GLOBAL TEMPORARY VIEW", s"${ident.database.get}.${ident.table}")
} else {
("TEMPORARY VIEW", ident.table)
}

sql(s"CREATE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')")

spark.sharedState.cacheManager.clearCache()

sql(s"CACHE TABLE $viewName")
assert(spark.catalog.isCached(viewName))

// Replacing with the same relation. The cache shouldn't be uncached.
sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')")
assert(spark.catalog.isCached(viewName))

// Replacing with a different relation. The cache should be cleared.
sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path2')")
assert(!spark.catalog.isCached(viewName))
assert(spark.sharedState.cacheManager.isEmpty)
}
}
}

0 comments on commit ba22dd0

Please sign in to comment.