From 76256774c52b78b9f6011f82063004bf18734f01 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 10 May 2021 12:17:57 +0800 Subject: [PATCH 1/9] refactor --- .../apache/spark/sql/internal/SQLConf.scala | 11 ++++ .../org/apache/spark/sql/SparkSession.scala | 23 ++++--- .../apache/spark/sql/CachedTableSuite.scala | 64 ++++++++++++++++++- 3 files changed, 88 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fb43db5ec57dd..c7adc4ec83e6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1090,6 +1090,17 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CACHE_DISABLE_CONFIGS_ENABLED = + buildConf("spark.sql.cache.disableConfigs.enabled") + .internal() + .doc(s"When true, some configs are disabled during executing cache plan that is to avoid " + + s"performance regression if other queries hit the cached plan. Currently, the disabled " + + s"configs include: ${ADAPTIVE_EXECUTION_ENABLED.key} and " + + s"${AUTO_BUCKETED_SCAN_ENABLED.key}.") + .version("3.2.0") + .booleanConf + .createWithDefault(true) + val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") .internal() .doc("When false, we will throw an error if a query contains a cartesian product without " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 5768c19470741..b8cd3afa5b84c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1069,21 +1069,26 @@ object SparkSession extends Logging { } /** - * Returns a cloned SparkSession with all specified configurations disabled, or - * the original SparkSession if all configurations are already disabled. + * When CACHE_DISABLE_CONFIGS_ENABLED is enabled, returns a cloned SparkSession with all + * specified configurations disabled, or the original SparkSession if all configurations + * are already disabled. 
*/ private[sql] def getOrCloneSessionWithConfigsOff( session: SparkSession, configurations: Seq[ConfigEntry[Boolean]]): SparkSession = { - val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_)) - if (configsEnabled.isEmpty) { + if (!session.sessionState.conf.getConf(SQLConf.CACHE_DISABLE_CONFIGS_ENABLED)) { session } else { - val newSession = session.cloneSession() - configsEnabled.foreach(conf => { - newSession.sessionState.conf.setConf(conf, false) - }) - newSession + val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_)) + if (configsEnabled.isEmpty) { + session + } else { + val newSession = session.cloneSession() + configsEnabled.foreach(conf => { + newSession.sessionState.conf.setConf(conf, false) + }) + newSession + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index a98f4d5f49d34..d2b0233df1519 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1175,7 +1175,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils } test("cache supports for intervals") { - withTable("interval_cache") { + withTable("interval_cache", "t1") { Seq((1, "1 second"), (2, "2 seconds"), (2, null)) .toDF("k", "v").write.saveAsTable("interval_cache") sql("CACHE TABLE t1 AS SELECT k, cast(v as interval) FROM interval_cache") @@ -1554,4 +1554,66 @@ class CachedTableSuite extends QueryTest with SQLTestUtils assert(!spark.catalog.isCached(viewName)) } } + + test("SPARK-35332: Make cache plan disable configs configurable - check AQE") { + withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + + withTempView("t1", "t2", "t3") { + withCache("t1", "t2", "t3") { + sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true") + sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)") + assert(spark.table("t1").rdd.partitions.length == 2) + + sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = false") + assert(spark.table("t1").rdd.partitions.length == 2) + sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") + assert(spark.table("t2").rdd.partitions.length == 1) + + sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true") + assert(spark.table("t1").rdd.partitions.length == 2) + assert(spark.table("t2").rdd.partitions.length == 1) + sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") + assert(spark.table("t3").rdd.partitions.length == 2) + + sql(s"RESET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key}") + } + } + } + } + + test("SPARK-35332: Make cache plan disable configs configurable - check bucket scan") { + withTable("t1", "t2", "t3") { + Seq(1, 2, 3).foreach { i => + spark.range(1, 2) + .write + .format("parquet") + .bucketBy(2, "id") + .saveAsTable(s"t$i") + } + + withCache("t1", "t2", "t3") { + withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "true", + SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") { + sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true") + sql("CACHE TABLE t1") + assert(spark.table("t1").rdd.partitions.length == 2) + + sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = false") + assert(spark.table("t1").rdd.partitions.length == 2) + sql("CACHE TABLE t2") + assert(spark.table("t2").rdd.partitions.length == 1) + + sql(s"SET 
${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true") + assert(spark.table("t1").rdd.partitions.length == 2) + assert(spark.table("t2").rdd.partitions.length == 1) + sql("CACHE TABLE t3") + assert(spark.table("t3").rdd.partitions.length == 2) + + sql(s"RESET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key}") + } + } + } + } } From f0c99db18079fc3bbf9ff4e31074b6774a8d6416 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 11 May 2021 09:14:21 +0800 Subject: [PATCH 2/9] address comment --- .../apache/spark/sql/internal/SQLConf.scala | 6 ++--- .../org/apache/spark/sql/SparkSession.scala | 23 ++++++++----------- .../spark/sql/execution/CacheManager.scala | 17 ++++++++++---- .../apache/spark/sql/CachedTableSuite.scala | 16 ++++++------- 4 files changed, 33 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c7adc4ec83e6d..a0c3f48f92757 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1090,11 +1090,11 @@ object SQLConf { .booleanConf .createWithDefault(true) - val CACHE_DISABLE_CONFIGS_ENABLED = - buildConf("spark.sql.cache.disableConfigs.enabled") + val CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING = + buildConf("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning") .internal() .doc(s"When true, some configs are disabled during executing cache plan that is to avoid " + - s"performance regression if other queries hit the cached plan. Currently, the disabled " + + "performance regression if other queries hit the cached plan. Currently, the disabled " + s"configs include: ${ADAPTIVE_EXECUTION_ENABLED.key} and " + s"${AUTO_BUCKETED_SCAN_ENABLED.key}.") .version("3.2.0") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index b8cd3afa5b84c..5768c19470741 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1069,26 +1069,21 @@ object SparkSession extends Logging { } /** - * When CACHE_DISABLE_CONFIGS_ENABLED is enabled, returns a cloned SparkSession with all - * specified configurations disabled, or the original SparkSession if all configurations - * are already disabled. + * Returns a cloned SparkSession with all specified configurations disabled, or + * the original SparkSession if all configurations are already disabled. 
*/ private[sql] def getOrCloneSessionWithConfigsOff( session: SparkSession, configurations: Seq[ConfigEntry[Boolean]]): SparkSession = { - if (!session.sessionState.conf.getConf(SQLConf.CACHE_DISABLE_CONFIGS_ENABLED)) { + val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_)) + if (configsEnabled.isEmpty) { session } else { - val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_)) - if (configsEnabled.isEmpty) { - session - } else { - val newSession = session.cloneSession() - configsEnabled.foreach(conf => { - newSession.sessionState.conf.setConf(conf, false) - }) - newSession - } + val newSession = session.cloneSession() + configsEnabled.foreach(conf => { + newSession.sessionState.conf.setConf(conf, false) + }) + newSession } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 0c6f22dea7b45..93aada5cd47af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -114,8 +114,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { if (lookupCachedData(planToCache).nonEmpty) { logWarning("Asked to cache already cached data.") } else { - val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff( - spark, forceDisableConfigs) + val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark) val inMemoryRelation = sessionWithConfigsOff.withActive { val qe = sessionWithConfigsOff.sessionState.executePlan(planToCache) InMemoryRelation( @@ -223,8 +222,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { } needToRecache.foreach { cd => cd.cachedRepresentation.cacheBuilder.clearCache() - val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff( - spark, forceDisableConfigs) + val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark) val newCache = sessionWithConfigsOff.withActive { val qe = sessionWithConfigsOff.sessionState.executePlan(cd.plan) InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe) @@ -328,4 +326,15 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { if (needToRefresh) fileIndex.refresh() needToRefresh } + + /** + * If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is disabled, just return original session. 
+ */ + private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = { + if (!session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) { + session + } else { + SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index d2b0233df1519..be493ff189344 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1562,22 +1562,22 @@ class CachedTableSuite extends QueryTest with SQLTestUtils withTempView("t1", "t2", "t3") { withCache("t1", "t2", "t3") { - sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)") assert(spark.table("t1").rdd.partitions.length == 2) - sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = false") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") assert(spark.table("t1").rdd.partitions.length == 2) sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") assert(spark.table("t2").rdd.partitions.length == 1) - sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") assert(spark.table("t1").rdd.partitions.length == 2) assert(spark.table("t2").rdd.partitions.length == 1) sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") assert(spark.table("t3").rdd.partitions.length == 2) - sql(s"RESET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key}") + sql(s"RESET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key}") } } } @@ -1596,22 +1596,22 @@ class CachedTableSuite extends QueryTest with SQLTestUtils withCache("t1", "t2", "t3") { withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "true", SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") { - sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") sql("CACHE TABLE t1") assert(spark.table("t1").rdd.partitions.length == 2) - sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = false") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") assert(spark.table("t1").rdd.partitions.length == 2) sql("CACHE TABLE t2") assert(spark.table("t2").rdd.partitions.length == 1) - sql(s"SET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key} = true") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") assert(spark.table("t1").rdd.partitions.length == 2) assert(spark.table("t2").rdd.partitions.length == 1) sql("CACHE TABLE t3") assert(spark.table("t3").rdd.partitions.length == 2) - sql(s"RESET ${SQLConf.CACHE_DISABLE_CONFIGS_ENABLED.key}") + sql(s"RESET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key}") } } } From 30b0572acef36f79b0f3bf6b7e094eaf0b762e33 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 11 May 2021 09:24:12 +0800 Subject: [PATCH 3/9] refine default value --- .../org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../org/apache/spark/sql/CachedTableSuite.scala | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a0c3f48f92757..ad473209e62c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1093,13 +1093,13 @@ object SQLConf { val CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING = buildConf("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning") .internal() - .doc(s"When true, some configs are disabled during executing cache plan that is to avoid " + + .doc(s"When false, some configs are disabled during executing cache plan that is to avoid " + "performance regression if other queries hit the cached plan. Currently, the disabled " + s"configs include: ${ADAPTIVE_EXECUTION_ENABLED.key} and " + s"${AUTO_BUCKETED_SCAN_ENABLED.key}.") .version("3.2.0") .booleanConf - .createWithDefault(true) + .createWithDefault(false) val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") .internal() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index be493ff189344..f964caef3e8aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1562,16 +1562,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils withTempView("t1", "t2", "t3") { withCache("t1", "t2", "t3") { - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)") assert(spark.table("t1").rdd.partitions.length == 2) - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") assert(spark.table("t1").rdd.partitions.length == 2) sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") assert(spark.table("t2").rdd.partitions.length == 1) - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") assert(spark.table("t1").rdd.partitions.length == 2) assert(spark.table("t2").rdd.partitions.length == 1) sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") @@ -1596,16 +1596,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils withCache("t1", "t2", "t3") { withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "true", SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") { - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") sql("CACHE TABLE t1") assert(spark.table("t1").rdd.partitions.length == 2) - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") assert(spark.table("t1").rdd.partitions.length == 2) sql("CACHE TABLE t2") assert(spark.table("t2").rdd.partitions.length == 1) - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") assert(spark.table("t1").rdd.partitions.length == 2) assert(spark.table("t2").rdd.partitions.length == 1) sql("CACHE TABLE t3") From 708bb0c78256256043b14ecfa7b62a6cb96fea6a Mon Sep 17 00:00:00 2001 From: 
ulysses-you Date: Tue, 11 May 2021 12:05:41 +0800 Subject: [PATCH 4/9] fix --- .../scala/org/apache/spark/sql/execution/CacheManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 93aada5cd47af..27a4264fe7a91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -331,7 +331,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { * If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is disabled, just return original session. */ private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = { - if (!session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) { + if (session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) { session } else { SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs) From a83e9cfc40cc4b80385e23c180363fe1b5b09d13 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 11 May 2021 16:26:53 +0800 Subject: [PATCH 5/9] wrap withSQLConf --- .../apache/spark/sql/CachedTableSuite.scala | 37 +++++++++---------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index f964caef3e8aa..ae987e4890e28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1562,22 +1562,21 @@ class CachedTableSuite extends QueryTest with SQLTestUtils withTempView("t1", "t2", "t3") { withCache("t1", "t2", "t3") { - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") - sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)") - assert(spark.table("t1").rdd.partitions.length == 2) - - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") - assert(spark.table("t1").rdd.partitions.length == 2) - sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") - assert(spark.table("t2").rdd.partitions.length == 1) - - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") - assert(spark.table("t1").rdd.partitions.length == 2) - assert(spark.table("t2").rdd.partitions.length == 1) - sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") - assert(spark.table("t3").rdd.partitions.length == 2) - - sql(s"RESET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key}") + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { + sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)") + assert(spark.table("t1").rdd.partitions.length == 2) + + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") + assert(spark.table("t1").rdd.partitions.length == 2) + sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") + assert(spark.table("t2").rdd.partitions.length == 1) + + sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") + assert(spark.table("t1").rdd.partitions.length == 2) + assert(spark.table("t2").rdd.partitions.length == 1) + sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") + 
assert(spark.table("t3").rdd.partitions.length == 2) + } } } } @@ -1595,8 +1594,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils withCache("t1", "t2", "t3") { withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "true", - SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") { - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") + SQLConf.FILES_MIN_PARTITION_NUM.key -> "1", + SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { sql("CACHE TABLE t1") assert(spark.table("t1").rdd.partitions.length == 2) @@ -1610,8 +1609,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils assert(spark.table("t2").rdd.partitions.length == 1) sql("CACHE TABLE t3") assert(spark.table("t3").rdd.partitions.length == 2) - - sql(s"RESET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key}") } } } From 515aeba746164ab652f01107019a656190682a35 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Tue, 11 May 2021 16:42:11 +0800 Subject: [PATCH 6/9] fix --- .../apache/spark/sql/CachedTableSuite.scala | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index ae987e4890e28..f14b9ed144b8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1566,16 +1566,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)") assert(spark.table("t1").rdd.partitions.length == 2) - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") - assert(spark.table("t1").rdd.partitions.length == 2) - sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") - assert(spark.table("t2").rdd.partitions.length == 1) - - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") - assert(spark.table("t1").rdd.partitions.length == 2) - assert(spark.table("t2").rdd.partitions.length == 1) - sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") - assert(spark.table("t3").rdd.partitions.length == 2) + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") { + assert(spark.table("t1").rdd.partitions.length == 2) + sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") + assert(spark.table("t2").rdd.partitions.length == 1) + + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { + assert(spark.table("t1").rdd.partitions.length == 2) + assert(spark.table("t2").rdd.partitions.length == 1) + sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") + assert(spark.table("t3").rdd.partitions.length == 2) + } + } } } } @@ -1599,16 +1601,18 @@ class CachedTableSuite extends QueryTest with SQLTestUtils sql("CACHE TABLE t1") assert(spark.table("t1").rdd.partitions.length == 2) - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = true") - assert(spark.table("t1").rdd.partitions.length == 2) - sql("CACHE TABLE t2") - assert(spark.table("t2").rdd.partitions.length == 1) + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") { + assert(spark.table("t1").rdd.partitions.length == 2) + sql("CACHE TABLE t2") + assert(spark.table("t2").rdd.partitions.length == 1) - sql(s"SET ${SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key} = false") - 
assert(spark.table("t1").rdd.partitions.length == 2) - assert(spark.table("t2").rdd.partitions.length == 1) - sql("CACHE TABLE t3") - assert(spark.table("t3").rdd.partitions.length == 2) + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { + assert(spark.table("t1").rdd.partitions.length == 2) + assert(spark.table("t2").rdd.partitions.length == 1) + sql("CACHE TABLE t3") + assert(spark.table("t3").rdd.partitions.length == 2) + } + } } } } From 763312f803dc030e47450aab997445153369c6bd Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Wed, 12 May 2021 17:58:05 +0800 Subject: [PATCH 7/9] test --- .../apache/spark/sql/CachedTableSuite.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index f14b9ed144b8f..a154de00479ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1561,22 +1561,20 @@ class CachedTableSuite extends QueryTest with SQLTestUtils SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { withTempView("t1", "t2", "t3") { - withCache("t1", "t2", "t3") { - withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { - sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)") + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { + sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)") + assert(spark.table("t1").rdd.partitions.length == 2) + + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") { assert(spark.table("t1").rdd.partitions.length == 2) + sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") + assert(spark.table("t2").rdd.partitions.length == 1) - withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") { + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { assert(spark.table("t1").rdd.partitions.length == 2) - sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") assert(spark.table("t2").rdd.partitions.length == 1) - - withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { - assert(spark.table("t1").rdd.partitions.length == 2) - assert(spark.table("t2").rdd.partitions.length == 1) - sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") - assert(spark.table("t3").rdd.partitions.length == 2) - } + sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") + assert(spark.table("t3").rdd.partitions.length == 2) } } } From 16b8a22cb2d9952b68e2e9a09e56e015ed9781d1 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Wed, 12 May 2021 17:58:21 +0800 Subject: [PATCH 8/9] docs --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ad473209e62c0..7e6ed025c5414 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1093,10 +1093,11 @@ object SQLConf { val CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING = buildConf("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning") 
.internal() - .doc(s"When false, some configs are disabled during executing cache plan that is to avoid " + - "performance regression if other queries hit the cached plan. Currently, the disabled " + - s"configs include: ${ADAPTIVE_EXECUTION_ENABLED.key} and " + - s"${AUTO_BUCKETED_SCAN_ENABLED.key}.") + .doc("Whether to forcibly enable some optimization rules that can change the output " + + "partitioning of a cached query when executing it for caching. If it is set to true, " + + "queries may need an extra shuffle to read the cached data. This configuration is " + + "disabled by default. Currently, the optimization rules enabled by this configuration " + + s"are ${ADAPTIVE_EXECUTION_ENABLED.key} and ${AUTO_BUCKETED_SCAN_ENABLED.key}.") .version("3.2.0") .booleanConf .createWithDefault(false) From 2e8492baf2c9b6dba3bd635f2b3a25d288e7f3ab Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Thu, 13 May 2021 17:04:00 +0800 Subject: [PATCH 9/9] comment --- .../apache/spark/sql/CachedTableSuite.scala | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index a154de00479ad..3c05a7afc095d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1564,19 +1564,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as t(c)") assert(spark.table("t1").rdd.partitions.length == 2) + } - withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") { - assert(spark.table("t1").rdd.partitions.length == 2) - sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") - assert(spark.table("t2").rdd.partitions.length == 1) + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") { + assert(spark.table("t1").rdd.partitions.length == 2) + sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as t(c)") + assert(spark.table("t2").rdd.partitions.length == 1) + } - withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { - assert(spark.table("t1").rdd.partitions.length == 2) - assert(spark.table("t2").rdd.partitions.length == 1) - sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") - assert(spark.table("t3").rdd.partitions.length == 2) - } - } + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { + assert(spark.table("t1").rdd.partitions.length == 2) + assert(spark.table("t2").rdd.partitions.length == 1) + sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as t(c)") + assert(spark.table("t3").rdd.partitions.length == 2) } } } @@ -1603,13 +1603,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils assert(spark.table("t1").rdd.partitions.length == 2) sql("CACHE TABLE t2") assert(spark.table("t2").rdd.partitions.length == 1) + } - withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { - assert(spark.table("t1").rdd.partitions.length == 2) - assert(spark.table("t2").rdd.partitions.length == 1) - sql("CACHE TABLE t3") - assert(spark.table("t3").rdd.partitions.length == 2) - } + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "false") { + assert(spark.table("t1").rdd.partitions.length 
== 2) + assert(spark.table("t2").rdd.partitions.length == 1) + sql("CACHE TABLE t3") + assert(spark.table("t3").rdd.partitions.length == 2) } } }
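
Note: below is a minimal, self-contained Scala sketch of the logic this series converges on, combining the getOrCloneSessionWithConfigsOff helper (patch 2/9) with the config gate moved into CacheManager (patch 4/9). The Session and ConfigEntry classes here are simplified stand-ins invented for illustration, not the real SparkSession or ConfigEntry API; the authoritative code is in the diffs above.

    // Simplified model: how cache planning decides whether to clone the
    // session and force certain configs off before compiling the cached plan.
    object CachePlanningModel {

      // Stand-in for Spark's ConfigEntry[Boolean].
      final case class ConfigEntry(key: String, default: Boolean)

      // Stand-in for SparkSession: just a mutable map of boolean confs.
      final class Session(initial: Map[String, Boolean]) {
        private var confs: Map[String, Boolean] = initial
        def getConf(e: ConfigEntry): Boolean = confs.getOrElse(e.key, e.default)
        def setConf(e: ConfigEntry, value: Boolean): Unit = confs += (e.key -> value)
        def cloneSession(): Session = new Session(confs)
      }

      val AdaptiveExecution =
        ConfigEntry("spark.sql.adaptive.enabled", default = true)
      val AutoBucketedScan =
        ConfigEntry("spark.sql.sources.bucketing.autoBucketedScan.enabled", default = true)
      val CanChangeCachedPlanOutputPartitioning =
        ConfigEntry("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", default = false)

      // Mirrors SparkSession.getOrCloneSessionWithConfigsOff after patch 2/9:
      // clone only when at least one of the given configs is still enabled,
      // otherwise reuse the original session.
      def getOrCloneSessionWithConfigsOff(
          session: Session,
          configurations: Seq[ConfigEntry]): Session = {
        val configsEnabled = configurations.filter(session.getConf)
        if (configsEnabled.isEmpty) {
          session
        } else {
          val newSession = session.cloneSession()
          configsEnabled.foreach(newSession.setConf(_, false))
          newSession
        }
      }

      // Mirrors CacheManager's private helper after patch 4/9: if the user
      // allows the cached plan's output partitioning to change, keep the
      // original session; otherwise plan the cache with AQE and auto
      // bucketed scan forced off.
      def sessionForCaching(session: Session): Session = {
        if (session.getConf(CanChangeCachedPlanOutputPartitioning)) {
          session
        } else {
          getOrCloneSessionWithConfigsOff(session, Seq(AdaptiveExecution, AutoBucketedScan))
        }
      }

      def main(args: Array[String]): Unit = {
        val spark = new Session(Map.empty)           // defaults: AQE on, flag off
        val planning = sessionForCaching(spark)
        assert(!planning.getConf(AdaptiveExecution)) // caching sees AQE disabled
        assert(spark.getConf(AdaptiveExecution))     // user session is untouched
      }
    }

Run as a plain main, the sketch checks the default behavior the tests above exercise: with spark.sql.optimizer.canChangeCachedPlanOutputPartitioning left at false, the cached plan is compiled on a cloned session with AQE and auto bucketed scan disabled (so its output partitioning stays stable for later cache hits), while the user's own session keeps its settings.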