From bb43a4ac12991010fe1566d8546838c0f06c1a2b Mon Sep 17 00:00:00 2001 From: sychen Date: Fri, 6 Sep 2024 10:50:18 -0500 Subject: [PATCH] [SPARK-49502][CORE] Avoid NPE in SparkEnv.get.shuffleManager.unregisterShuffle ### What changes were proposed in this pull request? This PR aims to avoid NPE in `SparkEnv.get.shuffleManager.unregisterShuffle`. ### Why are the changes needed? After SPARK-45762, the shuffle manager is initialized after the block manager, which means that when the driver cleans up the shuffle, the shuffle manager may not have been initialized yet, causing NPE. ``` 24/09/03 20:09:51,668 [dispatcher-Executor] INFO BlockManager: Initialized BlockManager: BlockManagerId(168, x, 25467, None) 24/09/03 20:09:51,684 [block-manager-storage-async-thread-pool-2] ERROR BlockManagerStorageEndpoint: Error in removing shuffle 29 java.lang.NullPointerException at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:61) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) at scala.util.Success.$anonfun$map$1(Try.scala:255) at scala.util.Success.map(Try.scala:213) at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA ### Was this patch authored or co-authored using generative AI tooling? 
No Closes #47977 from cxzl25/SPARK-49502. Authored-by: sychen Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../spark/storage/BlockManagerStorageEndpoint.scala | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index f8b7cdcf7a8b0..6b7fc1b0804b7 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -73,7 +73,7 @@ class SparkEnv ( // We initialize the ShuffleManager later in SparkContext and Executor to allow // user jars to define custom ShuffleManagers. - private var _shuffleManager: ShuffleManager = _ + @volatile private var _shuffleManager: ShuffleManager = _ def shuffleManager: ShuffleManager = _shuffleManager diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala index 686ac1eb786e0..f29e8778da037 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala @@ -60,7 +60,13 @@ class BlockManagerStorageEndpoint( if (mapOutputTracker != null) { mapOutputTracker.unregisterShuffle(shuffleId) } - SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) + val shuffleManager = SparkEnv.get.shuffleManager + if (shuffleManager != null) { + shuffleManager.unregisterShuffle(shuffleId) + } else { + logDebug(log"Ignore remove shuffle ${MDC(SHUFFLE_ID, shuffleId)}") + true + } } case DecommissionBlockManager =>