diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index a6d5232feb8de..07bdb84cd756f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -128,4 +128,20 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L assert(taskDescriptions.map(_.executorId) === Seq("executor0")) } + test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + val dagScheduler = new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + taskScheduler.setDAGScheduler(dagScheduler) + val attempt1 = new TaskSet(Array(new FakeTask(0)), 0, 0, 0, null) + val attempt2 = new TaskSet(Array(new FakeTask(0)), 0, 1, 0, null) + taskScheduler.submitTasks(attempt1) + intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt2)} + } + }