From 3d3d3565f82f3551c2bd7c6eb6013b551179729c Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 26 Aug 2014 15:21:02 -0700 Subject: [PATCH] Remove map output loc even for repeated FetchFaileds. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3551e7c18229e..44744a6d4e596 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1046,6 +1046,7 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => val failedStage = stageIdToStage(task.stageId) + val mapStage = shuffleToMapStage(shuffleId) // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is possible // the fetch failure has already been handled by the executor. @@ -1056,13 +1057,6 @@ class DAGScheduler( logInfo("Marking " + failedStage + " (" + failedStage.name + ") for resubmision due to a fetch failure") - // Mark the map whose fetch failed as broken in the map stage - val mapStage = shuffleToMapStage(shuffleId) - if (mapId != -1) { - mapStage.removeOutputLoc(mapId, bmAddress) - mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - } - logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name + "); marking it for resubmission") if (failedStages.isEmpty && eventProcessActor != null) { @@ -1076,6 +1070,13 @@ class DAGScheduler( failedStages += failedStage failedStages += mapStage } + + // Mark the map whose fetch failed as broken in the map stage + if (mapId != -1) { + mapStage.removeOutputLoc(mapId, bmAddress) + mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } + // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { handleExecutorLost(bmAddress.executorId, Some(task.epoch))