SPARK-1145: Memory mapping with many small blocks can cause JVM allocation failures #43

Closed
wants to merge 7 commits into from
Changes from 4 commits
@@ -146,6 +146,12 @@ object BlockFetcherIterator {
}

protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
+ // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
+ // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
+ // nodes, rather than blocking on reading output from one node.
+ val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
+ logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)
+
Contributor

This assumes you have at least 5 nodes, which need not be the general case.

Contributor

I have always been bothered by this, though it is not specifically related to this PR.

Contributor

Btw, any reason to move this up?

Contributor Author

The only change here is to move this up. There is no need to calculate and log this in the inner loop, because it's the same for all iterations of that loop. Logging it in the inner loop also means the same message is logged many times.

// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
// at most maxBytesInFlight in order to limit the amount of data in flight.
val remoteRequests = new ArrayBuffer[FetchRequest]
@@ -157,11 +163,6 @@ object BlockFetcherIterator {
_numBlocksToFetch += localBlocksToFetch.size
} else {
numRemote += blockInfos.size
- // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
- // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
- // nodes, rather than blocking on reading output from one node.
- val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
- logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = new ArrayBuffer[(BlockId, Long)]
@@ -176,11 +177,12 @@ object BlockFetcherIterator {
} else if (size < 0) {
throw new BlockException(blockId, "Negative block size " + size)
}
- if (curRequestSize >= minRequestSize) {
+ if (curRequestSize >= targetRequestSize) {
// Add this FetchRequest
remoteRequests += new FetchRequest(address, curBlocks)
+ logDebug(s"Creating fetch request of $curRequestSize at $address")
curRequestSize = 0
curBlocks = new ArrayBuffer[(BlockId, Long)]
}
}
// Add in the final request
@@ -189,7 +191,7 @@ object BlockFetcherIterator {
}
}
}
- logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
+ logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
totalBlocks + " blocks")
remoteRequests
}
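
The request-splitting logic in the hunks above can be sketched in isolation. This is a minimal, self-contained illustration rather than Spark's actual code: `Request`, `SplitSketch`, and the use of plain `String` block ids and addresses are hypothetical stand-ins for `FetchRequest`, `BlockId`, and the block manager address, and logging is omitted. It shows the target size being computed once, outside the loop, as discussed in the review comments.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for FetchRequest: (blockId, size) pairs destined for one address.
final case class Request(address: String, blocks: Seq[(String, Long)]) {
  def size: Long = blocks.map(_._2).sum
}

object SplitSketch {
  // Groups the non-empty blocks of one address into requests of roughly
  // targetRequestSize bytes each; only the final request may be smaller.
  def split(
      address: String,
      blockInfos: Seq[(String, Long)],
      maxBytesInFlight: Long): Seq[Request] = {
    // Loop-invariant, so it is computed once before iterating.
    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    val requests = new ArrayBuffer[Request]
    var curBlocks = new ArrayBuffer[(String, Long)]
    var curRequestSize = 0L
    for ((blockId, size) <- blockInfos) {
      require(size >= 0, s"Negative block size $size for $blockId")
      if (size > 0) { // empty blocks are skipped
        curBlocks += ((blockId, size))
        curRequestSize += size
      }
      if (curRequestSize >= targetRequestSize) {
        // Close the current request and start a new one.
        requests += Request(address, curBlocks.toList)
        curBlocks = new ArrayBuffer[(String, Long)]
        curRequestSize = 0L
      }
    }
    // Add the final request if any blocks are left over.
    if (curBlocks.nonEmpty) requests += Request(address, curBlocks.toList)
    requests.toList
  }
}
```

With `maxBytesInFlight = 500` the target is 100 bytes, so blocks of 40, 70, 0, and 10 bytes would be grouped into one request of 110 bytes followed by a final request of 10 bytes. Note that a request is closed only after it reaches the target, so a request can exceed the target by up to one block.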
@@ -224,8 +226,8 @@ object BlockFetcherIterator {
sendRequest(fetchRequests.dequeue())
}

- val numGets = remoteRequests.size - fetchRequests.size
- logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime))
+ val numFetches = remoteRequests.size - fetchRequests.size
+ logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

// Get Local Blocks
startTime = System.currentTimeMillis
@@ -325,7 +327,7 @@ object BlockFetcherIterator {
}

copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
- logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
+ logInfo("Started " + fetchRequestsSync.size + " remote fetches in " +
Utils.getUsedTimeMs(startTime))

// Get Local Blocks
24 changes: 18 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -33,6 +33,8 @@ import org.apache.spark.util.Utils
private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
extends BlockStore(blockManager) with Logging {

+ val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
+
override def getSize(blockId: BlockId): Long = {
diskManager.getBlockLocation(blockId).length
}
@@ -84,12 +86,22 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val segment = diskManager.getBlockLocation(blockId)
val channel = new RandomAccessFile(segment.file, "r").getChannel()
- val buffer = try {
- channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
- } finally {
- channel.close()
- }
- Some(buffer)
+
+ val buffer =
+ try {
+ // For small files, directly read rather than memory map
+ if (segment.length < minMemoryMapBytes) {
+ val buf = ByteBuffer.allocate(segment.length.toInt)
+ channel.read(buf, segment.offset)
+ buf.rewind()
+ Some(buf)
+ } else {
+ Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
+ }
+ } finally {
+ channel.close()
+ }
+ buffer
}

override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
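
One detail worth noting about the small-block path in `getBytes`: a single `FileChannel.read` call is allowed to return fewer bytes than the buffer can hold. The sketch below is a more defensive variant under that assumption, not the code in this PR. `SegmentReader` and `readSegment` are hypothetical names, and the `minMemoryMapBytes` parameter simply mirrors the threshold introduced in the diff.

```scala
import java.io.{EOFException, File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object SegmentReader {
  // Hypothetical helper: reads `length` bytes starting at `offset`, copying small
  // segments onto the heap and memory mapping segments at or above the threshold.
  def readSegment(file: File, offset: Long, length: Long, minMemoryMapBytes: Long): ByteBuffer = {
    val channel = new RandomAccessFile(file, "r").getChannel()
    try {
      if (length < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(length.toInt)
        var position = offset
        // A positional read may be short, so keep reading until the buffer is full.
        while (buf.hasRemaining) {
          val n = channel.read(buf, position)
          if (n < 0) {
            throw new EOFException(s"Reached end of $file with ${buf.remaining} bytes unread")
          }
          position += n
        }
        buf.flip() // position = 0, limit = number of bytes read
        buf
      } else {
        channel.map(MapMode.READ_ONLY, offset, length)
      }
    } finally {
      // The mapping, if any, remains valid after the channel is closed.
      channel.close()
    }
  }
}
```

Calling `SegmentReader.readSegment(file, 5L, 10L, 8192L)` would take the direct-read path for a 10-byte segment, while passing a threshold of `4L` would memory map the same segment. Either way the caller receives a `ByteBuffer` positioned at the start of the segment.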
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -463,8 +463,7 @@ private[spark] object Utils extends Logging {
}

/**
- * Return the string to tell how long has passed in seconds. The passing parameter should be in
- * millisecond.
+ * Return a string stating how many milliseconds have elapsed since startTimeMs.
*/
def getUsedTimeMs(startTimeMs: Long): String = {
return " " + (System.currentTimeMillis - startTimeMs) + " ms"
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -122,6 +122,15 @@ Apart from these, the following properties are also available, and may be useful
<code>spark.storage.memoryFraction</code>.
</td>
</tr>
+ <tr>
+ <td>spark.storage.memoryMapThreshold</td>
+ <td>2 * 4096</td>
Contributor

Probably just want to write 8192 here.

+ <td>
+ Size of a block, in bytes, above which Spark memory maps when reading a block from disk.
+ This prevents Spark from memory mapping very small blocks. In general, memory
+ mapping has high overhead for blocks close to or below the page size of the operating system.
+ </td>
+ </tr>
<tr>
<td>spark.mesos.coarse</td>
<td>false</td>