Allow configuration of buffer sizes.
Signed-off-by: Pascal Spörri <[email protected]>
pspoerri committed Aug 17, 2023
1 parent 0bd8abc commit e28eeab
Showing 6 changed files with 18 additions and 9 deletions.
4 changes: 4 additions & 0 deletions README.md

```diff
@@ -38,6 +38,10 @@ These configuration values need to be passed to Spark to load and configure the
 
 Changing these values might have an impact on performance.
 
+- `spark.shuffle.s3.bufferSize`: Size of the buffered output streams (default: `32768`,
+  taken from `spark.shuffle.file.buffer`)
+- `spark.shuffle.s3.maxBufferSize`: Maximum size of the buffered input streams (default: `209715200`,
+  taken from `spark.network.maxRemoteBlockSizeFetchToMem`)
 - `spark.shuffle.s3.cachePartitionLengths`: Cache partition lengths in memory (default: `true`)
 - `spark.shuffle.s3.cacheChecksums`: Cache checksums in memory (default: `true`)
 - `spark.shuffle.s3.cleanup`: Cleanup the shuffle files (default: `true`)
```
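
For illustration, a minimal sketch of how the two new options could be set programmatically; the values are arbitrary examples, not defaults or recommendations from this commit:

```scala
import org.apache.spark.SparkConf

// Illustrative values only; when unset, the options fall back to
// spark.shuffle.file.buffer and spark.network.maxRemoteBlockSizeFetchToMem.
val conf = new SparkConf()
  .set("spark.shuffle.s3.bufferSize", (64 * 1024).toString)            // 64 KiB output buffers
  .set("spark.shuffle.s3.maxBufferSize", (128 * 1024 * 1024).toString) // cap input buffers at 128 MiB
```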

```diff
@@ -8,7 +8,7 @@ package org.apache.spark.shuffle
 import org.apache.hadoop.fs.FSDataOutputStream
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE
+import org.apache.spark.internal.config.SHUFFLE_FILE_BUFFER_SIZE
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage
 import org.apache.spark.shuffle.api.{ShuffleMapOutputWriter, ShufflePartitionWriter, WritableByteChannelWrapper}
@@ -44,9 +44,8 @@ class S3ShuffleMapOutputWriter(
 
   def initStream(): Unit = {
     if (stream == null) {
-      val bufferSize = conf.get(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE).toInt * 1024
       stream = dispatcher.createBlock(shuffleBlock)
-      bufferedStream = new BufferedOutputStream(stream, bufferSize)
+      bufferedStream = new BufferedOutputStream(stream, dispatcher.bufferSize)
     }
   }
 
```

```diff
@@ -10,7 +10,7 @@ import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper}
 import org.apache.spark.storage.ShuffleDataBlockId
 import org.apache.spark.util.Utils
 
-import java.io.{File, FileInputStream}
+import java.io.{BufferedOutputStream, File, FileInputStream}
 
 class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends SingleSpillShuffleMapOutputWriter {
 
@@ -23,8 +23,10 @@ class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends S
   ): Unit = {
     val in = new FileInputStream(mapSpillFile)
     val out = dispatcher.createBlock(ShuffleDataBlockId(shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID))
+    val bufferedOutputStream = new BufferedOutputStream(out, dispatcher.bufferSize)
+
     // Note: HDFS does not expose a nio-buffered write interface.
-    Utils.copyStream(in, out, closeStreams = true)
+    Utils.copyStream(in, bufferedOutputStream, closeStreams = true)
 
     if (dispatcher.checksumEnabled) {
       S3ShuffleHelper.writeChecksum(shuffleId, mapId, checksums)
```
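
Wrapping the remote output stream in a `BufferedOutputStream` batches the many small writes from `Utils.copyStream` into `bufferSize`-sized chunks before they reach the underlying S3A stream. A rough sketch of that pattern using only JDK streams (the helper and the `openRemoteBlock` parameter are hypothetical, not part of this commit):

```scala
import java.io.{BufferedOutputStream, File, FileInputStream, OutputStream}

// Hypothetical sketch: copy a local spill file into a newly created remote block,
// buffering writes. Closing the BufferedOutputStream flushes it and closes the
// wrapped remote stream as well.
def copySpillToRemote(spill: File, openRemoteBlock: () => OutputStream, bufferSize: Int): Unit = {
  val in = new FileInputStream(spill)
  val out = new BufferedOutputStream(openRemoteBlock(), bufferSize)
  try in.transferTo(out) // JDK 9+; Utils.copyStream(in, out, closeStreams = true) plays this role above
  finally {
    in.close()
    out.close()
  }
}
```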

```diff
@@ -7,6 +7,7 @@ package org.apache.spark.shuffle.helper
 
 import org.apache.hadoop.fs._
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.config.{MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, SHUFFLE_FILE_BUFFER_SIZE}
 import org.apache.spark.internal.{Logging, config}
 import org.apache.spark.shuffle.ConcurrentObjectMap
 import org.apache.spark.storage._
@@ -33,6 +34,8 @@ class S3ShuffleDispatcher extends Logging {
   private val isS3A = rootDir.startsWith("s3a://")
 
   // Optional
+  val bufferSize: Int = conf.getInt("spark.shuffle.s3.bufferSize", defaultValue = conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt * 1024)
+  val maxBufferSize: Int = conf.getInt("spark.shuffle.s3.maxBufferSize", defaultValue = conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM).toInt)
   val cachePartitionLengths: Boolean = conf.getBoolean("spark.shuffle.s3.cachePartitionLengths", defaultValue = true)
   val cacheChecksums: Boolean = conf.getBoolean("spark.shuffle.s3.cacheChecksums", defaultValue = true)
   val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)
@@ -59,6 +62,8 @@ class S3ShuffleDispatcher extends Logging {
   logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (app dir: ${appDir})")
 
   // Optional
+  logInfo(s"- spark.shuffle.s3.bufferSize=${bufferSize}")
+  logInfo(s"- spark.shuffle.s3.maxBufferSize=${maxBufferSize}")
   logInfo(s"- spark.shuffle.s3.cachePartitionLengths=${cachePartitionLengths}")
   logInfo(s"- spark.shuffle.s3.cacheChecksums=${cacheChecksums}")
   logInfo(s"- spark.shuffle.s3.cleanup=${cleanupShuffleFiles}")
```
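
Note the unit difference between the two fallbacks: `spark.shuffle.file.buffer` is a size in KiB (hence the `* 1024`), while `spark.network.maxRemoteBlockSizeFetchToMem` is already a byte count. The same resolution logic can be sketched with the public `SparkConf` API alone; the helper below is hypothetical and only mirrors the defaults used above:

```scala
import org.apache.spark.SparkConf

// Hypothetical helper mirroring the fallback logic above.
// spark.shuffle.file.buffer is expressed in KiB and converted to bytes;
// spark.network.maxRemoteBlockSizeFetchToMem is already in bytes.
def resolveBufferSizes(conf: SparkConf): (Int, Int) = {
  val bufferSize = conf.getInt("spark.shuffle.s3.bufferSize",
    defaultValue = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024)
  val maxBufferSize = conf.getInt("spark.shuffle.s3.maxBufferSize",
    defaultValue = conf.getSizeAsBytes("spark.network.maxRemoteBlockSizeFetchToMem", "200m").toInt)
  (bufferSize, maxBufferSize)
}
```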

```diff
@@ -56,7 +56,7 @@ object S3ShuffleHelper extends Logging {
   def writeArrayAsBlock(blockId: BlockId, array: Array[Long]): Unit = {
     val serializerInstance = serializer.newInstance()
     val buffer = serializerInstance.serialize[Array[Long]](array)
-    val file = new BufferedOutputStream(dispatcher.createBlock(blockId))
+    val file = new BufferedOutputStream(dispatcher.createBlock(blockId), dispatcher.bufferSize)
     file.write(buffer.array(), buffer.arrayOffset(), buffer.limit())
     file.flush()
     file.close()
@@ -132,7 +132,7 @@
   }
 
   private def readBlockAsArray(blockId: BlockId) = {
-    val file = new BufferedInputStream(dispatcher.openBlock(blockId))
+    val file = new BufferedInputStream(dispatcher.openBlock(blockId), dispatcher.bufferSize)
     var buffer = new Array[Byte](1024)
     var numBytes = 0
     var done = false
```
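
When no size is given, `java.io.BufferedInputStream` and `BufferedOutputStream` use an 8 KiB internal buffer, so passing `dispatcher.bufferSize` here makes the buffer match the configured value instead. A small illustration with placeholder streams:

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream}

val jdkDefault = new BufferedInputStream(new ByteArrayInputStream(new Array[Byte](1 << 20)))          // 8192-byte buffer
val configured = new BufferedInputStream(new ByteArrayInputStream(new Array[Byte](1 << 20)), 1 << 17) // caller-chosen, e.g. 128 KiB
```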

```diff
@@ -55,7 +55,6 @@ class S3ShuffleReader[K, C](
 
   private val dispatcher = S3ShuffleDispatcher.get
   private val dep = handle.dependency
-  private val maxBufferSize = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
 
   private val fetchContinousBlocksInBatch: Boolean = {
     val serializerRelocatable = dep.serializer.supportsRelocationOfSerializedObjects
@@ -106,7 +105,7 @@
       // NextIterator. The NextIterator makes sure that close() is called on the
       // underlying InputStream when all records have been read.
       Future {
-        val bufferSize = scala.math.min(wrappedStream.maxBytes, maxBufferSize).toInt
+        val bufferSize = scala.math.min(wrappedStream.maxBytes, dispatcher.maxBufferSize).toInt
         val stream = new BufferedInputStream(wrappedStream, bufferSize)
 
         // Fill the buffered input stream by reading and then resetting the stream.
```
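
The reader now takes its upper bound from `dispatcher.maxBufferSize` (i.e. `spark.shuffle.s3.maxBufferSize`) rather than reading `spark.network.maxRemoteBlockSizeFetchToMem` directly, and it still never buffers more than the block itself. A minimal sketch of that capping pattern (names are illustrative):

```scala
import java.io.{BufferedInputStream, InputStream}

// Buffer at most maxBufferSize bytes, but never more than the block's own length.
def boundedBuffered(in: InputStream, blockBytes: Long, maxBufferSize: Int): BufferedInputStream = {
  val bufferSize = math.min(blockBytes, maxBufferSize.toLong).toInt
  new BufferedInputStream(in, bufferSize)
}
```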
