diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 382a417f9eb13..e194b79ae9871 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
-import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
@@ -94,9 +94,6 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 
-  // Protected by `this`
-  private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]
-
   @volatile private var hasReceivedTask = false
   @volatile private var hasLaunchedTask = false
   private val starvationTimer = new Timer(true)
@@ -245,20 +242,7 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] def createTaskSetManager(
       taskSet: TaskSet,
       maxTaskFailures: Int): TaskSetManager = {
-    // only create a BitSet once for a certain stage since we only remove
-    // that stage when an active TaskSetManager succeed.
-    stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
-    val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
-    // TaskSet got submitted by DAGScheduler may have some already completed
-    // tasks since DAGScheduler does not always know all the tasks that have
-    // been completed by other tasksets when completing a stage, so we mark
-    // those tasks as finished here to avoid launching duplicate tasks, while
-    // holding the TaskSchedulerImpl lock.
-    // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
-    stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
-      finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
-    }
-    tsm
+    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
   }
 
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -855,31 +839,19 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /**
-   * Marks the task has completed in all TaskSetManagers(active / zombie) for the given stage.
+   * Marks the task has completed in all TaskSetManagers for the given stage.
    *
    * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
    * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
-   * And there is also the possibility that the DAGScheduler submits another taskset at the same
-   * time as we're marking a task completed here -- that taskset would have a task for a partition
-   * that was already completed. We maintain the set of finished partitions in
-   * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
-   * is submitted. See SPARK-25250 for more details.
-   *
-   * note: this method must be called with a lock on this.
   */
   private[scheduler] def markPartitionCompletedInAllTaskSets(
       stageId: Int,
       partitionId: Int,
       taskInfo: TaskInfo) = {
-    // if we do not find a BitSet for this stage, which means an active TaskSetManager
-    // has already succeeded and removed the stage.
-    stageIdToFinishedPartitions.get(stageId).foreach{
-      finishedPartitions => finishedPartitions += partitionId
-    }
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId, Some(taskInfo))
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 71940ee7d803e..6bf60dd8e9dfa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.math.max
 import scala.util.control.NonFatal
 
@@ -777,11 +777,7 @@
       // Mark successful and stop if all the tasks have succeeded.
       successful(index) = true
       if (tasksSuccessful == numTasks) {
-        // clean up finished partitions for the stage when the active TaskSetManager succeed
-        if (!isZombie) {
-          sched.stageIdToFinishedPartitions -= stageId
-          isZombie = true
-        }
+        isZombie = true
       }
     } else {
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
@@ -800,21 +796,16 @@
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(
-      partitionId: Int,
-      taskInfo: Option[TaskInfo]): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
         if (speculationEnabled && !isZombie) {
-          taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
+          successfulTaskDurations.insert(taskInfo.duration)
         }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
-          if (!isZombie) {
-            sched.stageIdToFinishedPartitions -= stageId
-            isZombie = true
-          }
+          isZombie = true
         }
         maybeFinishTaskSet()
       }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 006b3e5853b76..8b60f8a5e31b7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1113,7 +1113,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
-  test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
+  test("Completions in zombie tasksets update status of non-zombie taskset") {
     val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val valueSer = SparkEnv.get.serializer.newInstance()
 
@@ -1125,9 +1125,9 @@
     }
 
     // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three TaskSetManagers(2 zombie, 1 active) for one stage. (For this
-    // to really happen, you'd need the previous stage to also get restarted, and then succeed,
-    // in between each attempt, but that happens outside what we're mocking here.)
+    // two times, so we have three active task sets for one stage. (For this to really happen,
+    // you'd need the previous stage to also get restarted, and then succeed, in between each
+    // attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
       val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
@@ -1144,33 +1144,13 @@
       assert(tsm.runningTasks === 9)
       tsm
     }
 
-    // we've now got 2 zombie attempts, each with 9 tasks still running. And there's no active
-    // attempt exists in taskScheduler by now.
-
-    // finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
-    // This is possible since the behaviour of submitting new attempt and handling successful task
-    // is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop"
-    // separately.
-    (0 until 2).foreach { i =>
-      completeTaskSuccessfully(zombieAttempts(i), i + 1)
-      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
-    }
-    // Submit the 3rd attempt still with 10 tasks, this happens due to the race between thread
-    // "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with
-    // already completed tasks. And this time with insufficient resources so not all tasks are
-    // active.
+    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
+    // the stage, but this time with insufficient resources so not all tasks are active.
+
     val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
-    // Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should
-    // realize that 2 tasks have already completed, and mark them appropriately, so it won't launch
-    // any duplicate tasks later (SPARK-25250).
-    (0 until 2).map(_ + 1).foreach { partitionId =>
-      val index = finalTsm.partitionToIndex(partitionId)
-      assert(finalTsm.successful(index))
-    }
-
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
       finalAttempt.tasks(task.index).partitionId
@@ -1178,17 +1158,16 @@
     assert(finalTsm.runningTasks === 5)
     assert(!finalTsm.isZombie)
 
-    // We continually simulate late completions from our zombie tasksets(but this time, there's one
-    // active attempt exists in taskScheduler), corresponding to all the pending partitions in our
-    // final attempt. This means we're only waiting on the tasks we've already launched.
+    // We simulate late completions from our zombie tasksets, corresponding to all the pending
+    // partitions in our final attempt. This means we're only waiting on the tasks we've already
+    // launched.
     val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
     finalAttemptPendingPartitions.foreach { partition =>
       completeTaskSuccessfully(zombieAttempts(0), partition)
-      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     // If there is another resource offer, we shouldn't run anything. Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
+    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
     // remaining tasks to compute are already active in the non-zombie attempt.
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1236,7 +1215,6 @@
       // perspective, as the failures weren't from a problem w/ the tasks themselves.
       verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
     }
-    assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
   }
 
   test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
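With the revert in place, the only cross-attempt bookkeeping that remains is the fan-out in `markPartitionCompletedInAllTaskSets`: a successful task from any attempt, zombie or not, is replayed into every `TaskSetManager` registered for the stage, and each manager independently marks the partition done and becomes a zombie once all of its partitions are accounted for. The sketch below is a minimal, self-contained model of that fan-out, not Spark's real classes; `AttemptState`, `StageAttempts`, and their members are hypothetical stand-ins used only to illustrate why a late completion from a zombie attempt can finish a newer attempt.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for TaskSetManager / TaskSchedulerImpl: they model only
// the completion fan-out kept by this patch, nothing else (no scheduling, no TaskInfo).
class AttemptState(val attemptId: Int, numPartitions: Int) {
  private val successful = Array.fill(numPartitions)(false)
  private var tasksSuccessful = 0
  var isZombie = false

  // Mirrors the restored markPartitionCompleted: count each partition once, and become a
  // zombie as soon as every partition of this attempt is accounted for.
  def markPartitionCompleted(partitionId: Int): Unit = {
    if (!successful(partitionId)) {
      successful(partitionId) = true
      tasksSuccessful += 1
      if (tasksSuccessful == numPartitions) {
        isZombie = true
      }
    }
  }

  def completedCount: Int = tasksSuccessful
}

class StageAttempts(numPartitions: Int) {
  private val attempts = mutable.ArrayBuffer.empty[AttemptState]

  def newAttempt(): AttemptState = {
    val attempt = new AttemptState(attempts.size, numPartitions)
    attempts += attempt
    attempt
  }

  // Mirrors markPartitionCompletedInAllTaskSets after the revert: no shared finished-partition
  // set, just a broadcast of the completion to every attempt of the stage.
  def markPartitionCompletedInAllAttempts(partitionId: Int): Unit = {
    attempts.foreach(_.markPartitionCompleted(partitionId))
  }
}

object CompletionFanOutDemo {
  def main(args: Array[String]): Unit = {
    val stage = new StageAttempts(numPartitions = 4)
    val zombie = stage.newAttempt() // older attempt whose tasks are still running
    val active = stage.newAttempt() // current attempt

    // Late completions reported by the zombie attempt also update the active attempt.
    (0 until 4).foreach(stage.markPartitionCompletedInAllAttempts)

    // Both attempts have now seen all 4 partitions, so both are zombies and the stage can finish.
    println(s"active: ${active.completedCount} partitions done, isZombie=${active.isZombie}")
    println(s"zombie attempt ${zombie.attemptId}: isZombie=${zombie.isZombie}")
  }
}
```

Note that this model, like the reverted code, keeps no shared record of finished partitions, so an attempt submitted after those completions would start with every partition pending again; that is the race described in the comments this patch removes.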