[SPARK-19868] Conflicting TaskSetManagers lead to Spark being stopped #17208

Closed · wants to merge 4 commits

Changes from all commits
TaskSetManager.scala
@@ -713,13 +713,7 @@ private[spark] class TaskSetManager(
       successfulTaskDurations.insert(info.duration)
     }
     removeRunningTask(tid)
-    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
-    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
-    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
-    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
-    // Note: "result.value()" only deserializes the value when it's called at the first time, so
-    // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
-
     // Kill any other attempts for the same task (since those are unnecessary now that one
     // attempt completed successfully).
     for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
@@ -746,6 +740,13 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value when it's called at the first time, so
+    // here "result.value()" just returns the value and won't block other threads.
+    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
     maybeFinishTaskSet()
   }
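
The substance of the fix: previously, dagScheduler.taskEnded() was invoked before the
TaskSetManager had killed the task's other running attempts, marked the task successful,
and (for the final task) flipped isZombie; the PR moves the call after those state
updates, just before maybeFinishTaskSet(). Because taskEnded() merely posts an event to
the DAGScheduler's asynchronous event loop, the old ordering let the event-loop thread
observe the TaskSetManager in a half-updated state. Below is a minimal, self-contained
sketch of that hazard; the object and member names are hypothetical stand-ins, not the
actual Spark classes.

import java.util.concurrent.{Executors, TimeUnit}

object TaskEndedOrderingSketch {
  @volatile var isZombie = false                        // stand-in for TaskSetManager state
  private val eventLoop = Executors.newSingleThreadExecutor()

  // Stand-in for DAGScheduler.taskEnded(): the real call just posts an event,
  // and the handler runs later on a separate event-loop thread.
  def taskEnded(): Unit = eventLoop.submit(new Runnable {
    override def run(): Unit = println(s"event loop sees isZombie = $isZombie")
  })

  def handleSuccessfulTaskOldOrder(): Unit = {
    taskEnded()       // old order: notify first, so the event loop may see isZombie = false
    isZombie = true   // ...even though the last task has in fact just succeeded
  }

  def handleSuccessfulTaskNewOrder(): Unit = {
    isZombie = true   // new order: finish all local state updates first...
    taskEnded()       // ...then notify; the event loop reliably sees isZombie = true
  }

  def main(args: Array[String]): Unit = {
    handleSuccessfulTaskNewOrder()
    eventLoop.shutdown()
    eventLoop.awaitTermination(5, TimeUnit.SECONDS)
  }
}

Publishing the event only after all local state is final is the general discipline here:
an asynchronous consumer should never be able to observe the producer mid-update.

The relocated comment block also explains why calling result.value() while holding the
"TaskSchedulerImpl" lock stays cheap: the value was already deserialized, off the lock, in
TaskResultGetter.enqueueSuccessfulTask, and later calls return the cached copy. A minimal
sketch of that memoization pattern, using a hypothetical LazyResult class rather than
Spark's actual task-result type:

class LazyResult(bytes: Array[Byte]) {
  private var cached: Option[String] = None
  def value(): String = cached match {
    case Some(v) => v                       // already deserialized: a cheap, lock-safe read
    case None =>
      val v = new String(bytes, "UTF-8")    // stand-in for the expensive deserialization
      cached = Some(v)
      v
  }
}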
TaskSetManagerSuite.scala
@@ -22,8 +22,10 @@ import java.util.{Properties, Random}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.mockito.Matchers.{anyInt, anyString}
+import org.mockito.Matchers.{any, anyInt, anyString}
 import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
 
 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -1056,6 +1058,29 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.isZombie)
   }
 
+
+  test("SPARK-19868: DagScheduler only notified of taskEnd when state is ready") {
+    // dagScheduler.taskEnded() is async, so it may *seem* OK to call it before we've set all
+    // appropriate state, e.g. isZombie. However, this sets up a race that could go the wrong way.
+    // This is a super-focused regression test which checks the zombie state as soon as
+    // dagScheduler.taskEnded() is called, to ensure we haven't introduced a race.
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val mockDAGScheduler = mock(classOf[DAGScheduler])
+    sched.dagScheduler = mockDAGScheduler
+    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
+    when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).then(new Answer[Unit] {
+      override def answer(invocationOnMock: InvocationOnMock): Unit = {
+        assert(manager.isZombie === true)
+      }
+    })
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption.isDefined)
+    // This would fail inside our mock DAGScheduler if dagScheduler.taskEnded() were called too soon.
+    manager.handleSuccessfulTask(0, createTaskResult(0))
+  }
+
   test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
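
The heart of the new test is the Mockito Answer: the assertion runs at the moment the
mocked taskEnded() is invoked, not after handleSuccessfulTask() returns, so it catches a
notification that arrives before isZombie is set. Below is a standalone sketch of the same
pattern, using the Matchers/Answer APIs the test imports but a hypothetical Greeter trait
in place of DAGScheduler.

import org.mockito.Matchers.anyString
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer

trait Greeter { def greet(name: String): String }

object AnswerPatternSketch {
  @volatile var ready = false   // the state that must be set before greet() is called

  def main(args: Array[String]): Unit = {
    val greeter = mock(classOf[Greeter])
    when(greeter.greet(anyString())).then(new Answer[String] {
      override def answer(invocation: InvocationOnMock): String = {
        // Runs at invocation time: fails fast if greet() is called before ready is set.
        assert(ready, "greet() called before state was ready")
        "hello, " + invocation.getArguments()(0)
      }
    })
    ready = true                // comment this line out to see the Answer's assertion fire
    println(greeter.greet("spark"))
  }
}

In the test above, the same trick means that if a regression reordered the taskEnded()
call back before the state updates, the assert on manager.isZombie inside the Answer
would fail the suite immediately, on the calling thread.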