[SPARK-29182][Core] Cache preferred locations of checkpointed RDD #25856
Conversation
Test build #111019 has finished for PR 25856 at commit
That sounds huge. Besides ALS, could you give us some examples that get this benefit?
It reduces the time of a huge union from a few hours to dozens of minutes.
This issue is not limited to ALS, so this change is not specific to ALS. It is actually common practice to checkpoint data in Spark to increase reliability and cut RDD lineage, and any Spark operations on the checkpointed data will benefit.
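For illustration, a minimal sketch of this pattern (sizes and the checkpoint path are made up; assumes an existing SparkContext `sc`):

```scala
// Checkpoint many small RDDs to cut lineage, then union them -- the shape of
// the ALS use case described above. Computing the union's preferred locations
// queries DFS block locations for every checkpointed partition.
sc.setCheckpointDir("hdfs:///tmp/checkpoints") // hypothetical path

val factors = (1 to 50).map { i =>
  val part = sc.parallelize(1 to 10000, 200).map(x => (i, x))
  part.checkpoint() // materialized by the next action; lineage is truncated
  part.count()      // force materialization so checkpoint files exist on DFS
  part
}

// Without caching, each job over the union re-queries the NameNode for the
// block locations of every checkpointed file.
sc.union(factors).count()
```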
Thanks. Could you add that to the PR description section?
    .internal()
    .doc("Whether to cache preferred locations of checkpointed RDD.")
    .booleanConf
    .createWithDefault(false)
What is the side-effect of this feature?
I cannot think of a possible side-effect for now. To be conservative, we can have false as the default.
Then the side-effect would be: when the cached preferred locations are outdated, the tasks launched for the partitions might land on non-optimal hosts.
@@ -82,14 +83,28 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
    Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
  }

  // Cache of preferred locations of checkpointed files.
  private[spark] val cachedPreferredLocations: mutable.HashMap[Int, Seq[String]] =
    mutable.HashMap.empty
If there is a side-effect, this will be the usual suspect. I'm wondering if we need a caching policy via Guava Cache.
Once checkpointed to DFS, I think the locations of the checkpointed files should not change during job execution. This cache is not unlimited in size, as the number of RDD files is fixed. But of course, as this is core, we should be more careful.
What I meant was a size constraint rather than time expiration. How large do you expect this cache to get?
getPreferredLocations only takes the hostnames from one BlockLocation for each partition. The number of hostnames depends on the block replication on DFS. So if the replication factor is 3, we cache 3 hostname strings per partition.
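For context, a simplified sketch of the lookup being discussed (close to, but not necessarily identical to, the Spark source):

```scala
// One getFileStatus + getFileBlockLocations round trip to DFS per partition;
// only the hosts of the first block are kept as preferred locations.
protected override def getPreferredLocations(split: Partition): Seq[String] = {
  val status = fs.getFileStatus(
    new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index)))
  val locations = fs.getFileBlockLocations(status, 0, status.getLen)
  locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
}
```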
The following assumption sounds weak to me. The HDFS NameNode also returns locations based on the data nodes' situation, and data nodes can die at any point in time. If a data node holding a replica dies, HDFS will recover it and return different locations (including existing ones). This PR seems to imply Spark could end up with a permanently outdated (corrupted) set of host names. What do you think about that?

> I think the locations of checkpointed files should not be changed during job execution.
Er, Spark doesn't only choose hosts from preferred locations. It is just a preference list; Spark can still launch a task even if the listed hosts are unavailable.
If a preferred location is present, the task is added to the pending task set for that host, but it is also added to the set of all tasks:
spark/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Lines 238 to 251 in b29829e

          pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
          if (resolveRacks) {
            sched.getRackForHost(loc.host).foreach { rack =>
              pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
            }
          }
        }
        if (tasks(index).preferredLocations == Nil) {
          pendingTaskSetToAddTo.noPrefs += index
        }
        pendingTaskSetToAddTo.all += index
The task in the set of all tasks can still be dequeued and scheduled:
spark/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Lines 358 to 362 in b29829e

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      dequeue(pendingTaskSetToUse.all).foreach { index =>
        return Some((index, TaskLocality.ANY, speculative))
      }
    }
Oh, got it. I missed that point. This only affects task scheduling, not data access.
So the problem narrows down to the Spark job potentially losing data locality completely, doesn't it?
Yes, that is the worst case, when the cached preferred locations are completely outdated.
Then, we have two options.
1. Document the limitation in the PR description and the config documentation, and continue in this way.
2. Use a Guava cache with a timeout policy to mitigate the possibility.

For the size, it looks okay according to your estimation. How do you want to proceed with this? And I agree with you that the benefits of this PR can outweigh the cons.
Thanks @dongjoon-hyun for your review and suggestion!
I think a Guava cache enables more control over caching behavior, like expiry time. I will go with option 2 for now.
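For reference, a minimal sketch of the Guava-cache approach (the class and the getOrCompute helper are illustrative, not the actual patch; only CacheBuilder/expireAfterWrite come from Guava):

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder

// Maps partition index -> preferred hosts; entries expire after the configured
// time, so outdated locations are eventually re-queried from DFS.
class CheckpointLocationCache(expireMinutes: Long) extends Serializable {
  @transient private lazy val cache = CacheBuilder.newBuilder()
    .expireAfterWrite(expireMinutes, TimeUnit.MINUTES)
    .build[java.lang.Integer, Seq[String]]()

  // compute is the expensive DFS lookup; it runs only on a cache miss.
  def getOrCompute(partitionIndex: Int)(compute: => Seq[String]): Seq[String] =
    cache.get(Int.box(partitionIndex), () => compute)
}
```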
Test build #111258 has finished for PR 25856 at commit
retest this please
Test build #111264 has finished for PR 25856 at commit
@dongjoon-hyun Using a Guava cache now. I also updated the config documentation and added an expire time config for the cache.
Test build #111310 has finished for PR 25856 at commit
Hi, @cloud-fan. Could you review this PR?
Retest this please.
+1, LGTM.
Test build #111464 has finished for PR 25856 at commit
Since this aims to improve ALS, cc @srowen, too.
It's a significant change and I hesitate to add a new config. Yes, this may mean the preferred locations are 'wrong' sometimes. What's the impact of that, simply loss of locality? I'm trying to get a better sense of whether that's rare or common. What would cause the right answer to change -- data got cached on a different node?
    .booleanConf
    .createWithDefault(false)

  private[spark] val CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME =
Can we have one optional config instead of two? Like, if the time isn't specified or is 0, don't cache?
This is a good idea. We can have just one config instead of two, something like the sketch below.
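A sketch using Spark's internal ConfigBuilder, as a val in the org.apache.spark.internal.config package object (exact doc and error text may differ):

```scala
// assumes: import java.util.concurrent.TimeUnit
// One optional time config: unset means "do not cache"; a positive value both
// enables the cache and sets its expire time.
private[spark] val CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME =
  ConfigBuilder("spark.rdd.checkpoint.cachePreferredLocsExpireTime")
    .internal()
    .doc("Expire time in minutes for caching preferred locations of checkpointed RDD.")
    .timeConf(TimeUnit.MINUTES)
    .checkValue(_ > 0, "The expire time for caching preferred locations must be positive.")
    .createOptional
```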
   */
  protected override def getPreferredLocations(split: Partition): Seq[String] = {

  // Cache of preferred locations of checkpointed files.
  @transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder()
How big can this get -- would it be significant?
I gave an estimate in a previous comment. For one partition, we will cache as many hostname strings as there are replicas on DFS. For example, 10,000 partitions with a replication factor of 3 is only 30,000 short strings. I think it is not significant.
I have discussed this with @dongjoon-hyun. I think the impact is loss of locality. Preferred locations are used to find hosts when scheduling tasks; the tasks will still be scheduled and run when the preferred locations are not correct. For example, when the Spark executors are not on the same cluster as the DFS cluster, we cannot schedule tasks on the reported block locations either.
Is there any guess about how often the correct location would change? Is it quite exceptional, or common?
I do not have statistics for this. The correct location changes when blocks on DFS are corrupted and DFS creates new replicas on new hosts. I would not say it is quite exceptional, but it should not happen frequently. This is just my guess, of course.
That's my guess too, OK. If so, then the downside is quite small. Although it's a sort of significant change, I think it's plausible. I'd like to ping others for a view, like, eh, @vanzin or @cloud-fan?
retest this please
Test build #111514 has finished for PR 25856 at commit
Test build #111535 has finished for PR 25856 at commit
Gentle ping @vanzin or @cloud-fan for a review. Thanks.
@cloud-fan Can you take a look at this? Thanks.
This has been stuck for a while. @cloud-fan or @vanzin might be too busy to review this. @srowen, who else do you think could review this?
I'm somewhat neutral on it, because it has some benefit in a few cases but I'm just worried about unforeseen impact, although I don't know of a problem case beyond the one already identified. Let me also ask @squito, or anyone else he thinks should review. I would just like one more set of eyes, though I'm not against this change.
lgtm (other than one minor comment). Idea makes sense, and totally agree this only affects locality preferences for scheduling, not correctness when actually reading the data.
I'm surprised you'd want to use checkpointing for something that reads the same RDD repeatedly, like ALS; wouldn't you want to use Spark's own persistence? Even if it doesn't fit in memory, I'd expect local-disk persistence to be significantly faster than reading from HDFS.
You could arguably make the same optimization in other places that read from HDFS, e.g. HadoopRDD, though I suppose repeated scans of the same dataset are less common in that case?
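For illustration, the trade-off being described (a sketch; sizes and storage level are arbitrary, and `sc` is an existing SparkContext):

```scala
import org.apache.spark.storage.StorageLevel

val training = sc.parallelize(1 to 1000000, 500)

// Option A: persist to executor local disks. Re-reads are fast and locality
// comes from the block manager, but blocks are lost if an executor dies.
training.persist(StorageLevel.DISK_ONLY)

// Option B: reliable checkpoint to DFS. Survives executor loss and cuts
// lineage, but scheduling may query block locations from the NameNode.
training.checkpoint()

training.count() // materializes the persisted blocks and the checkpoint files
```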
@squito Thanks for the review! Because we sample the input RDD before running ALS, the sampled RDD becomes nondeterministic (SPARK-29042). We need to checkpoint it to make it deterministic in case retries happen.
Yes, I think so. For repeated scans, I think users will use persist, and a persisted dataset does not query block locations. I also quickly checked HadoopRDD: its locality info comes from InputSplit (InputSplitWithLocationInfo). So I guess for the same HadoopRDD the InputSplits are reused across repeated scans and we may not re-query data locality info. (Not entirely sure, just a guess from quickly scanning the related code.)
Will merge this tomorrow if there are no more comments. Thanks.
Test build #112076 has finished for PR 25856 at commit
Thanks @dongjoon-hyun @srowen @squito!
.doc("Expire time in minutes for caching preferred locations of checkpointed RDD." + | ||
"Caching preferred locations can relieve query loading to DFS and save the query " + | ||
"time. The drawback is that the cached locations can be possibly outdated and " + | ||
"lose data locality. If this config is not specified or is 0, it will not cache.") |
The following checkValue already disallows 0, right?
yea, good catch.
…ed RDD

What changes were proposed in this pull request?
This is a followup to #25856. It fixes the documentation of the config value of spark.rdd.checkpoint.cachePreferredLocsExpireTime.

Why are the changes needed?
The documentation is not correct: spark.rdd.checkpoint.cachePreferredLocsExpireTime cannot be 0.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
This is a documentation-only change.

Closes #26251 from viirya/SPARK-29182-followup.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
This proposes adding a Spark config to control the caching behavior of ReliableCheckpointRDD.getPreferredLocations. If it is enabled, getPreferredLocations will compute the preferred locations only once and cache them for later use.
The drawback of caching the preferred locations is that the cached locations can become outdated, losing data locality. This is documented in the config documentation. To mitigate it, this patch also adds a config to set an expire time (default: 60 minutes) for the cache. Once the time expires, the cache is invalidated and the updated location info is queried again.
This adds a test case. The most suitable test suite looks like CheckpointCompressionSuite, so this renames CheckpointCompressionSuite to CheckpointStorageSuite and puts the test case into it. A usage sketch follows.
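A usage sketch (the config name comes from this PR; paths, sizes, and the app name are hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Setting the expire time (in minutes) enables the cache; leaving the config
// unset keeps the old behavior of querying DFS every time.
val conf = new SparkConf()
  .setAppName("cached-preferred-locs-demo")
  .set("spark.rdd.checkpoint.cachePreferredLocsExpireTime", "60")
val sc = new SparkContext(conf)
sc.setCheckpointDir("hdfs:///tmp/checkpoints")

val rdd = sc.parallelize(1 to 100000, 1000)
rdd.checkpoint()
rdd.count() // materializes the checkpoint on DFS

// Later jobs over the checkpointed data reuse the cached preferred locations
// instead of querying file status and block locations again.
rdd.union(rdd).count()
```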
Why are the changes needed?
One Spark job in our cluster fits many ALS models in parallel. The fitting goes well, but afterwards, when we union all the factors, the union operation is very slow.
Looking into the driver stack dump, the driver seems to spend a lot of time computing preferred locations. As we checkpoint the training data before fitting ALS, the time is spent in ReliableCheckpointRDD.getPreferredLocations. This method calls the DFS interface to query file status and block locations. As we have a large number of partitions derived from the checkpointed RDD, the union spends a lot of time querying the same information.
It reduces the time of a huge union from a few hours to dozens of minutes.
This issue is not limited to ALS, so this change is not specific to ALS. It is actually common practice to checkpoint data in Spark to increase reliability and cut RDD lineage, and any Spark operations on the checkpointed data will benefit.
Does this PR introduce any user-facing change?
Yes. This adds a Spark config that users can set to control the caching behavior of preferred locations of a checkpointed RDD.
How was this patch tested?
Unit test added and manual test on a development cluster.