Skip to content

Commit

Permalink
Move log message.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Aug 27, 2014
1 parent 49282b3 commit effb1ce
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1046,22 +1046,22 @@ class DAGScheduler(

case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is possible
// the fetch failure has already been handled by the scheduler.
if (runningStages.contains(failedStage)) {
markStageAsFinished(failedStage, Some("Fetch failure"))
runningStages -= failedStage
// TODO: Cancel running tasks in the stage
logInfo(s"Marking $failedStage (${failedStage.name}) for resubmision " +
s"due to a fetch failure from $mapStage (${mapStage.name}")
}

val mapStage = shuffleToMapStage(shuffleId)
if (failedStages.isEmpty && eventProcessActor != null) {
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled. eventProcessActor may be
// null during unit tests.
// TODO: Cancel running tasks in the stage
logInfo(s"Marking $failedStage (${failedStage.name}) for resubmision " +
s"due to a fetch failure from $mapStage (${mapStage.name}")
import env.actorSystem.dispatcher
env.actorSystem.scheduler.scheduleOnce(
RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
Expand Down

0 comments on commit effb1ce

Please sign in to comment.