Skip to content

Commit

Permalink
[SPARK-49217][CORE] Support separate buffer size configuration in Uns…
Browse files Browse the repository at this point in the history
…afeShuffleWriter

### What changes were proposed in this pull request?
This PR aims to support separate buffer size configuration in UnsafeShuffleWriter.

Introduce `spark.shuffle.file.merge.buffer` configuration.

### Why are the changes needed?

`UnsafeShuffleWriter#mergeSpillsWithFileStream` uses `spark.shuffle.file.buffer` as the buffer for reading spill files, and this buffer is an off-heap buffer.

In the spill process, we hope that the buffer size is larger, but once there are too many files in the spill, `UnsafeShuffleWriter#mergeSpillsWithFileStream` needs to create a lot of off-heap memory, which makes the executor easily killed by YARN.

https://github.com/apache/spark/blob/e72d21c299a450e48b3cf6e5d36b8f3e9a568088/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java#L372-L375

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Production environment verification

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47733 from cxzl25/SPARK-49217.

Authored-by: sychen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
cxzl25 authored and Mridul Muralidharan committed Aug 23, 2024
1 parent 70c9b94 commit d84f1a3
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final SparkConf sparkConf;
private final boolean transferToEnabled;
private final int initialSortBufferSize;
private final int inputBufferSizeInBytes;
private final int mergeBufferSizeInBytes;

@Nullable private MapStatus mapStatus;
@Nullable private ShuffleExternalSorter sorter;
Expand Down Expand Up @@ -140,8 +140,8 @@ public UnsafeShuffleWriter(
this.transferToEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_MERGE_PREFER_NIO());
this.initialSortBufferSize =
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
this.inputBufferSizeInBytes =
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.mergeBufferSizeInBytes =
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_MERGE_BUFFER_SIZE()) * 1024;
open();
}

Expand Down Expand Up @@ -372,7 +372,7 @@ private void mergeSpillsWithFileStream(
for (int i = 0; i < spills.length; i++) {
spillInputStreams[i] = new NioBufferedFileInputStream(
spills[i].file,
inputBufferSizeInBytes);
mergeBufferSizeInBytes);
// Only convert the partitionLengths when debug level is enabled.
if (logger.isDebugEnabled()) {
logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,14 @@ package object config {
s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_FILE_MERGE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.file.merge.buffer")
.doc("Size of the in-memory buffer for each shuffle file input stream, in KiB unless " +
"otherwise specified. These buffers use off-heap buffers and are related to the number " +
"of files in the shuffle file. Too large buffers should be avoided.")
.version("4.0.0")
.fallbackConf(SHUFFLE_FILE_BUFFER_SIZE)

private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
.doc("(Deprecated since Spark 4.0, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,16 @@ Apart from these, the following properties are also available, and may be useful
</td>
<td>1.4.0</td>
</tr>
<tr>
<td><code>spark.shuffle.file.merge.buffer</code></td>
<td>32k</td>
<td>
Size of the in-memory buffer for each shuffle file input stream, in KiB unless otherwise
specified. These buffers use off-heap buffers and are related to the number of files in
the shuffle file. Too large buffers should be avoided.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
<td>32k</td>
Expand Down

0 comments on commit d84f1a3

Please sign in to comment.