diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4d45fd268d2..382a417f9eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -207,14 +207,20 @@ private[spark] class TaskSchedulerImpl(
       val stage = taskSet.stageId
       val stageTaskSets =
         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
-      stageTaskSets(taskSet.stageAttemptId) = manager
-      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
-        ts.taskSet != taskSet && !ts.isZombie
-      }
-      if (conflictingTaskSet) {
-        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
-          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
+
+      // Mark all the existing TaskSetManagers of this stage as zombie, as we are adding a new one.
+      // This is necessary to handle a corner case. Let's say a stage has 10 partitions and has 2
+      // TaskSetManagers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10
+      // and it completes. TSM2 finishes tasks for partitions 1-9, and thinks it is still active
+      // because partition 10 is not completed yet. However, DAGScheduler gets task completion
+      // events for all the 10 partitions and thinks the stage is finished. If it's a shuffle stage
+      // and somehow it has missing map outputs, then DAGScheduler will resubmit it and create a
+      // TSM3 for it. As a stage can't have more than one active task set manager, we must mark
+      // TSM2 as zombie (it actually is).
+      stageTaskSets.foreach { case (_, ts) =>
+        ts.isZombie = true
       }
+      stageTaskSets(taskSet.stageAttemptId) = manager
       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
 
       if (!isLocal && !hasReceivedTask) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9ad3b7fdcf4..006b3e5853b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -201,28 +201,39 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Even if one of the task sets has not-serializable tasks, the other task set should
     // still be processed without error
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
-    taskScheduler.submitTasks(taskSet)
+    val taskSet2 = new TaskSet(
+      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 1, 0, 0, null)
+    taskScheduler.submitTasks(taskSet2)
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
   }
 
-  test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
+  test("concurrent attempts for the same stage only have one active taskset") {
     val taskScheduler = setupScheduler()
+    def isTasksetZombie(taskset: TaskSet): Boolean = {
+      taskScheduler.taskSetManagerForAttempt(taskset.stageId, taskset.stageAttemptId).get.isZombie
+    }
+
     val attempt1 = FakeTask.createTaskSet(1, 0)
-    val attempt2 = FakeTask.createTaskSet(1, 1)
     taskScheduler.submitTasks(attempt1)
-    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) }
+    // The first submitted taskset is active
+    assert(!isTasksetZombie(attempt1))
 
-    // OK to submit multiple if previous attempts are all zombie
-    taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
-      .get.isZombie = true
+    val attempt2 = FakeTask.createTaskSet(1, 1)
     taskScheduler.submitTasks(attempt2)
+    // The first submitted taskset is zombie now
+    assert(isTasksetZombie(attempt1))
+    // The newly submitted taskset is active
+    assert(!isTasksetZombie(attempt2))
+
     val attempt3 = FakeTask.createTaskSet(1, 2)
-    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) }
-    taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
-      .get.isZombie = true
     taskScheduler.submitTasks(attempt3)
-    assert(!failedTaskSet)
+    // The first submitted taskset remains zombie
+    assert(isTasksetZombie(attempt1))
+    // The second submitted taskset is zombie now
+    assert(isTasksetZombie(attempt2))
+    // The newly submitted taskset is active
+    assert(!isTasksetZombie(attempt3))
   }
 
   test("don't schedule more tasks after a taskset is zombie") {