Skip to content

Commit

Permalink
Remove map output loc even for repeated FetchFaileds.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Aug 26, 2014
1 parent 1dd3eb5 commit 3d3d356
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,7 @@ class DAGScheduler(

case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is possible
// the fetch failure has already been handled by the executor.
Expand All @@ -1056,13 +1057,6 @@ class DAGScheduler(
logInfo("Marking " + failedStage + " (" + failedStage.name +
") for resubmision due to a fetch failure")

// Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId)
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}

logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
"); marking it for resubmission")
if (failedStages.isEmpty && eventProcessActor != null) {
Expand All @@ -1076,6 +1070,13 @@ class DAGScheduler(
failedStages += failedStage
failedStages += mapStage
}

// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, Some(task.epoch))
Expand Down

0 comments on commit 3d3d356

Please sign in to comment.