diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index de7c0d813ae65..aa4e63827bec4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -237,7 +237,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on all executors
     private def makeOffers() {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
         val workOffers = activeExecutors.map {
@@ -263,7 +263,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on just one executor
     private def makeOffers(executorId: String) {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         if (executorIsAlive(executorId)) {
           val executorData = executorDataMap(executorId)
@@ -607,7 +607,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
-    val response = synchronized {
+    val response = withLock {
       val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
       unknownExecutors.foreach { id =>
         logWarning(s"Executor to kill $id does not exist!")
@@ -685,6 +685,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
+
+  // SPARK-27112: We need to ensure that there is ordering of lock acquisition
+  // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
+  // the deadlock issue exposed in SPARK-27112
+  private def withLock[T](fn: => T): T = scheduler.synchronized {
+    CoarseGrainedSchedulerBackend.this.synchronized { fn }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
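
The new withLock helper addresses the SPARK-27112 deadlock by imposing a single lock-acquisition order: the scheduler's monitor is always taken before the backend's own monitor, regardless of which code path starts the nested locking. The standalone sketch below is not Spark code; the object name LockOrderingSketch and the two lock objects are hypothetical stand-ins used only to illustrate why a fixed acquisition order prevents the circular wait.

// Minimal, self-contained sketch of the lock-ordering idea, assuming two plain
// JVM monitors stand in for TaskSchedulerImpl and CoarseGrainedSchedulerBackend.
object LockOrderingSketch {
  private val schedulerLock = new Object // stand-in for TaskSchedulerImpl's monitor
  private val backendLock = new Object   // stand-in for the backend's monitor

  // Mirrors withLock in the patch: every caller acquires schedulerLock first,
  // then backendLock, so no thread can hold backendLock while waiting on a
  // schedulerLock held by another thread -- the circular wait cannot form.
  private def withLock[T](fn: => T): T = schedulerLock.synchronized {
    backendLock.synchronized { fn }
  }

  def main(args: Array[String]): Unit = {
    // Two threads contending through the same helper cannot deadlock.
    val threads = (1 to 2).map { i =>
      new Thread(() => withLock { println(s"thread $i holds both locks") })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}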