Skip to content

Commit

Permalink
[SPARK-23433][CORE] Late zombie task completions update all tasksets
Browse files Browse the repository at this point in the history
After a fetch failure and stage retry, we may have multiple tasksets
which are active for a given stage.  A late completion from an earlier
attempt of the stage should update the most recent attempt for the
stage, so that the most recent attempt does not try to submit another
task for the same partition, and so that it knows when the stage is completed.
  • Loading branch information
squito committed Apr 23, 2018
1 parent 3990daa commit 0720a7c
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
}
}

/**
 * Marks the given partition as completed in all TaskSetManagers for the given stage.
 *
 * After stage failure and retry, there may be multiple active TaskSetManagers for the stage.
 * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
 * do not also submit those same tasks. That also means that a task completion from an earlier
 * attempt can lead to the entire stage getting marked as successful.
 *
 * Does nothing when no task sets are registered for `stageId`.
 *
 * @param stageId id of the stage whose TaskSetManagers are notified
 * @param partitionId id of the partition that has been completed
 */
private[scheduler] def markPartitionCompletedInAllTaskSets(
    stageId: Int,
    partitionId: Int): Unit = {
  // NOTE(review): TaskSetManager.markPartitionCompleted calls maybeFinishTaskSet(); if finishing
  // a task set removes entries from this map while we iterate, iterate over a snapshot instead
  // -- confirm against taskSetFinished.
  taskSetsByStageIdAndAttempt.get(stageId).foreach { taskSetsForStage =>
    taskSetsForStage.values.foreach { tsm =>
      tsm.markPartitionCompleted(partitionId)
    }
  }
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ private[spark] class TaskSetManager(
// Fresh instance of the environment's closure serializer.
val ser = env.closureSerializer.newInstance()

val tasks = taskSet.tasks
// Lookup from a task's partition id to that task's position in `tasks`.
private[scheduler] val partitionToIndex = tasks.indices
  .map { idx => tasks(idx).partitionId -> idx }
  .toMap
val numTasks = tasks.length
// One Int slot per task (array length is numTasks).
val copiesRunning = new Array[Int](numTasks)

Expand Down Expand Up @@ -153,7 +155,7 @@ private[spark] class TaskSetManager(
private[scheduler] val speculatableTasks = new HashSet[Int]

// Task index, start and finish time for each task attempt (indexed by task ID)
private val taskInfos = new HashMap[Long, TaskInfo]
private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]

// Use a MedianHeap to record durations of successful tasks so we know when to launch
// speculative tasks. This is only used when speculation is enabled, to avoid the overhead
Expand Down Expand Up @@ -754,6 +756,9 @@ private[spark] class TaskSetManager(
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
}
// There may be multiple tasksets for this stage -- we let all of them know that the partition
// was completed. This may result in some of the tasksets getting completed.
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
Expand All @@ -764,6 +769,19 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet()
}

/**
 * Records that `partitionId` has been completed, if this task set has a task for that partition
 * and that task is not already marked successful. Once every task is successful the task set is
 * flagged as a zombie; `maybeFinishTaskSet()` is invoked after each newly recorded completion.
 *
 * @param partitionId id of the completed partition
 */
private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
  partitionToIndex.get(partitionId) match {
    case Some(taskIndex) if !successful(taskIndex) =>
      tasksSuccessful += 1
      successful(taskIndex) = true
      val everyTaskDone = tasksSuccessful == numTasks
      if (everyTaskDone) {
        isZombie = true
      }
      maybeFinishTaskSet()

    case _ =>
      // Either this task set has no task for the partition, or it was already successful.
  }
}

