[SPARK-19263] DAGScheduler should avoid sending conflicting task set. #16620

Closed
22 changes: 20 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1181,15 +1181,33 @@ class DAGScheduler(

case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
// This task was for the currently running attempt of the stage. Since the task
// completed successfully from the perspective of the TaskSetManager, mark it as
// no longer pending (the TaskSetManager may consider the task complete even
// when its output has to be ignored because the task's epoch is too small; see the
// check below. In that case, pendingPartitions can become empty while output
// locations are still missing, which will cause the DAGScheduler to resubmit the
// stage below.)
shuffleStage.pendingPartitions -= task.partitionId
}
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
// available.
shuffleStage.addOutputLoc(smt.partitionId, status)
// Remove the task's partition from pending partitions. This may have already been
// done above, but will not have been done yet in cases where the task attempt was
// from an earlier attempt of the stage (i.e., not the attempt that's currently
// running). This allows the DAGScheduler to mark the stage as complete when one
// copy of each task has finished successfully, even if the currently active stage
// attempt still has tasks running.
shuffleStage.pendingPartitions -= task.partitionId
}

if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
@@ -1213,7 +1231,7 @@
clearCacheLocs()

if (!shuffleStage.isAvailable) {
// Some tasks had failed; let's resubmit this shuffleStage.
// TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
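As a rough, standalone illustration of the bookkeeping above (this is not Spark code; ToyShuffleStage, handleCompletion, and the field names are invented for the sketch), the following Scala models why a completion from the current attempt always clears the partition from the pending set, why the output location is recorded only when the task's epoch is newer than the executor's last known failure, and why an empty pending set with missing outputs leads to a resubmit:

import scala.collection.mutable

// Minimal model of the pendingPartitions / failedEpoch interplay described above.
class ToyShuffleStage(numPartitions: Int) {
  // Bumped by the "driver" each time the stage is resubmitted.
  var latestAttemptId: Int = 0
  val pendingPartitions: mutable.Set[Int] = mutable.Set(0 until numPartitions: _*)
  private val outputLocs = mutable.Map.empty[Int, String]    // partitionId -> executorId
  private val failedEpoch = mutable.Map.empty[String, Long]  // executorId -> last failure epoch

  def isAvailable: Boolean = outputLocs.size == numPartitions

  def executorFailed(execId: String, epoch: Long): Unit = {
    failedEpoch(execId) = epoch
    // Map outputs that lived on the failed executor are lost.
    val lost = outputLocs.collect { case (p, loc) if loc == execId => p }.toList
    outputLocs --= lost
  }

  def handleCompletion(partitionId: Int, execId: String, epoch: Long, attemptId: Int): Unit = {
    if (attemptId == latestAttemptId) {
      // The task belongs to the currently running attempt, so it is no longer pending,
      // even if its output turns out to be unusable (checked just below).
      pendingPartitions -= partitionId
    }
    if (failedEpoch.get(execId).exists(epoch <= _)) {
      println(s"Ignoring possibly bogus completion from executor $execId")
    } else {
      outputLocs(partitionId) = execId
      // Also clear the partition for successful tasks from earlier attempts, so one
      // successful copy of each task is enough to finish the stage.
      pendingPartitions -= partitionId
    }
    if (pendingPartitions.isEmpty && !isAvailable) {
      println("All pending partitions finished but some outputs are missing: resubmit the stage.")
    }
  }
}

Replaying the scenario from the test below against this toy model (a late success for attempt 0 on a failed executor, then both partitions finishing in attempt 1) ends the same way the suite asserts: the stage completes only once every partition has an output on a live executor.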
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -74,7 +74,7 @@ private[scheduler] abstract class Stage(
val details: String = callSite.longForm

/**
* Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).
@@ -2161,6 +2161,76 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}
}

test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
" even with late completions from earlier stage attempts") {
// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
val rddA = new MyRDD(sc, 2, Nil)
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
val shuffleIdA = shuffleDepA.shuffleId

val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))

val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)

submit(rddC, Array(0, 1))

// Complete both tasks in rddA.
assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostA", 2))))

// task(stageId=1, stageAttemptId=0, partitionId=0) running on hostA fails with a FetchFailed,
// while task(stageId=1, stageAttemptId=0, partitionId=1) is still running.
assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
"Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
result = null))

// Both original tasks of rddA ran on the now-failed hostA, so their map outputs are
// marked as lost and both tasks should be resubmitted. Complete them on hostB successfully.
scheduler.resubmitFailedStages()
assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
&& taskSets(2).tasks.size === 2)
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostB", 2)),
(Success, makeMapStatus("hostB", 2))))

// Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA
// successfully. The success should be ignored because the task started before the
// executor failed, so the output may have been lost.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))

// Both tasks in rddB should be resubmitted, because neither of them has truly succeeded.
// Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
// Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt
// is still running.
assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1
&& taskSets(3).tasks.size === 2)
runEvent(makeCompletionEvent(
taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

// No new attempt of the stage should be submitted,
// because task(stageId=1, stageAttempt=1, partitionId=1) is still running in
// the current attempt (and hasn't completed successfully in any earlier attempt).
assert(taskSets.size === 4)

// Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
runEvent(makeCompletionEvent(
taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))

// Now the ResultStage should be submitted, because all of the tasks of rddB have
// completed successfully on alive executors.
assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
complete(taskSets(4), Seq(
(Success, 1),
(Success, 1)))
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
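For context on the new test: the DAG it builds (rddA <--- rddB <--- rddC with two shuffle dependencies) has the same stage structure a user gets by chaining two shuffles and collecting the result. The sketch below is only an illustration of that shape and is not part of this change; the object name and the specific transformations are made up:

import org.apache.spark.{SparkConf, SparkContext}

// A user-level job with the same stage structure as the test's DAG:
// two ShuffleMapStages (one per reduceByKey) followed by a ResultStage (collect).
object TwoShuffleShape {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("two-shuffle-shape").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rddA = sc.parallelize(1 to 100, 2).map(i => (i % 2, i))
    val rddB = rddA.reduceByKey(_ + _)                  // first shuffle, like shuffleDepA
    val rddC = rddB.map { case (k, sum) => (sum % 2, k) }
      .reduceByKey(_ + _)                               // second shuffle, like shuffleDepB
    rddC.collect().foreach(println)                     // result stage
    sc.stop()
  }
}

The interleaving the test pins down deterministically (a fetch failure in the second shuffle while one reduce task is still running, followed by a late success from the failed host) is the kind of event ordering such a job can hit on a real cluster, which is why the fix is exercised at the DAGScheduler level with MyRDD and makeCompletionEvent rather than end to end.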