
[SPARK-29182][Core] Cache preferred locations of checkpointed RDD #25856

Closed · wants to merge 5 commits · Changes from 3 commits
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -783,6 +783,24 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val CACHE_CHECKPOINT_PREFERRED_LOCS =
ConfigBuilder("spark.rdd.checkpoint.cachePreferredLocs")
.internal()
.doc("Whether to cache preferred locations of checkpointed RDD. Caching preferred " +
"locations can relieve query loading to DFS and save the query time. The drawback " +
"is that the cached locations can be possibly outdated and lose data locality. " +
"This location cache expires by `spark.rdd.checkpoint.cachePreferredLocsExpireTime` " +
"minutes.")
.booleanConf
.createWithDefault(false)
Member:
What is the side-effect of this feature?

Member Author:
I cannot think of a possible side-effect right now. To be conservative, we can have false as the default.

Member Author:
Then the side-effect would be that when the cached preferred locations are outdated, tasks launched for those partitions might run on non-optimal hosts.


private[spark] val CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME =
Member:
Can we have one optional config instead of two? For example, if the time isn't specified or is 0, don't cache.

Member Author:
This is a good idea. We can have just one config instead of two (see the sketch after this config definition).

ConfigBuilder("spark.rdd.checkpoint.cachePreferredLocsExpireTime")
.internal()
.doc("Expire time in minutes for caching preferred locations of checkpointed RDD.")
.timeConf(TimeUnit.MINUTES)
.createWithDefault(60)
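
As a follow-up to the review thread above, here is a sketch of the single-config alternative (hypothetical; the diff as written keeps two configs). Caching would be enabled only when an expiry time is specified:

private[spark] val CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME =
  ConfigBuilder("spark.rdd.checkpoint.cachePreferredLocsExpireTime")
    .internal()
    .doc("If set to a positive duration in minutes, preferred locations of a " +
      "checkpointed RDD are cached and expire after that many minutes. " +
      "Unset (the default) disables the cache.")
    .timeConf(TimeUnit.MINUTES)
    .createOptional // None => caching disabled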

private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
ConfigBuilder("spark.shuffle.accurateBlockThreshold")
.doc("Threshold in bytes above which the size of shuffle blocks in " +
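For reference, a minimal usage sketch (editor's illustration, not part of this diff) of how an application would enable the feature with the two configs above; the checkpoint path is a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("checkpoint-locality-demo")
  .setMaster("local[2]")
  // Cache the DFS block locations of checkpointed partitions...
  .set("spark.rdd.checkpoint.cachePreferredLocs", "true")
  // ...and expire cached entries 30 minutes after they are written.
  .set("spark.rdd.checkpoint.cachePreferredLocsExpireTime", "30")

val sc = new SparkContext(conf)
sc.setCheckpointDir("/tmp/checkpoints") // placeholder path
val rdd = sc.parallelize(1 to 100, numSlices = 4)
rdd.checkpoint()
rdd.count() // the first action materializes the RDD and writes the checkpoint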
core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -20,15 +20,17 @@ package org.apache.spark.rdd
import java.io.{FileNotFoundException, IOException}
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.apache.hadoop.fs.Path

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{BUFFER_SIZE, CHECKPOINT_COMPRESS}
import org.apache.spark.internal.config.{BUFFER_SIZE, CACHE_CHECKPOINT_PREFERRED_LOCS, CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME, CHECKPOINT_COMPRESS}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{SerializableConfiguration, Utils}

@@ -82,16 +84,37 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
}

/**
* Return the locations of the checkpoint file associated with the given partition.
*/
protected override def getPreferredLocations(split: Partition): Seq[String] = {
// Cache of preferred locations of checkpointed files.
@transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder()
Member:
How big can this get -- would it be significant?

Member Author:
I gave an estimate in a previous comment. For one partition, we cache as many hostname strings as there are replicas on the DFS. I think it is not significant (see the back-of-envelope sketch after getPartitionBlockLocations below).

.expireAfterWrite(
SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME),
TimeUnit.MINUTES)
.build(
new CacheLoader[Partition, Seq[String]]() {
override def load(split: Partition): Seq[String] = {
getPartitionBlockLocations(split)
}
})

// Returns the block locations of the given partition on the file system.
private def getPartitionBlockLocations(split: Partition): Seq[String] = {
val status = fs.getFileStatus(
new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index)))
val locations = fs.getFileBlockLocations(status, 0, status.getLen)
locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
}
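
To make the size question above concrete, a back-of-envelope estimate (the numbers are illustrative assumptions, not measurements from this PR):

// Per partition, the cache holds one hostname string per DFS replica.
object CacheFootprintEstimate {
  def main(args: Array[String]): Unit = {
    val partitions = 10000     // assumed number of checkpointed partitions
    val replicas = 3           // typical HDFS replication factor
    val bytesPerHostname = 64  // assumed JVM string overhead plus ~20 chars
    val approxBytes = partitions.toLong * replicas * bytesPerHostname
    // ~1.83 MB for 10,000 partitions -- consistent with "not significant".
    println(f"approx cache size: ${approxBytes / 1024.0 / 1024.0}%.2f MB")
  }
}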

/**
* Return the locations of the checkpoint file associated with the given partition.
*/
protected override def getPreferredLocations(split: Partition): Seq[String] = {
if (SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS)) {
cachedPreferredLocations.get(split)
} else {
getPartitionBlockLocations(split)
}
}

/**
* Read the content of the checkpoint file associated with the given partition.
*/
26 changes: 25 additions & 1 deletion core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
import com.google.common.io.ByteStreams
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS
import org.apache.spark.internal.config.UI._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.rdd._
@@ -584,7 +585,7 @@ object CheckpointSuite {
}
}

class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
dongjoon-hyun marked this conversation as resolved.

test("checkpoint compression") {
withTempDir { checkpointDir =>
@@ -618,4 +619,27 @@ class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
assert(rdd.collect().toSeq === (1 to 20))
}
}

test("cache checkpoint preferred location") {
withTempDir { checkpointDir =>
val conf = new SparkConf()
.set(CACHE_CHECKPOINT_PREFERRED_LOCS.key, "true")
.set(UI_ENABLED.key, "false")
sc = new SparkContext("local", "test", conf)
sc.setCheckpointDir(checkpointDir.toString)
val rdd = sc.makeRDD(1 to 20, numSlices = 1)
rdd.checkpoint()
assert(rdd.collect().toSeq === (1 to 20))

// Verify that the RDD is checkpointed
assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]])
val checkpointedRDD = rdd.firstParent.asInstanceOf[ReliableCheckpointRDD[_]]
val partition = checkpointedRDD.partitions(0)
assert(!checkpointedRDD.cachedPreferredLocations.asMap.containsKey(partition))

val preferredLoc = checkpointedRDD.preferredLocations(partition)
assert(checkpointedRDD.cachedPreferredLocations.asMap.containsKey(partition))
assert(preferredLoc == checkpointedRDD.cachedPreferredLocations.get(partition))
}
}
}
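
For readers unfamiliar with the Guava cache used by ReliableCheckpointRDD above, a minimal standalone sketch of the LoadingCache pattern (editor's illustration; assumes Guava on the classpath, with simplified key/value types):

import java.util.concurrent.TimeUnit
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object LoadingCacheDemo {
  def main(args: Array[String]): Unit = {
    val cache: LoadingCache[Integer, String] = CacheBuilder.newBuilder()
      .expireAfterWrite(60, TimeUnit.MINUTES)
      .build(new CacheLoader[Integer, String]() {
        // Called only on a cache miss; the result stays cached until expiry.
        override def load(key: Integer): String = s"locations-for-partition-$key"
      })

    cache.get(1)                        // miss: invokes load(1)
    cache.get(1)                        // hit: returns the cached value
    println(cache.asMap.containsKey(1)) // true until the entry expires
  }
}

Entries are recomputed lazily on the first access after expiry, which is why a stale location (the side-effect discussed above) can persist for at most the configured window.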