Commit 883fe49
Unit tests for concurrent stages issue
kayousterhout authored and squito committed Jun 10, 2015
1 parent 6e14683 commit 883fe49
Showing 1 changed file with 134 additions and 0 deletions.
134 changes: 134 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
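
For context on how these tests detect duplicate stage attempts: the suite's SparkListener now records every SparkListenerStageSubmitted event, and the tests count how many of the recorded StageInfos belong to a given stage id. A minimal standalone sketch of that pattern (the class and method names below are illustrative, not part of this commit) might look like:

import scala.collection.mutable.HashSet

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted, StageInfo}

// Records every stage submission so a test can count how many attempts the
// DAGScheduler has made for a particular stage.
class StageSubmissionCounter extends SparkListener {
  val submittedStageInfos = new HashSet[StageInfo]

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
    submittedStageInfos += stageSubmitted.stageInfo
  }

  // Number of submissions (attempts) seen so far for the given stage id.
  def countSubmittedAttempts(stageId: Int): Int =
    submittedStageInfos.count(_.stageId == stageId)
}

Such a counter would be registered via sc.addSparkListener before running the job under test; in this suite the equivalent bookkeeping lives directly in the shared sparkListener shown in the diff below.
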
@@ -101,9 +101,15 @@ class DAGSchedulerSuite
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
val sparkListener = new SparkListener() {
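// Record every submitted StageInfo so the tests below can count per-stage submission attempts.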
val submittedStageInfos = new HashSet[StageInfo]
val successfulStages = new HashSet[Int]
val failedStages = new ArrayBuffer[Int]
val stageByOrderOfExecution = new ArrayBuffer[Int]

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
submittedStageInfos += stageSubmitted.stageInfo
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
val stageInfo = stageCompleted.stageInfo
stageByOrderOfExecution += stageInfo.stageId
@@ -150,6 +156,7 @@ class DAGSchedulerSuite
// Enable local execution for this test
val conf = new SparkConf().set("spark.localExecution.enabled", "true")
sc = new SparkContext("local", "DAGSchedulerSuite", conf)
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
failure = null
@@ -547,6 +554,133 @@ class DAGSchedulerSuite
assert(sparkListener.failedStages.size == 1)
}

/** This tests the case where another FetchFailed comes in while the map stage is getting
* re-run. */
test("late fetch failures don't cause multiple concurrent attempts for the same map stage") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))

val mapStageId = 0
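// Count how many attempts of the map stage have been submitted so far.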
def countSubmittedMapStageAttempts(): Int = {
sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
}

// The map stage should have been submitted.
assert(countSubmittedMapStageAttempts() === 1)

complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))

// The first result task fails, with a fetch failure for the output from the first mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
null,
Map[Long, Any](),
createFakeTaskInfo(),
null))
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(1))

// Trigger resubmission of the failed map stage.
runEvent(ResubmitFailedStages)
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))

// Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
assert(countSubmittedMapStageAttempts() === 2)

// The second ResultTask fails, with a fetch failure for the output from the second mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(1),
FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
null,
Map[Long, Any](),
createFakeTaskInfo(),
null))

// Another ResubmitFailedStages event should not result in another attempt for the map
// stage being run concurrently.
runEvent(ResubmitFailedStages)
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(countSubmittedMapStageAttempts() === 2)

// NOTE: the actual ResubmitFailedStages may get called at any time during this, but it
// shouldn't affect anything -- our calling it here just makes sure it gets called between the
// desired event and our check.

}

/** This tests the case where a late FetchFailed comes in after the map stage has finished getting
* retried and a new reduce stage starts running.
*/
test("extremely late fetch failures don't cause multiple concurrent attempts for the same stage") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))

def countSubmittedReduceStageAttempts(): Int = {
sparkListener.submittedStageInfos.count(_.stageId == 1)
}
def countSubmittedMapStageAttempts(): Int = {
sparkListener.submittedStageInfos.count(_.stageId == 0)
}

// The map stage should have been submitted.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(countSubmittedMapStageAttempts() === 1)

// Complete the map stage.
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))

// The reduce stage should have been submitted.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(countSubmittedReduceStageAttempts() === 1)

// The first result task fails, with a fetch failure for the output from the first mapper.
runEvent(CompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
null,
Map[Long, Any](),
createFakeTaskInfo(),
null))

// Trigger resubmission of the failed map stage and finish the re-started map task.
runEvent(ResubmitFailedStages)
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))

// Because the map stage finished, another attempt for the reduce stage should have been
// submitted, resulting in 2 total attempts for each of the map and reduce stages.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(countSubmittedMapStageAttempts() === 2)
assert(countSubmittedReduceStageAttempts() === 2)

// A late FetchFailed arrives from the second task in the original reduce stage.
runEvent(CompletionEvent(
taskSets(1).tasks(1),
FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
null,
Map[Long, Any](),
createFakeTaskInfo(),
null))

// Trigger another round of stage resubmission.
runEvent(ResubmitFailedStages)

// The FetchFailed from the original reduce stage should be ignored.
assert(countSubmittedMapStageAttempts() === 2)
}

test("ignore late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
