Skip to content

Commit

Permalink
[SPARK-46182][CORE] Track lastTaskFinishTime using the exact task f…
Browse files Browse the repository at this point in the history
…inished event

### What changes were proposed in this pull request?

We found a race condition between lastTaskRunningTime and lastShuffleMigrationTime that could lead to a decommissioned executor exit before all the shuffle blocks have been discovered. The issue could lead to immediate task retry right after an executor exit, thus longer query execution time.

To fix the issue, we choose to update the lastTaskRunningTime only when a task updates its status to finished through the StatusUpdate event. This is better than the current approach (which use a thread to check for number of running tasks every second), because in this way we clearly know whether the shuffle block refresh happened after all tasks finished running or not, thus resolved the race condition mentioned above.

### Why are the changes needed?

To fix a race condition that could lead to shuffle data lost, thus longer query execution time.

### How was this patch tested?

This is a very subtle race condition that is hard to write a unit test using current unit test framework. And we are confident the change is low risk. Thus only verify by passing all the existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#44090 from jiangxb1987/SPARK-46182.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 6f112f7)
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
jiangxb1987 authored and dongjoon-hyun committed Dec 4, 2023
1 parent 429afdc commit b8750d5
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.net.URL
import java.nio.ByteBuffer
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.{Failure, Success}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -80,6 +80,10 @@ private[spark] class CoarseGrainedExecutorBackend(

private var decommissioned = false

// Track the last time in ns that at least one task is running. If no task is running and all
// shuffle/RDD data migration are done, the decommissioned executor should exit.
private var lastTaskFinishTime = new AtomicLong(System.nanoTime())

override def onStart(): Unit = {
if (env.conf.get(DECOMMISSION_ENABLED)) {
val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
Expand Down Expand Up @@ -269,6 +273,7 @@ private[spark] class CoarseGrainedExecutorBackend(
val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
if (TaskState.isFinished(state)) {
taskResources.remove(taskId)
lastTaskFinishTime.set(System.nanoTime())
}
driver match {
case Some(driverRef) => driverRef.send(msg)
Expand Down Expand Up @@ -341,7 +346,6 @@ private[spark] class CoarseGrainedExecutorBackend(

val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
override def run(): Unit = {
var lastTaskRunningTime = System.nanoTime()
val sleep_time = 1000 // 1s
// This config is internal and only used by unit tests to force an executor
// to hang around for longer when decommissioned.
Expand All @@ -358,7 +362,7 @@ private[spark] class CoarseGrainedExecutorBackend(
val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
// We can only trust allBlocksMigrated boolean value if there were no tasks running
// since the start of computing it.
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) {
logInfo("No running tasks, all blocks migrated, stopping.")
exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
} else {
Expand All @@ -370,12 +374,6 @@ private[spark] class CoarseGrainedExecutorBackend(
}
} else {
logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks")
// If there is a running task it could store blocks, so make sure we wait for a
// migration loop to complete after the last task is done.
// Note: this is only advanced if there is a running task, if there
// is no running task but the blocks are not done migrating this does not
// move forward.
lastTaskRunningTime = System.nanoTime()
}
Thread.sleep(sleep_time)
}
Expand Down

0 comments on commit b8750d5

Please sign in to comment.