Commit 6fffa9d
Use one config, instead of two.
viirya committed Sep 27, 2019
1 parent 369f32c commit 6fffa9d
Showing 3 changed files with 12 additions and 18 deletions.
19 changes: 6 additions & 13 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -783,23 +783,16 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
-  private[spark] val CACHE_CHECKPOINT_PREFERRED_LOCS =
-    ConfigBuilder("spark.rdd.checkpoint.cachePreferredLocs")
-      .internal()
-      .doc("Whether to cache preferred locations of checkpointed RDD. Caching preferred " +
-        "locations can relieve query loading to DFS and save the query time. The drawback " +
-        "is that the cached locations can be possibly outdated and lose data locality. " +
-        "This location cache expires by `spark.rdd.checkpoint.cachePreferredLocsExpireTime` " +
-        "minutes.")
-      .booleanConf
-      .createWithDefault(false)
-
   private[spark] val CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME =
     ConfigBuilder("spark.rdd.checkpoint.cachePreferredLocsExpireTime")
       .internal()
-      .doc("Expire time in minutes for caching preferred locations of checkpointed RDD.")
+      .doc("Expire time in minutes for caching preferred locations of checkpointed RDD. " +
+        "Caching preferred locations can relieve query loading to DFS and save the query " +
+        "time. The drawback is that the cached locations can be possibly outdated and " +
+        "lose data locality. If this config is not specified or is 0, it will not cache.")
       .timeConf(TimeUnit.MINUTES)
-      .createWithDefault(60)
+      .checkValue(_ > 0, "The expire time for caching preferred locations cannot be non-positive.")
+      .createOptional
 
   private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
     ConfigBuilder("spark.shuffle.accurateBlockThreshold")
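The net effect of this hunk is that one optional config now does the work of the old boolean-plus-timeout pair: `createOptional` yields an `Option`, so mere presence of `spark.rdd.checkpoint.cachePreferredLocsExpireTime` enables the cache and its value sets the expiry. A minimal sketch of that pattern in plain Scala (the `Map`-backed conf and helper names are illustrative, not Spark's API):

```scala
// Illustrative sketch only: an Option-valued setting replaces a boolean flag
// plus a separate expiry value. A plain Map stands in for SparkConf.
object OneConfigSketch {
  val ExpireKey = "spark.rdd.checkpoint.cachePreferredLocsExpireTime"

  // Hypothetical stand-in for conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME)
  def expireTimeMinutes(conf: Map[String, String]): Option[Long] =
    conf.get(ExpireKey).map(_.toLong)

  def cachingEnabled(conf: Map[String, String]): Boolean =
    expireTimeMinutes(conf).exists(_ > 0)  // same test as isDefined && get > 0

  def main(args: Array[String]): Unit = {
    println(cachingEnabled(Map(ExpireKey -> "10")))     // true: key present and positive
    println(cachingEnabled(Map.empty[String, String]))  // false: key absent, caching off
  }
}
```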
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS}
+import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -87,7 +87,7 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   // Cache of preferred locations of checkpointed files.
   @transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder()
     .expireAfterWrite(
-      SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME),
+      SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
       TimeUnit.MINUTES)
     .build(
       new CacheLoader[Partition, Seq[String]]() {
@@ -108,7 +108,8 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
    * Return the locations of the checkpoint file associated with the given partition.
    */
   protected override def getPreferredLocations(split: Partition): Seq[String] = {
-    if (SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS)) {
+    val cachedExpireTime = SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME)
+    if (cachedExpireTime.isDefined && cachedExpireTime.get > 0) {
       cachedPreferredLocations.get(split)
     } else {
       getPartitionBlockLocations(split)
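For context, `cachedPreferredLocations` above is a Guava loading cache: `expireAfterWrite` drops each entry a fixed time after it is computed, and the `CacheLoader` fills misses. A self-contained sketch of that pattern (the partition-id key and host names are made up for illustration; this is not the Spark source):

```scala
// Sketch of the Guava loading-cache pattern used above, with invented data.
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader}

object PreferredLocsCacheSketch {
  private val locations = CacheBuilder.newBuilder()
    .expireAfterWrite(10, TimeUnit.MINUTES)  // plays the role of the expiry config
    .build(new CacheLoader[Integer, Seq[String]]() {
      // Stand-in for the DFS query that finds a partition's block locations.
      override def load(partitionId: Integer): Seq[String] =
        Seq(s"host-${partitionId % 3}")
    })

  def main(args: Array[String]): Unit = {
    println(locations.get(0))  // miss: computed by the loader, then cached
    println(locations.get(0))  // hit: served from cache until the entry expires
  }
}
```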
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -24,7 +24,7 @@ import scala.reflect.ClassTag
 import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS
+import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rdd._
@@ -623,7 +623,7 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
   test("cache checkpoint preferred location") {
     withTempDir { checkpointDir =>
       val conf = new SparkConf()
-        .set(CACHE_CHECKPOINT_PREFERRED_LOCS.key, "true")
+        .set(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME.key, "10")
         .set(UI_ENABLED.key, "false")
       sc = new SparkContext("local", "test", conf)
       sc.setCheckpointDir(checkpointDir.toString)
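End to end, after this commit a job opts in by setting the single key to a positive number of minutes, exactly as the updated test does. A hedged usage sketch (assumes a Spark build containing this commit; the config is marked `.internal()` and thus undocumented, and the checkpoint directory here is just an example path):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointLocsCacheExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("checkpoint-locs-cache")
      // Presence of this key (positive, in minutes) turns on caching of the
      // checkpointed RDD's preferred locations; omit it to leave caching off.
      .set("spark.rdd.checkpoint.cachePreferredLocsExpireTime", "10")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints")  // example path, not from the diff
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    rdd.checkpoint()
    rdd.count()  // first action materializes the checkpoint
    sc.stop()
  }
}
```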
