Skip to content

Commit

Permalink
[SPARK-49502][CORE] Avoid NPE in SparkEnv.get.shuffleManager.unregist…
Browse files Browse the repository at this point in the history
…erShuffle

### What changes were proposed in this pull request?
This PR aims to avoid NPE in `SparkEnv.get.shuffleManager.unregisterShuffle`.

### Why are the changes needed?
After SPARK-45762, the shuffle manager is initialized after the block manager, which means that when the driver cleans up the shuffle, the shuffle manager may not have been initialized yet, causing NPE.

```
24/09/03 20:09:51,668 [dispatcher-Executor] INFO BlockManager: Initialized BlockManager: BlockManagerId(168, x, 25467, None)
24/09/03 20:09:51,684 [block-manager-storage-async-thread-pool-2] ERROR BlockManagerStorageEndpoint: Error in removing shuffle 29
java.lang.NullPointerException
        at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:61)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47977 from cxzl25/SPARK-49502.

Authored-by: sychen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
cxzl25 authored and attilapiros committed Oct 4, 2024
1 parent 52c52a5 commit bb43a4a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class SparkEnv (

// We initialize the ShuffleManager later in SparkContext and Executor to allow
// user jars to define custom ShuffleManagers.
private var _shuffleManager: ShuffleManager = _
@volatile private var _shuffleManager: ShuffleManager = _

def shuffleManager: ShuffleManager = _shuffleManager

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,13 @@ class BlockManagerStorageEndpoint(
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
}
SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
val shuffleManager = SparkEnv.get.shuffleManager
if (shuffleManager != null) {
shuffleManager.unregisterShuffle(shuffleId)
} else {
logDebug(log"Ignore remove shuffle ${MDC(SHUFFLE_ID, shuffleId)}")
true
}
}

case DecommissionBlockManager =>
Expand Down

0 comments on commit bb43a4a

Please sign in to comment.