Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jinxing committed Feb 14, 2017
1 parent 46ef5a3 commit ab8d13e
Showing 1 changed file with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2162,7 +2162,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}

test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
" even with late completions from earlier stage attempts") {
" even with late completions from earlier stage attempts") {
// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
val rddA = new MyRDD(sc, 2, Nil)
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
val shuffleIdA = shuffleDepA.shuffleId
Expand All @@ -2174,39 +2175,56 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou

submit(rddC, Array(0, 1))

// Complete both tasks in rddA.
assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostA", 2))))

// Fetch failed on hostA.
// Fetch failed for task(stageId=1, stageAttemptId=0, partitionId=0) running on hostA
// and task(stageId=1, stageAttemptId=0, partitionId=1) is still running.
assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
"Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
null))
result = null))

// Both original tasks in rddA should be marked as failed, because they ran on the
// failed hostA, so both should be resubmitted. Complete them on hostB successfully.
scheduler.resubmitFailedStages()

assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
&& taskSets(2).tasks.size === 2)
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostB", 2)),
(Success, makeMapStatus("hostB", 2))))

// Task succeeds on a failed executor. The success is bogus.
// Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA
// successfully. The success should be ignored because the task started before the
// executor failed, so the output may have been lost.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))

assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
// Both tasks in rddB should be resubmitted, because neither of them has truly succeeded.
// Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
// Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt
// is still running.
assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1
&& taskSets(3).tasks.size === 2)
runEvent(makeCompletionEvent(
taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

// There should be no new attempt of stage submitted.
// There should be no new attempt of stage submitted,
// because task(stageId=1, stageAttempt=1, partitionId=1) is still running in
// the current attempt (and hasn't completed successfully in any earlier attempts).
assert(taskSets.size === 4)

// Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
runEvent(makeCompletionEvent(
taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))

// ResultStage submitted.
// Now the ResultStage should be submitted, because all of the tasks of rddB have
// completed successfully on alive executors.
assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
complete(taskSets(4), Seq(
(Success, 1),
Expand Down

0 comments on commit ab8d13e

Please sign in to comment.