diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index f8b7cdcf7a8b0..6b7fc1b0804b7 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -73,7 +73,7 @@ class SparkEnv ( // We initialize the ShuffleManager later in SparkContext and Executor to allow // user jars to define custom ShuffleManagers. - private var _shuffleManager: ShuffleManager = _ + @volatile private var _shuffleManager: ShuffleManager = _ def shuffleManager: ShuffleManager = _shuffleManager diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala index 686ac1eb786e0..f29e8778da037 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala @@ -60,7 +60,13 @@ class BlockManagerStorageEndpoint( if (mapOutputTracker != null) { mapOutputTracker.unregisterShuffle(shuffleId) } - SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) + val shuffleManager = SparkEnv.get.shuffleManager + if (shuffleManager != null) { + shuffleManager.unregisterShuffle(shuffleId) + } else { + logDebug(log"Ignore remove shuffle ${MDC(SHUFFLE_ID, shuffleId)}") + true + } } case DecommissionBlockManager =>