From ee24f7cf660829ff0b392876583b8322c97ef77d Mon Sep 17 00:00:00 2001 From: sandeep-katta Date: Mon, 4 Feb 2019 20:13:22 -0800 Subject: [PATCH] [SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamiAllocation.executorIdleTimeout value ## What changes were proposed in this pull request? **updateAndSyncNumExecutorsTarget** API should be called after **initializing** flag is unset ## How was this patch tested? Added UT and also manually tested After Fix ![afterfix](https://user-images.githubusercontent.com/35216143/51983136-ed4a5000-24bd-11e9-90c8-c4a562c17a4b.png) Closes #23697 from sandeep-katta/executorIssue. Authored-by: sandeep-katta Signed-off-by: Sean Owen (cherry picked from commit 1dd7419702c5bc7e36fee9fa1eec06b66f25806e) Signed-off-by: Sean Owen --- .../spark/ExecutorAllocationManager.scala | 4 +-- .../ExecutorAllocationManagerSuite.scala | 26 ++++++++++++++----- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index c3e5b96a55884..49fa80ca3fcd0 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -306,8 +306,6 @@ private[spark] class ExecutorAllocationManager( private def schedule(): Unit = synchronized { val now = clock.getTimeMillis - updateAndSyncNumExecutorsTarget(now) - val executorIdsToBeRemoved = ArrayBuffer[String]() removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime @@ -317,6 +315,8 @@ private[spark] class ExecutorAllocationManager( } !expired } + // Update executor target number only after initializing flag is unset + updateAndSyncNumExecutorsTarget(now) if (executorIdsToBeRemoved.nonEmpty) { removeExecutors(executorIdsToBeRemoved) } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 5c718cb654ce8..f50ad78054009 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -935,12 +935,7 @@ class ExecutorAllocationManagerSuite assert(maxNumExecutorsNeeded(manager) === 0) schedule(manager) - // Verify executor is timeout but numExecutorsTarget is not recalculated - assert(numExecutorsTarget(manager) === 3) - - // Schedule again to recalculate the numExecutorsTarget after executor is timeout - schedule(manager) - // Verify that current number of executors should be ramp down when executor is timeout + // Verify executor is timeout,numExecutorsTarget is recalculated assert(numExecutorsTarget(manager) === 2) } @@ -1147,6 +1142,25 @@ class ExecutorAllocationManagerSuite verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false) } + test("SPARK-26758 check executor target number after idle time out ") { + sc = createSparkContext(1, 5, 3) + val manager = sc.executorAllocationManager.get + val clock = new ManualClock(10000L) + manager.setClock(clock) + assert(numExecutorsTarget(manager) === 3) + manager.listener.onExecutorAdded(SparkListenerExecutorAdded( + clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + manager.listener.onExecutorAdded(SparkListenerExecutorAdded( + clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 2, Map.empty))) + manager.listener.onExecutorAdded(SparkListenerExecutorAdded( + clock.getTimeMillis(), "executor-3", new ExecutorInfo("host1", 3, Map.empty))) + // make all the executors as idle, so that it will be killed + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + // once the schedule is run target executor number should be 1 + assert(numExecutorsTarget(manager) === 1) + } + private def createSparkContext( minExecutors: Int = 1, maxExecutors: Int = 5,