Skip to content

Commit

Permalink
Merge branch 'SPARK-13931' into 'spark_2.1'
Browse files Browse the repository at this point in the history
[ESPARK-13931] 解决推测导致僵尸任务的问题

解决推测导致僵尸任务的问题  
resolve apache#139

See merge request !84
  • Loading branch information
cenyuhai committed Oct 31, 2017
2 parents 33e0fe8 + 6e83cb6 commit 6c66b6e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,8 @@ private[spark] class TaskSetManager(
// and we are not using an external shuffle server which could serve the shuffle outputs.
// The reason is the next stage wouldn't be able to fetch the data from this dead executor
// so we would need to rerun these tasks on other executors.
if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
&& !isZombie) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
if (successful(index)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.scheduler

import java.util.Random
import java.util.{Properties, Random}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand All @@ -30,6 +30,7 @@ import org.mockito.stubbing.Answer
import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{AccumulatorV2, ManualClock}

class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
Expand Down Expand Up @@ -661,6 +662,67 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize"))
}

test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") {
  // Regression scenario: a task set with a single shuffle-map task gets a speculative copy,
  // one copy succeeds (the manager becomes a zombie), and then the executor running the other
  // copy is lost. No Resubmitted event may reach the DAG scheduler at any point.
  // NOTE(review): the following test assigns to a suite-level `sched`, whereas this test uses
  // its own local scheduler instance -- confirm that is intentional.
  sc = new SparkContext("local", "test", new SparkConf().set("spark.speculation", "true"))

  val scheduler = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
  // Backend whose killTask is overridden to do nothing.
  val noKillBackend = new FakeSchedulerBackend() {
    override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {}
  }
  scheduler.initialize(noKillBackend)

  // Counts every task-end event whose reason is Resubmitted; expected to stay at zero.
  var resubmitCount = 0
  val countingDagScheduler = new FakeDAGScheduler(sc, scheduler) {
    override def taskEnded(
        task: Task[_],
        reason: TaskEndReason,
        result: Any,
        accumUpdates: Seq[AccumulatorV2[_, _]],
        taskInfo: TaskInfo): Unit = {
      super.taskEnded(task, reason, result, accumUpdates, taskInfo)
      if (reason == Resubmitted) {
        resubmitCount += 1
      }
    }
  }
  scheduler.setDAGScheduler(countingDagScheduler)

  // A task set holding exactly one ShuffleMapTask, located on execA/host1.
  val partitionZero = new Partition {
    override def index: Int = 0
  }
  val singleTask = new ShuffleMapTask(
    0, 0, null, partitionZero, Seq(TaskLocation("host1", "execA")), new Properties(), null)
  val manager =
    new TaskSetManager(scheduler, new TaskSet(Array(singleTask), 0, 0, 0, null), MAX_TASK_FAILURES)

  // First offer: execA on host1 at PROCESS_LOCAL; the only task is launched there.
  val firstCopy = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get

  // Flag the task as speculatable, then offer execB so that a second copy is launched.
  manager.speculatableTasks += singleTask.partitionId
  val speculativeCopy = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get

  assert(manager.runningTasks === 2)
  assert(!manager.isZombie)

  // Task result stub whose value is always the empty string.
  val emptyResult = new DirectTaskResult[String](null, Seq()) {
    override def value(resultSer: SerializerInstance): String = ""
  }

  // Finishing the first copy completes the only task in the set, so the manager turns
  // into a zombie while the second copy is still counted as running.
  manager.handleSuccessfulTask(firstCopy.taskId, emptyResult)
  assert(manager.isZombie)
  assert(resubmitCount === 0)
  assert(manager.runningTasks === 1)

  // Losing the executor that hosts the remaining copy must not trigger a resubmission.
  manager.executorLost("execB", "host2", new SlaveLost())
  assert(manager.runningTasks === 0)
  assert(resubmitCount === 0)
}

test("speculative and noPref task should be scheduled after node-local") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(
Expand Down

0 comments on commit 6c66b6e

Please sign in to comment.