diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index 606bb625f5b22..c0b9018c770a0 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -74,7 +74,7 @@ public LocalDiskShuffleMapOutputWriter(
     this.blockResolver = blockResolver;
     this.bufferSize =
       (int) (long) sparkConf.get(
-        package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
+        package$.MODULE$.SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
     this.partitionLengths = new long[numPartitions];
     this.outputFile = blockResolver.getDataFile(shuffleId, mapId);
     this.outputTempFile = null;
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 95955455a9d4b..cfb514913694b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -647,7 +647,9 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
         "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
       DeprecatedConfig("spark.network.remoteReadNioBufferConversion", "3.5.2",
-        "Please open a JIRA ticket to report it if you need to use this configuration.")
+        "Please open a JIRA ticket to report it if you need to use this configuration."),
+      DeprecatedConfig("spark.shuffle.unsafe.file.output.buffer", "4.0.0",
+        "Please use spark.shuffle.localDisk.file.output.buffer")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a7268c6409913..9fcd9ba529c16 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1463,8 +1463,7 @@ package object config {
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
-      .doc("The file system for this buffer size after each partition " +
-        "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
+      .doc("(Deprecated since Spark 4.0, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
      .version("2.3.0")
       .bytesConf(ByteUnit.KiB)
       .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
@@ -1472,6 +1471,13 @@ package object config {
         s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
+  private[spark] val SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.localDisk.file.output.buffer")
+      .doc("The buffer size, in KiB unless otherwise specified, to use when writing " +
+        "shuffle output to disk in all local disk shuffle writers.")
+      .version("4.0.0")
+      .fallbackConf(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE)
+
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
       .doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.")
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
index 3db7527262568..7ab2cb864234f 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
@@ -71,7 +71,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite {
     partitionSizesInMergedFile = null
     conf = new SparkConf()
       .set("spark.app.id", "example.spark.app")
-      .set("spark.shuffle.unsafe.file.output.buffer", "16k")
+      .set("spark.shuffle.localDisk.file.output.buffer", "16k")
     when(blockResolver.getDataFile(anyInt, anyLong)).thenReturn(mergedOutputFile)
     when(blockResolver.createTempFile(any(classOf[File])))
       .thenAnswer { invocationOnMock =>
diff --git a/docs/configuration.md b/docs/configuration.md
index 23443cab2eacc..6833d4e54fd03 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1033,11 +1033,19 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
   <td>32k</td>
   <td>
-    The file system for this buffer size after each partition is written in unsafe shuffle writer.
-    In KiB unless otherwise specified.
+    Deprecated since Spark 4.0, please use <code>spark.shuffle.localDisk.file.output.buffer</code>.
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.shuffle.localDisk.file.output.buffer</code></td>
+  <td>32k</td>
+  <td>
+    The buffer size, in KiB unless otherwise specified, to use when writing shuffle output
+    to disk in all local disk shuffle writers.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.shuffle.spill.diskWriteBufferSize</code></td>
   <td>1024 * 1024</td>
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 1c37fded53ab7..26b0ff32cf5d9 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -50,6 +50,8 @@ license: |
 
 - Since Spark 4.0, Spark performs speculative executions less aggressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.
 
+- Since Spark 4.0, `spark.shuffle.unsafe.file.output.buffer` is deprecated, though it still takes effect. Use `spark.shuffle.localDisk.file.output.buffer` instead.
+
 ## Upgrading from Core 3.4 to 3.5
 
 - Since Spark 3.5, `spark.yarn.executor.failuresValidityInterval` is deprecated. Use `spark.executor.failuresValidityInterval` instead.
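
As a quick illustration for reviewers, here is a minimal sketch of the resolution order the new key gets from the `.fallbackConf(...)` call above. The key names come from this diff; the precedence described in the comments is standard `fallbackConf` semantics, and the warning behavior follows from the `DeprecatedConfig` entry added to SparkConf.scala — the exact log message is not shown here.

```scala
import org.apache.spark.SparkConf

// Hypothetical user configuration exercising the fallback chain.
val conf = new SparkConf()

// Existing jobs that only set the deprecated key keep their tuning:
// spark.shuffle.localDisk.file.output.buffer falls back to this value
// (and SparkConf logs the deprecation warning registered above).
conf.set("spark.shuffle.unsafe.file.output.buffer", "64k")

// When the new key is also set, it takes precedence over the fallback,
// so LocalDiskShuffleMapOutputWriter would buffer with 128 KiB here.
conf.set("spark.shuffle.localDisk.file.output.buffer", "128k")
```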