[SPARK-19263] DAGScheduler should avoid sending conflicting task set. #16620

20 changes: 18 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1181,15 +1181,31 @@ class DAGScheduler(

case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
shuffleStage.pendingPartitions -= task.partitionId
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
// This task was for the currently running attempt of the stage. Since the task
// completed successfully from the perspective of the TaskSetManager, mark it as
// no longer pending (the TaskSetManager may consider the task complete even
// when the output needs to be ignored because the task's epoch is too small, as checked below).
shuffleStage.pendingPartitions -= task.partitionId
Contributor: I think it's worth also explaining how this inconsistency between pendingPartitions and outputLocations gets resolved. IIUC, it's that when pendingPartitions is empty, the scheduler will check outputLocations, realize something is missing, and resubmit this stage.

}
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
// available.
shuffleStage.addOutputLoc(smt.partitionId, status)
// Remove the task's partition from pending partitions. This may have already been
// done above, but will not have been done yet in cases where the task attempt was
// from an earlier attempt of the stage (i.e., not the attempt that's currently
// running). This allows the DAGScheduler to mark the stage as complete when one
// copy of each task has finished successfully, even if the currently active stage
// attempt still has tasks running.
shuffleStage.pendingPartitions -= task.partitionId
}

if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
@@ -1213,7 +1229,7 @@
clearCacheLocs()

if (!shuffleStage.isAvailable) {
// Some tasks had failed; let's resubmit this shuffleStage
// Some tasks had failed; let's resubmit this shuffleStage.
// TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
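Regarding the reviewer's question above about how the inconsistency between pendingPartitions and outputLocations gets resolved: once the last pending partition is removed, the DAGScheduler checks whether every map output is actually registered and, if not, resubmits the stage for just the missing partitions (the `if (!shuffleStage.isAvailable)` branch at the end of the hunk). Below is a minimal, self-contained sketch of that bookkeeping; the names (PendingVsOutputDemo, taskSucceeded, epochOk) are illustrative only and not part of the Spark codebase, and both completions are assumed to come from the currently running stage attempt.

object PendingVsOutputDemo {
  // Toy model of the two bookkeeping structures: which partitions the TaskSetManager
  // still considers pending, and which partitions have a registered map output.
  val numPartitions = 2
  val pendingPartitions = scala.collection.mutable.Set(0, 1)
  val outputLocs = scala.collection.mutable.Map.empty[Int, String]

  def taskSucceeded(partition: Int, host: String, epochOk: Boolean): Unit = {
    pendingPartitions -= partition             // the TaskSetManager saw the task finish
    if (epochOk) outputLocs(partition) = host  // output registered only if the epoch check passes
    if (pendingPartitions.isEmpty) {
      val missing = (0 until numPartitions).filterNot(outputLocs.contains)
      if (missing.nonEmpty) {
        // Resolution of the inconsistency: the stage looked finished to the
        // TaskSetManager, but some outputs are missing, so the stage is resubmitted
        // for exactly the missing partitions.
        println(s"Resubmitting stage for missing partitions: $missing")
      } else {
        println("All map outputs registered; submitting child stages")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    taskSucceeded(0, "hostB", epochOk = true)
    taskSucceeded(1, "hostA", epochOk = false) // late completion from a failed executor is ignored
  }
}
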
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -74,7 +74,7 @@ private[scheduler] abstract class Stage(
val details: String = callSite.longForm

/**
* Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
* Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2161,6 +2161,76 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
}
}

test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
" even with late completions from earlier stage attempts") {
// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
val rddA = new MyRDD(sc, 2, Nil)
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
val shuffleIdA = shuffleDepA.shuffleId

val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))

val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)

submit(rddC, Array(0, 1))

// Complete both tasks in rddA.
assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
Contributor: Can you comment this like I suggested in your other PR?

complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostA", 2))))

// Fetch failed for task(stageId=1, stageAttemptId=0, partitionId=0) running on hostA
// and task(stageId=1, stageAttemptId=0, partitionId=1) is still running.
assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
runEvent(makeCompletionEvent(
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
"Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
result = null))

// Both original tasks in rddA should be marked as failed, because they ran on the
// failed hostA, so both should be resubmitted. Complete them on hostB successfully.
scheduler.resubmitFailedStages()
assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
&& taskSets(2).tasks.size === 2)
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostB", 2)),
(Success, makeMapStatus("hostB", 2))))

// Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA
// successfully. The success should be ignored because the task started before the
// executor failed, so the output may have been lost.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))

// Both tasks in rddB should be resubmitted, because neither of them has truly succeeded.
// Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
// Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt
// is still running.
assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1
&& taskSets(3).tasks.size === 2)
runEvent(makeCompletionEvent(
taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

// No new attempt of the stage should be submitted,
// because task(stageId=1, stageAttempt=1, partitionId=1) is still running in
// the current attempt (and hasn't completed successfully in any earlier attempts).
assert(taskSets.size === 4)
Contributor: Is this the line that would fail without your change? (Just verifying my understanding.)

Author: Yes, I think so : )


// Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
runEvent(makeCompletionEvent(
taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))

// Now the ResultStage should be submitted, because all of the tasks of rddB have
// completed successfully on alive executors.
assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
complete(taskSets(4), Seq(
(Success, 1),
(Success, 1)))
}
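
The scenario above hinges on the two conditions under which the patched handler removes a partition from pendingPartitions: the completing task belongs to the currently running stage attempt, or its output is actually accepted (the epoch check passes). A condensed, self-contained sketch of that decision follows; the helper and object names are hypothetical and not the Spark API.

object AttemptEpochDemo {
  import scala.collection.mutable

  // Mirrors the patched logic for a successful ShuffleMapTask completion:
  // remove the partition from pending if the task is from the current stage attempt,
  // and also if its output is accepted (the epoch check passed).
  def onShuffleTaskSuccess(
      taskStageAttempt: Int,
      currentStageAttempt: Int,
      epochOk: Boolean,
      pending: mutable.Set[Int],
      partition: Int): Unit = {
    if (taskStageAttempt == currentStageAttempt) {
      pending -= partition
    }
    if (epochOk) {
      pending -= partition
    }
    // If neither condition holds (old attempt, stale epoch), the partition stays
    // pending, so the stage is not considered finished and no conflicting task set
    // is submitted while the current attempt is still running.
  }

  def main(args: Array[String]): Unit = {
    val pending = mutable.Set(0, 1)
    // Late success of task(stageId=1, stageAttemptId=0, partitionId=1) on the failed
    // hostA while attempt 1 is running: ignored, partition 1 stays pending.
    onShuffleTaskSuccess(taskStageAttempt = 0, currentStageAttempt = 1,
      epochOk = false, pending = pending, partition = 1)
    println(pending) // still contains both partitions
  }
}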

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.