[SPARK-42252][CORE] Add spark.shuffle.localDisk.file.output.buffer and deprecate spark.shuffle.unsafe.file.output.buffer #39819

Closed · wants to merge 11 commits
core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java

@@ -74,7 +74,7 @@ public LocalDiskShuffleMapOutputWriter(
     this.blockResolver = blockResolver;
     this.bufferSize =
       (int) (long) sparkConf.get(
-        package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
+        package$.MODULE$.SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
     this.partitionLengths = new long[numPartitions];
     this.outputFile = blockResolver.getDataFile(shuffleId, mapId);
     this.outputTempFile = null;
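The writer keeps the same `(int) (long) … * 1024` conversion, so the configured value is read in KiB and turned into a byte count; only the config entry it reads has changed. A minimal Scala sketch of that resolution, assuming code compiled inside the `org.apache.spark` package (typed `ConfigEntry` reads are `private[spark]`); the entry and key names are from this PR, the surrounding harness is illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

// Illustrative only: resolve the buffer size the way the writer does.
val conf = new SparkConf().set("spark.shuffle.localDisk.file.output.buffer", "64k")
val kib: Long = conf.get(SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE)  // 64 (KiB)
val bufferSizeBytes: Int = (kib * 1024).toInt                         // 65536 bytes
```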
core/src/main/scala/org/apache/spark/SparkConf.scala (4 changes: 3 additions & 1 deletion)

@@ -647,7 +647,9 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
         "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
       DeprecatedConfig("spark.network.remoteReadNioBufferConversion", "3.5.2",
-        "Please open a JIRA ticket to report it if you need to use this configuration.")
+        "Please open a JIRA ticket to report it if you need to use this configuration."),
+      DeprecatedConfig("spark.shuffle.unsafe.file.output.buffer", "4.0.0",
+        "Please use spark.shuffle.localDisk.file.output.buffer")
     )

     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
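Because the legacy key is now registered in `deprecatedConfigs`, setting it keeps working; `SparkConf` is expected to log a deprecation warning pointing at the replacement rather than fail. A minimal sketch of that assumed behavior (not taken from this PR's tests):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
// Still accepted on 4.0: a deprecation warning naming
// spark.shuffle.localDisk.file.output.buffer is logged, and nothing throws.
conf.set("spark.shuffle.unsafe.file.output.buffer", "48k")
assert(conf.get("spark.shuffle.unsafe.file.output.buffer") == "48k")
```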
core/src/main/scala/org/apache/spark/internal/config/package.scala

@@ -1463,15 +1463,21 @@ package object config {

   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
-      .doc("The file system for this buffer size after each partition " +
-        "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
+      .doc("(Deprecated since Spark 4.0, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
       .version("2.3.0")
       .bytesConf(ByteUnit.KiB)
       .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
         s"The buffer size must be positive and less than or equal to" +
           s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")

+  private[spark] val SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.localDisk.file.output.buffer")
+      .doc("The buffer size, in KiB unless otherwise specified, of the output stream used " +
+        "after each partition is written in all local disk shuffle writers.")
+      .version("4.0.0")
+      .fallbackConf(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE)
+
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
       .doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.")
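`fallbackConf` is what makes the rename non-breaking: the new entry defines no default of its own, so reading it falls through to the legacy entry and, transitively, to its 32k default. A minimal sketch of the expected precedence, again assuming `private[spark]` access to the entries:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val conf = new SparkConf()
// Neither key set: falls through to the legacy entry's 32 KiB default.
assert(conf.get(SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE) == 32L)

// Only the legacy key set: the new entry inherits its value.
conf.set("spark.shuffle.unsafe.file.output.buffer", "64k")
assert(conf.get(SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE) == 64L)

// Both keys set: the new key takes precedence.
conf.set("spark.shuffle.localDisk.file.output.buffer", "128k")
assert(conf.get(SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE) == 128L)
```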
core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala

@@ -71,7 +71,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite {
     partitionSizesInMergedFile = null
     conf = new SparkConf()
       .set("spark.app.id", "example.spark.app")
-      .set("spark.shuffle.unsafe.file.output.buffer", "16k")
+      .set("spark.shuffle.localDisk.file.output.buffer", "16k")
     when(blockResolver.getDataFile(anyInt, anyLong)).thenReturn(mergedOutputFile)
     when(blockResolver.createTempFile(any(classOf[File])))
       .thenAnswer { invocationOnMock =>
docs/configuration.md (12 changes: 10 additions & 2 deletions)

@@ -1031,11 +1031,19 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
   <td>32k</td>
   <td>
-    The file system for this buffer size after each partition is written in unsafe shuffle writer.
-    In KiB unless otherwise specified.
+    Deprecated since Spark 4.0, please use <code>spark.shuffle.localDisk.file.output.buffer</code>.
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.shuffle.localDisk.file.output.buffer</code></td>
+  <td>32k</td>
+  <td>
+    The buffer size, in KiB unless otherwise specified, of the output stream used after each
+    partition is written in all local disk shuffle writers.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.shuffle.spill.diskWriteBufferSize</code></td>
   <td>1024 * 1024</td>
docs/core-migration-guide.md (2 changes: 2 additions & 0 deletions)

@@ -50,6 +50,8 @@ license: |

 - Since Spark 4.0, Spark performs speculative executions less aggressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.

+- Since Spark 4.0, `spark.shuffle.unsafe.file.output.buffer` is deprecated, though it still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
+
 ## Upgrading from Core 3.4 to 3.5

 - Since Spark 3.5, `spark.yarn.executor.failuresValidityInterval` is deprecated. Use `spark.executor.failuresValidityInterval` instead.
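For users, the migration is a one-line rename, and either key works during the deprecation window. A hedged example (the two keys are from this PR; the value and surrounding code are illustrative):

```scala
import org.apache.spark.SparkConf

// Before: still works on Spark 4.0+, but logs a deprecation warning.
val before = new SparkConf().set("spark.shuffle.unsafe.file.output.buffer", "64k")

// After: the replacement key introduced by this PR.
val after = new SparkConf().set("spark.shuffle.localDisk.file.output.buffer", "64k")
```

The same rename applies on the command line, e.g. `--conf spark.shuffle.localDisk.file.output.buffer=64k` with spark-submit.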