diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 24d77f88db9..4d45fd268d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong import scala.collection.Set -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet} import scala.util.Random import org.apache.spark._ @@ -94,6 +94,9 @@ private[spark] class TaskSchedulerImpl( private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager] val taskIdToExecutorId = new HashMap[Long, String] + // Protected by `this` + private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet] + @volatile private var hasReceivedTask = false @volatile private var hasLaunchedTask = false private val starvationTimer = new Timer(true) @@ -236,7 +239,20 @@ private[spark] class TaskSchedulerImpl( private[scheduler] def createTaskSetManager( taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt) + // Only create a BitSet once for a given stage, since we only remove + // that stage's entry when an active TaskSetManager succeeds. 
+ stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet) + val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt) + // A TaskSet submitted by the DAGScheduler may contain some already completed + // tasks, since the DAGScheduler does not always know all the tasks that have + // been completed by other tasksets when completing a stage, so we mark + // those tasks as finished here to avoid launching duplicate tasks, while + // holding the TaskSchedulerImpl lock. + // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()` + stageIdToFinishedPartitions.get(taskSet.stageId).foreach { + finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None)) + } + tsm } override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { @@ -833,19 +849,31 @@ private[spark] class TaskSchedulerImpl( } /** - * Marks the task has completed in all TaskSetManagers for the given stage. + * Marks the task as completed in all TaskSetManagers (active / zombie) for the given stage. * * After stage failure and retry, there may be multiple TaskSetManagers for the stage. * If an earlier attempt of a stage completes a task, we should ensure that the later attempts * do not also submit those same tasks. That also means that a task completion from an earlier * attempt can lead to the entire stage getting marked as successful. + * And there is also the possibility that the DAGScheduler submits another taskset at the same + * time as we're marking a task completed here -- that taskset would have a task for a partition + * that was already completed. We maintain the set of finished partitions in + * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset + * is submitted. See SPARK-25250 for more details. + * + * Note: this method must be called while holding the lock on `this`. 
*/ private[scheduler] def markPartitionCompletedInAllTaskSets( stageId: Int, partitionId: Int, taskInfo: TaskInfo) = { + // If we do not find a BitSet for this stage, it means an active TaskSetManager + // has already succeeded and removed the stage. + stageIdToFinishedPartitions.get(stageId).foreach{ + finishedPartitions => finishedPartitions += partitionId + } taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => - tsm.markPartitionCompleted(partitionId, taskInfo) + tsm.markPartitionCompleted(partitionId, Some(taskInfo)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 6bf60dd8e9d..71940ee7d80 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -21,7 +21,7 @@ import java.io.NotSerializableException import java.nio.ByteBuffer import java.util.concurrent.ConcurrentLinkedQueue -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet} import scala.math.max import scala.util.control.NonFatal @@ -777,7 +777,11 @@ private[spark] class TaskSetManager( // Mark successful and stop if all the tasks have succeeded. 
successful(index) = true if (tasksSuccessful == numTasks) { - isZombie = true + // Clean up finished partitions for the stage when the active TaskSetManager succeeds + if (!isZombie) { + sched.stageIdToFinishedPartitions -= stageId + isZombie = true + } } } else { logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + @@ -796,16 +800,21 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } - private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = { + private[scheduler] def markPartitionCompleted( + partitionId: Int, + taskInfo: Option[TaskInfo]): Unit = { partitionToIndex.get(partitionId).foreach { index => if (!successful(index)) { if (speculationEnabled && !isZombie) { - successfulTaskDurations.insert(taskInfo.duration) + taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) } } tasksSuccessful += 1 successful(index) = true if (tasksSuccessful == numTasks) { - isZombie = true + if (!isZombie) { + sched.stageIdToFinishedPartitions -= stageId + isZombie = true + } } maybeFinishTaskSet() } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 29172b4664e..9ad3b7fdcf4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1102,7 +1102,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } } - test("Completions in zombie tasksets update status of non-zombie taskset") { + test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") { val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() val valueSer = SparkEnv.get.serializer.newInstance() @@ -1114,9 +1114,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } // Submit a task set, have it 
fail with a fetch failed, and then re-submit the task attempt, - // two times, so we have three active task sets for one stage. (For this to really happen, - // you'd need the previous stage to also get restarted, and then succeed, in between each - // attempt, but that happens outside what we're mocking here.) + // two times, so we have three TaskSetManagers (2 zombie, 1 active) for one stage. (For this + // to really happen, you'd need the previous stage to also get restarted, and then succeed, + // in between each attempt, but that happens outside what we're mocking here.) val zombieAttempts = (0 until 2).map { stageAttempt => val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) taskScheduler.submitTasks(attempt) @@ -1133,13 +1133,33 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(tsm.runningTasks === 9) tsm } + // We've now got 2 zombie attempts, each with 9 tasks still running, and there's no active + // attempt in taskScheduler by now. + + // Finish partitions 1 and 2 by completing their tasks before a new attempt for the stage is + // submitted. This is possible since submitting a new attempt and handling a successful task + // are done by two different threads ("task-result-getter" and "dag-scheduler-event-loop") + // independently of each other. + (0 until 2).foreach { i => + completeTaskSuccessfully(zombieAttempts(i), i + 1) + assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1)) + } - // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for - // the stage, but this time with insufficient resources so not all tasks are active. - + // Submit the 3rd attempt, still with 10 tasks. This happens due to the race between threads + // "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with + // already completed tasks. This time there are insufficient resources, so not all tasks are + // active. 
val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) taskScheduler.submitTasks(finalAttempt) val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get + // Though finalTsm gets submitted with 10 tasks, the call to taskScheduler.submitTasks should + // realize that 2 tasks have already completed, and mark them appropriately, so it won't launch + // any duplicate tasks later (SPARK-25250). + (0 until 2).map(_ + 1).foreach { partitionId => + val index = finalTsm.partitionToIndex(partitionId) + assert(finalTsm.successful(index)) + } + val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => finalAttempt.tasks(task.index).partitionId @@ -1147,16 +1167,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(finalTsm.runningTasks === 5) assert(!finalTsm.isZombie) - // We simulate late completions from our zombie tasksets, corresponding to all the pending - // partitions in our final attempt. This means we're only waiting on the tasks we've already - // launched. + // We continue to simulate late completions from our zombie tasksets (but this time there's one + // active attempt in taskScheduler), corresponding to all the pending partitions in our + // final attempt. This means we're only waiting on the tasks we've already launched. val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) finalAttemptPendingPartitions.foreach { partition => completeTaskSuccessfully(zombieAttempts(0), partition) + assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition)) } // If there is another resource offer, we shouldn't run anything. Though our final attempt - // used to have pending tasks, now those tasks have been completed by zombie attempts. The + // used to have pending tasks, now those tasks have been completed by zombie attempts. 
The // remaining tasks to compute are already active in the non-zombie attempt. assert( taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) @@ -1204,6 +1225,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // perspective, as the failures weren't from a problem w/ the tasks themselves. verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject()) } + assert(taskScheduler.stageIdToFinishedPartitions.isEmpty) } test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {