Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34699][SQL] 'CREATE OR REPLACE TEMP VIEW USING' should uncache correctly #31825

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.execution.command.ViewHelper.createTemporaryViewRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types._

/**
Expand Down Expand Up @@ -90,12 +91,30 @@ case class CreateTempViewUsing(
options = options)

val catalog = sparkSession.sessionState.catalog
val viewDefinition = Dataset.ofRows(
val analyzedPlan = Dataset.ofRows(
sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan

if (global) {
val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val viewIdent = TableIdentifier(tableIdent.table, Option(db))
val viewDefinition = createTemporaryViewRelation(
viewIdent,
sparkSession,
replace,
catalog.getRawGlobalTempView,
originalText = None,
analyzedPlan,
aliasedPlan = analyzedPlan)
catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
} else {
val viewDefinition = createTemporaryViewRelation(
tableIdent,
sparkSession,
replace,
catalog.getRawTempView,
originalText = None,
analyzedPlan,
aliasedPlan = analyzedPlan)
catalog.createTempView(tableIdent.table, viewDefinition, replace)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ Created Time [not included in comparison]
Last Access [not included in comparison]
Created By [not included in comparison]
Type: VIEW
Table Properties: [view.storingAnalyzedPlan=true]
Schema: root
|-- e: integer (nullable = true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1509,4 +1509,49 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
assert(spark.sharedState.cacheManager.lookupCachedData(sql("SELECT 1")).isEmpty)
}
}

test("SPARK-34699: CREATE TEMP VIEW USING should uncache correctly") {
withTempView("tv") {
testCreateTemporaryViewUsingWithCache(TableIdentifier("tv"))
}
}

test("SPARK-34699: CREATE GLOBAL TEMP VIEW USING should uncache correctly") {
withGlobalTempView("global_tv") {
val db = spark.sharedState.globalTempViewManager.database
testCreateTemporaryViewUsingWithCache(TableIdentifier("global_tv", Some(db)))
}
}

private def testCreateTemporaryViewUsingWithCache(ident: TableIdentifier): Unit = {
withTempDir { dir =>
val path1 = new File(dir, "t1").getCanonicalPath
val path2 = new File(dir, "t2").getCanonicalPath
Seq(1).toDF.write.parquet(path1)
Seq(1).toDF.write.parquet(path2)

val (tempViewStr, viewName) = if (ident.database.nonEmpty) {
("GLOBAL TEMPORARY VIEW", s"${ident.database.get}.${ident.table}")
} else {
("TEMPORARY VIEW", ident.table)
}

sql(s"CREATE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')")

sql(s"CACHE TABLE $viewName")
assert(spark.catalog.isCached(viewName))

// Replacing with the same relation. The cache shouldn't be uncached.
sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')")
assert(spark.catalog.isCached(viewName))

// Replacing with a different relation. The cache should be cleared.
sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path2')")
assert(!spark.catalog.isCached(viewName))

// Validate that the cache is cleared by creating a temp view with the same relation.
sql(s"CREATE OR REPLACE $tempViewStr ${ident.table} USING parquet OPTIONS (path '$path1')")
assert(!spark.catalog.isCached(viewName))
}
}
}