[SPARK-21444] Be more defensive when removing broadcasts in MapOutputTracker

## What changes were proposed in this pull request?

In SPARK-21444, sitalkedia reported an issue where the `Broadcast.destroy()` call in `MapOutputTracker`'s `ShuffleStatus.invalidateSerializedMapOutputStatusCache()` was failing with an `IOException`, causing the DAGScheduler to crash and bring down the entire driver.

This is a bug introduced by apache#17955. In the old code, we removed a broadcast variable by calling `BroadcastManager.unbroadcast` with `blocking=false`, but the new code simply calls `Broadcast.destroy()`, which can fail with an `IOException` if certain blocking RPCs time out.

The fix implemented here is to replace this with a call to `destroy(blocking = false)` and to wrap the entire operation in `Utils.tryLogNonFatalError`.
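
As a rough illustration of the pattern described above, here is a minimal, self-contained sketch. It does not use Spark's classes: `FakeBroadcast`, `CacheHolder`, and `tryLogNonFatal` are hypothetical stand-ins, and the helper is only assumed to behave roughly like `Utils.tryLogNonFatalError` (run a block, log any non-fatal exception instead of rethrowing it).

```scala
import scala.util.control.NonFatal

object BroadcastCleanupSketch {
  // Hypothetical stand-in for a broadcast handle; this is NOT Spark's Broadcast class.
  trait FakeBroadcast {
    def destroy(blocking: Boolean): Unit
  }

  // Simplified analogue of a "try and log non-fatal errors" helper (an assumption,
  // not Spark's implementation): run the block and log, rather than rethrow,
  // anything matched by NonFatal. Fatal errors such as OutOfMemoryError still propagate.
  def tryLogNonFatal(block: => Unit): Unit =
    try {
      block
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"Ignoring non-fatal error during cleanup: $e")
    }

  // Hypothetical holder mirroring the shape of the cache invalidation in this patch.
  final class CacheHolder(private var cached: FakeBroadcast) {
    def invalidate(): Unit = synchronized {
      if (cached != null) {
        // A failure in destroy() is logged instead of escaping to the caller.
        tryLogNonFatal {
          cached.destroy(blocking = false)
        }
        // The reference is cleared whether or not destroy() succeeded.
        cached = null
      }
    }

    def isCleared: Boolean = synchronized { cached == null }
  }
}
```

With this shape, a `destroy` that throws an `IOException` no longer propagates out of `invalidate()`, and the cached reference is still reset to `null` afterwards.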

## How was this patch tested?

I haven't written regression tests for this because it's really hard to inject mocks to simulate RPC failures here. Instead, this class of issue is probably best uncovered with more generalized error injection / network unreliability / fuzz testing tools.

Author: Josh Rosen <[email protected]>

Closes apache#18662 from JoshRosen/SPARK-21444.
JoshRosen committed Jul 18, 2017
1 parent e9faae1 commit 5952ad2
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -194,7 +194,12 @@ private class ShuffleStatus(numPartitions: Int) {
    */
   def invalidateSerializedMapOutputStatusCache(): Unit = synchronized {
     if (cachedSerializedBroadcast != null) {
-      cachedSerializedBroadcast.destroy()
+      // Prevent errors during broadcast cleanup from crashing the DAGScheduler (see SPARK-21444)
+      Utils.tryLogNonFatalError {
+        // Use `blocking = false` so that this operation doesn't hang while trying to send cleanup
+        // RPCs to dead executors.
+        cachedSerializedBroadcast.destroy(blocking = false)
+      }
       cachedSerializedBroadcast = null
     }
     cachedSerializedMapStatus = null