/**
* Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
* DAG Scheduler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
taskScheduler.initialize(new FakeSchedulerBackend)
}
}

test("Completions in zombie tasksets update status of non-zombie taskset") {
  val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
  val valueSer = SparkEnv.get.serializer.newInstance()

  // Reports a successful result for the task attempt that `tsm` launched for `partition`.
  def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    val indexInTsm = tsm.partitionToIndex(partition)
    val matchingTaskInfo = tsm.taskAttempts.flatten.find(_.index == indexInTsm)
    // Fail with a readable message rather than an exception from `.head` on an empty result.
    assert(matchingTaskInfo.isDefined, s"no task attempt found for partition $partition")
    val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    tsm.handleSuccessfulTask(matchingTaskInfo.get.taskId, result)
  }

  // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
  // two times, so we have three active task sets for one stage. (For this to really happen,
  // you'd need the previous stage to also get restarted, and then succeed, in between each
  // attempt, but that happens outside what we're mocking here.)
  val zombieAttempts = (0 until 2).map { stageAttempt =>
    val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    taskScheduler.submitTasks(attempt)
    val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    val offers = (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    taskScheduler.resourceOffers(offers)
    assert(tsm.runningTasks === 10)
    // fail attempt (every attempt created in this loop is failed)
    tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
      FetchFailed(null, 0, 0, 0, "fetch failed"))
    // the attempt is a zombie, but the tasks are still running (this could be true even if
    // we actively killed those tasks, as killing is best-effort)
    assert(tsm.isZombie)
    assert(tsm.runningTasks === 9)
    tsm
  }

  // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
  // the stage, but this time with insufficient resources so not all tasks are active.

  val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
  taskScheduler.submitTasks(finalAttempt)
  val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
  val offers = (0 until 5).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
  val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    finalAttempt.tasks(task.index).partitionId
  }.toSet
  assert(finalTsm.runningTasks === 5)
  assert(!finalTsm.isZombie)

  // We simulate late completions from our zombie tasksets, corresponding to all the pending
  // partitions in our final attempt. This means we're only waiting on the tasks we've already
  // launched.
  val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
  finalAttemptPendingPartitions.foreach { partition =>
    completeTaskSuccessfully(zombieAttempts(0), partition)
  }

  // If there is another resource offer, we shouldn't run anything. Though our final attempt
  // used to have pending tasks, now those tasks have been completed by zombie attempts. The
  // remaining tasks to compute are already active in the non-zombie attempt.
  assert(
    taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)

  val allTaskSets = zombieAttempts ++ Seq(finalTsm)
  val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)

  // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
  // marked as zombie.
  // for each of the remaining tasks, find the tasksets with an active copy of the task, and
  // finish the task.
  remainingTasks.foreach { partition =>
    val tsm = if (partition == 0) {
      // we failed this task on both zombie attempts, this one is only present in the latest
      // taskset
      finalTsm
    } else {
      // should be active in every taskset. We choose a zombie taskset just to make sure that
      // we transition the active taskset correctly even if the final completion comes
      // from a zombie.
      zombieAttempts(partition % 2)
    }
    completeTaskSuccessfully(tsm, partition)
  }

  assert(finalTsm.isZombie)

  // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
  verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())

  // finally, let's complete all the tasks. We simulate failures in attempt 1, but everything
  // else succeeds, to make sure we get the right updates to the blacklist in all cases.
  allTaskSets.foreach { tsm =>
    val stageAttempt = tsm.taskSet.stageAttemptId
    // the elements of runningTasksSet are used as keys into taskInfos, which is keyed by task ID
    tsm.runningTasksSet.foreach { tid =>
      if (stageAttempt == 1) {
        tsm.handleFailedTask(tsm.taskInfos(tid).taskId, TaskState.FAILED, TaskResultLost)
      } else {
        val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
        tsm.handleSuccessfulTask(tsm.taskInfos(tid).taskId, result)
      }
    }

    // we update the blacklist for the stage attempts with all successful tasks. Even though
    // some tasksets had failures, we still consider them all successful from a blacklisting
    // perspective, as the failures weren't from a problem w/ the tasks themselves.
    verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
  }
}
}

0 comments on commit 0720a7c

Please sign in to comment.