From 6fffa9da7e69e124e54c04878758d15dd261f8c9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 27 Sep 2019 13:34:57 -0700 Subject: [PATCH] Use one config, instead of two. --- .../spark/internal/config/package.scala | 19 ++++++------------- .../spark/rdd/ReliableCheckpointRDD.scala | 7 ++++--- .../org/apache/spark/CheckpointSuite.scala | 4 ++-- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ee2f71a1d9d2a..af0e8f3a6b83c 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -783,23 +783,16 @@ package object config { .booleanConf .createWithDefault(false) - private[spark] val CACHE_CHECKPOINT_PREFERRED_LOCS = - ConfigBuilder("spark.rdd.checkpoint.cachePreferredLocs") - .internal() - .doc("Whether to cache preferred locations of checkpointed RDD. Caching preferred " + - "locations can relieve query loading to DFS and save the query time. The drawback " + - "is that the cached locations can be possibly outdated and lose data locality. " + - "This location cache expires by `spark.rdd.checkpoint.cachePreferredLocsExpireTime` " + - "minutes.") - .booleanConf - .createWithDefault(false) - private[spark] val CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME = ConfigBuilder("spark.rdd.checkpoint.cachePreferredLocsExpireTime") .internal() - .doc("Expire time in minutes for caching preferred locations of checkpointed RDD.") + .doc("Expire time in minutes for caching preferred locations of checkpointed RDD. " + + "Caching preferred locations can relieve query loading to DFS and save the query " + + "time. The drawback is that the cached locations can be possibly outdated and " + + "lose data locality. 
If this config is not specified, it will not cache.") .timeConf(TimeUnit.MINUTES) - .createWithDefault(60) + .checkValue(_ > 0, "The expire time for caching preferred locations cannot be non-positive.") + .createOptional private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD = ConfigBuilder("spark.shuffle.accurateBlockThreshold") diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 684d2ae1701ad..7662d00927b9e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS} +import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS} import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -87,7 +87,7 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag]( // Cache of preferred locations of checkpointed files. @transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder() .expireAfterWrite( - SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME), + SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get, TimeUnit.MINUTES) .build( new CacheLoader[Partition, Seq[String]]() { @@ -108,7 +108,8 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag]( * Return the locations of the checkpoint file associated with the given partition. 
*/ protected override def getPreferredLocations(split: Partition): Seq[String] = { - if (SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS)) { + val cachedExpireTime = SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME) + if (cachedExpireTime.isDefined && cachedExpireTime.get > 0) { cachedPreferredLocations.get(split) } else { getPartitionBlockLocations(split) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 9f6d19dd0aef0..6a108a55045ee 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -24,7 +24,7 @@ import scala.reflect.ClassTag import com.google.common.io.ByteStreams import org.apache.hadoop.fs.Path -import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS +import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME import org.apache.spark.internal.config.UI._ import org.apache.spark.io.CompressionCodec import org.apache.spark.rdd._ @@ -623,7 +623,7 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext { test("cache checkpoint preferred location") { withTempDir { checkpointDir => val conf = new SparkConf() - .set(CACHE_CHECKPOINT_PREFERRED_LOCS.key, "true") + .set(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME.key, "10") .set(UI_ENABLED.key, "false") sc = new SparkContext("local", "test", conf) sc.setCheckpointDir(checkpointDir.toString)