[SPARK-35332][SQL] Make cache plan disable configs configurable #32482

Closed · wants to merge 9 commits
Changes from 1 commit
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

```diff
@@ -1090,6 +1090,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val CACHE_DISABLE_CONFIGS_ENABLED =
+    buildConf("spark.sql.cache.disableConfigs.enabled")
+      .internal()
+      .doc(s"When true, some configs are disabled during executing cache plan that is to avoid " +
+        s"performance regression if other queries hit the cached plan. Currently, the disabled " +
+        s"configs include: ${ADAPTIVE_EXECUTION_ENABLED.key} and " +
+        s"${AUTO_BUCKETED_SCAN_ENABLED.key}.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
     .internal()
     .doc("When false, we will throw an error if a query contains a cartesian product without " +
```

Member, on the `.doc(...)` string: nit: don't need `s""`.
HyukjinKwon marked this conversation as resolved.
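To make the intended behavior concrete, here is a minimal sketch of toggling the flag. It assumes an active SparkSession named `spark`, and the table name `demo` is illustrative; since the entry is internal, the sketch toggles it via SQL `SET`, as the new tests below do.

```scala
// Illustrative only; assumes an active SparkSession `spark`.
// Default (true): AQE and auto bucketed scan are forced off while the
// cache plan is compiled, so cached plans stay stable for reuse.
spark.sql("SET spark.sql.cache.disableConfigs.enabled = false") // opt out
spark.sql("CACHE TABLE demo AS SELECT /*+ REPARTITION */ * FROM VALUES (1) AS t(c)")
// The cached plan above was compiled with the session's AQE settings applied.
spark.sql("RESET spark.sql.cache.disableConfigs.enabled")       // back to the default (true)
```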
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala (14 additions, 9 deletions)
```diff
@@ -1069,21 +1069,26 @@ object SparkSession extends Logging {
   }

   /**
-   * Returns a cloned SparkSession with all specified configurations disabled, or
-   * the original SparkSession if all configurations are already disabled.
+   * When CACHE_DISABLE_CONFIGS_ENABLED is enabled, returns a cloned SparkSession with all
+   * specified configurations disabled, or the original SparkSession if all configurations
+   * are already disabled.
    */
   private[sql] def getOrCloneSessionWithConfigsOff(
       session: SparkSession,
       configurations: Seq[ConfigEntry[Boolean]]): SparkSession = {
-    val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_))
-    if (configsEnabled.isEmpty) {
+    if (!session.sessionState.conf.getConf(SQLConf.CACHE_DISABLE_CONFIGS_ENABLED)) {
       session
     } else {
-      val newSession = session.cloneSession()
-      configsEnabled.foreach(conf => {
-        newSession.sessionState.conf.setConf(conf, false)
-      })
-      newSession
+      val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_))
+      if (configsEnabled.isEmpty) {
+        session
+      } else {
+        val newSession = session.cloneSession()
+        configsEnabled.foreach(conf => {
+          newSession.sessionState.conf.setConf(conf, false)
+        })
+        newSession
+      }
     }
   }
```

Member, on the updated scaladoc: getOrCloneSessionWithConfigsOff is not claimed to be used only for cache plans; it is a bit logically wrong to have a so-called cache-disable config here.

Member, on the new flag check: What if getOrCloneSessionWithConfigsOff is used to disable other configs? Would those also automatically be under the control of CACHE_DISABLE_CONFIGS_ENABLED?

Contributor (author): Agreed, it could introduce issues in the future. Moved the check to CacheManager.
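For context, a minimal sketch of how a caller such as CacheManager uses this helper. The `forceDisableConfigs` list mirrors the two entries named in the new config's `.doc` string, but the call site is paraphrased, not taken from this diff; the sketch also assumes code living in the `org.apache.spark.sql` package, since the helper is `private[sql]`.

```scala
// Paraphrased sketch of the CacheManager call site (not part of this diff;
// the author moved the flag check there in a later commit).
class CacheManagerSketch {
  import org.apache.spark.internal.config.ConfigEntry
  import org.apache.spark.sql.{Dataset, SparkSession}
  import org.apache.spark.sql.internal.SQLConf

  // Configs forced off while compiling a cache plan, matching the
  // entries named in the new .doc string above.
  private val forceDisableConfigs: Seq[ConfigEntry[Boolean]] = Seq(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED,
    SQLConf.AUTO_BUCKETED_SCAN_ENABLED)

  def cacheQuery(query: Dataset[_]): Unit = {
    // The helper is private[sql], hence the package assumption above.
    val session = SparkSession.getOrCloneSessionWithConfigsOff(
      query.sparkSession, forceDisableConfigs)
    // ... build and store the cached relation using `session` ...
  }
}
```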

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

```diff
@@ -1175,7 +1175,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
   }

   test("cache supports for intervals") {
-    withTable("interval_cache") {
+    withTable("interval_cache", "t1") {
       Seq((1, "1 second"), (2, "2 seconds"), (2, null))
         .toDF("k", "v").write.saveAsTable("interval_cache")
       sql("CACHE TABLE t1 AS SELECT k, cast(v as interval) FROM interval_cache")
```

Contributor (author), on the withTable change: Not related to this PR, but it affected the newly added test that uses t1.
```diff
@@ -1554,4 +1554,66 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(!spark.catalog.isCached(viewName))
     }
   }
+
+  test("SPARK-35332: Make cache plan disable configs configurable - check AQE") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+
+      withTempView("t1", "t2", "t3") {
+        withCache("t1", "t2", "t3") {
+          sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true")
+          sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)")
+          assert(spark.table("t1").rdd.partitions.length == 2)
+
+          sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = false")
+          assert(spark.table("t1").rdd.partitions.length == 2)
+          sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)")
+          assert(spark.table("t2").rdd.partitions.length == 1)
+
+          sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true")
+          assert(spark.table("t1").rdd.partitions.length == 2)
+          assert(spark.table("t2").rdd.partitions.length == 1)
+          sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)")
+          assert(spark.table("t3").rdd.partitions.length == 2)
+
+          sql(s"RESET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key}")
+        }
+      }
+    }
+  }
```

Member, on `withCache("t1", "t2", "t3")`: withTempView drops caches, too?

Contributor (author): Yes, removed it.
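Why the assertions expect 2 vs. 1 partitions: the REPARTITION hint shuffles into spark.sql.shuffle.partitions (2 here), and when AQE is allowed to optimize the cache plan it coalesces the nearly empty shuffle down to the configured minimum of 1. A minimal spark-shell sketch of the same contrast, assuming a fresh local session; the table name `t_demo` is illustrative, not from the test.

```scala
// Sketch only; the keys below are the string names behind the SQLConf
// entries used in the test (SHUFFLE_PARTITIONS, ADAPTIVE_EXECUTION_ENABLED,
// COALESCE_PARTITIONS_MIN_PARTITION_NUM).
spark.sql("SET spark.sql.shuffle.partitions = 2")
spark.sql("SET spark.sql.adaptive.enabled = true")
spark.sql("SET spark.sql.adaptive.coalescePartitions.minPartitionNum = 1")

// On a plain query, AQE coalesces the tiny shuffle to a single partition.
val df = spark.sql("SELECT /*+ REPARTITION */ * FROM VALUES (1) AS t(c)")
assert(df.rdd.partitions.length == 1)

// With the new flag at its default (true), AQE is kept out of the cache
// plan, so the cached plan retains both shuffle partitions.
spark.sql("SET spark.sql.cache.disableConfigs.enabled = true")
spark.sql("CACHE TABLE t_demo AS SELECT /*+ REPARTITION */ * FROM VALUES (1) AS t(c)")
assert(spark.table("t_demo").rdd.partitions.length == 2)
```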

test("SPARK-35332: Make cache plan disable configs configurable - check bucket scan") {
withTable("t1", "t2", "t3") {
Seq(1, 2, 3).foreach { i =>
spark.range(1, 2)
.write
.format("parquet")
.bucketBy(2, "id")
.saveAsTable(s"t$i")
}

withCache("t1", "t2", "t3") {
withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "true",
SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") {
sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true")
sql("CACHE TABLE t1")
assert(spark.table("t1").rdd.partitions.length == 2)

sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = false")
assert(spark.table("t1").rdd.partitions.length == 2)
sql("CACHE TABLE t2")
assert(spark.table("t2").rdd.partitions.length == 1)

sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true")
assert(spark.table("t1").rdd.partitions.length == 2)
assert(spark.table("t2").rdd.partitions.length == 1)
sql("CACHE TABLE t3")
assert(spark.table("t3").rdd.partitions.length == 2)

sql(s"RESET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key}")
}
}
}
}
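The asymmetry here mirrors the AQE case: spark.sql.sources.bucketing.autoBucketedScan.enabled, when left on, lets Spark drop the bucketed scan for queries that gain nothing from bucketing, so the one-row table is read as plain files and coalesced to a single partition (spark.sql.files.minPartitionNum = 1); forcing it off during cache planning keeps one partition per bucket. A hedged sketch of the same contrast outside the test harness; it assumes an active SparkSession `spark` with the same minPartitionNum setting, and `demo_bucketed` is an illustrative name.

```scala
// Illustrative only; assumes spark.sql.files.minPartitionNum = 1, as in the test.
spark.range(1, 2).write.format("parquet").bucketBy(2, "id").saveAsTable("demo_bucketed")

spark.sql("SET spark.sql.sources.bucketing.autoBucketedScan.enabled = false")
assert(spark.table("demo_bucketed").rdd.partitions.length == 2) // one partition per bucket

spark.sql("SET spark.sql.sources.bucketing.autoBucketedScan.enabled = true")
assert(spark.table("demo_bucketed").rdd.partitions.length == 1) // bucketed scan auto-disabled
```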
}