Skip to content

Commit

Permalink
[SPARK-3224] FetchFailed reduce stages should only show up once in fa…
Browse files Browse the repository at this point in the history
…iled stages (in UI)

This is a HOTFIX for 1.1.

Author: Reynold Xin <[email protected]>
Author: Kay Ousterhout <[email protected]>

Closes apache#2127 from rxin/SPARK-3224 and squashes the following commits:

effb1ce [Reynold Xin] Move log message.
49282b3 [Reynold Xin] Kay's feedback.
3f01847 [Reynold Xin] Merge pull request #2 from kayousterhout/SPARK-3224
796d282 [Kay Ousterhout] Added unit test for SPARK-3224
3d3d356 [Reynold Xin] Remove map output loc even for repeated FetchFaileds.
1dd3eb5 [Reynold Xin] [SPARK-3224] FetchFailed reduce stages should only show up once in the failed stages UI.
(cherry picked from commit bf71905)

Signed-off-by: Patrick Wendell <[email protected]>
  • Loading branch information
rxin authored and pwendell committed Aug 27, 2014
1 parent 7726e56 commit 2381e90
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
32 changes: 20 additions & 12 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1045,31 +1045,39 @@ class DAGScheduler(
stage.pendingTasks += task

case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
markStageAsFinished(failedStage, Some("Fetch failure"))
runningStages -= failedStage
// TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " (" + failedStage.name +
") for resubmision due to a fetch failure")
// Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId)
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)

// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is possible
// the fetch failure has already been handled by the scheduler.
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
markStageAsFinished(failedStage, Some("Fetch failure"))
runningStages -= failedStage
}
logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
"); marking it for resubmission")

if (failedStages.isEmpty && eventProcessActor != null) {
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled. eventProcessActor may be
// null during unit tests.
// TODO: Cancel running tasks in the stage
import env.actorSystem.dispatcher
logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
s"$failedStage (${failedStage.name}) due to fetch failure")
env.actorSystem.scheduler.scheduleOnce(
RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
}
failedStages += failedStage
failedStages += mapStage

// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, Some(task.epoch))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.scheduler

import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
import scala.language.reflectiveCalls

import akka.actor._
Expand Down Expand Up @@ -98,7 +98,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
val WAIT_TIMEOUT_MILLIS = 10000
val sparkListener = new SparkListener() {
val successfulStages = new HashSet[Int]()
val failedStages = new HashSet[Int]()
val failedStages = new ArrayBuffer[Int]()
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
val stageInfo = stageCompleted.stageInfo
if (stageInfo.failureReason.isEmpty) {
Expand Down Expand Up @@ -435,6 +435,43 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}

test("trivial shuffle with multiple fetch failures") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))

// The first result task fails, with a fetch failure for the output from the first mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
null,
Map[Long, Any](),
null,
null))
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))

// The second ResultTask fails, with a fetch failure for the output from the second mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
null,
Map[Long, Any](),
null,
null))
// The SparkListener should not receive redundant failure events.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.size == 1)
}

test("ignore late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
Expand Down

0 comments on commit 2381e90

Please sign in to comment.