SPARK-1145: Memory mapping with many small blocks can cause JVM allocation failures

This includes some minor code clean-up as well. The main change is that small files are no longer memory mapped; they are read directly into a byte buffer instead. There is a nicer way to write that code block using Scala's `Try`, but to keep the change easy to backport and as simple as possible, I opted for the more explicit but less pretty format.
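For reference, a sketch of what the `Try`-based alternative could look like (a hypothetical editor's illustration, not the committed code; `FileSegment` and `minMemoryMapBytes` are the names used in the `DiskStore` diff below):

```scala
import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import scala.util.Try

import org.apache.spark.storage.FileSegment

// Hypothetical Try-based variant of the committed try/finally block in
// DiskStore.getBytes: the channel is always closed, and any read failure
// is rethrown afterwards by result.get.
def getBytes(segment: FileSegment, minMemoryMapBytes: Long): Option[ByteBuffer] = {
  val channel = new RandomAccessFile(segment.file, "r").getChannel()
  val result = Try {
    if (segment.length < minMemoryMapBytes) {
      // Small block: read straight into a heap buffer instead of mmapping
      val buf = ByteBuffer.allocate(segment.length.toInt)
      channel.read(buf, segment.offset)
      buf.flip()
      buf
    } else {
      channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
    }
  }
  channel.close()
  Some(result.get)
}
```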

Author: Patrick Wendell <[email protected]>

Closes #43 from pwendell/block-iter-logging and squashes the following commits:

1cff512 [Patrick Wendell] Small issue from merge.
49f6c26 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into block-iter-logging
4943351 [Patrick Wendell] Added a test and feedback on mateis review
a637a18 [Patrick Wendell] Review feedback and adding rewind() when reading byte buffers.
b76b95f [Patrick Wendell] Review feedback
4e1514e [Patrick Wendell] Don't memory map for small files
d238b88 [Patrick Wendell] Some logging and clean-up
(cherry picked from commit 6b3c6e5)

Signed-off-by: Patrick Wendell <[email protected]>
pwendell committed Apr 28, 2014
1 parent 99285d0 commit 2f24159
Showing 6 changed files with 91 additions and 20 deletions.
core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

@@ -148,6 +148,12 @@ object BlockFetcherIterator {
     }
 
     protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
+      // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
+      // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
+      // nodes, rather than blocking on reading output from one node.
+      val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
+      logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)
+
       // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
       // at most maxBytesInFlight in order to limit the amount of data in flight.
       val remoteRequests = new ArrayBuffer[FetchRequest]
@@ -159,11 +165,6 @@
         _numBlocksToFetch += localBlocksToFetch.size
       } else {
         numRemote += blockInfos.size
-        // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
-        // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
-        // nodes, rather than blocking on reading output from one node.
-        val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
-        logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
         val iterator = blockInfos.iterator
         var curRequestSize = 0L
         var curBlocks = new ArrayBuffer[(BlockId, Long)]
@@ -178,11 +179,12 @@
           } else if (size < 0) {
             throw new BlockException(blockId, "Negative block size " + size)
           }
-          if (curRequestSize >= minRequestSize) {
+          if (curRequestSize >= targetRequestSize) {
             // Add this FetchRequest
             remoteRequests += new FetchRequest(address, curBlocks)
             curRequestSize = 0
             curBlocks = new ArrayBuffer[(BlockId, Long)]
+            logDebug(s"Creating fetch request of $curRequestSize at $address")
           }
         }
         // Add in the final request
@@ -191,7 +193,7 @@
           }
         }
       }
-      logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
+      logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
        totalBlocks + " blocks")
       remoteRequests
     }
@@ -226,8 +228,8 @@ object BlockFetcherIterator {
         sendRequest(fetchRequests.dequeue())
       }
 
-      val numGets = remoteRequests.size - fetchRequests.size
-      logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime))
+      val numFetches = remoteRequests.size - fetchRequests.size
+      logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
 
       // Get Local Blocks
       startTime = System.currentTimeMillis
@@ -327,7 +329,7 @@ object BlockFetcherIterator {
       }
 
       copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
-      logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
+      logInfo("Started " + fetchRequestsSync.size + " remote fetches in " +
        Utils.getUsedTimeMs(startTime))
 
       // Get Local Blocks
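For readers following the fragments above, a condensed, self-contained sketch of the grouping logic (illustrative only; simplified from the real `splitLocalRemoteBlocks`, with `String` standing in for `BlockId`):

```scala
import scala.collection.mutable.ArrayBuffer

// Group non-empty (blockId, size) pairs into fetch requests of roughly
// maxBytesInFlight / 5 bytes each, so up to five requests can be in flight
// in parallel without exceeding maxBytesInFlight overall.
def splitIntoRequests(
    blocks: Seq[(String, Long)],
    maxBytesInFlight: Long): Seq[Seq[(String, Long)]] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val requests = new ArrayBuffer[Seq[(String, Long)]]
  var curBlocks = new ArrayBuffer[(String, Long)]
  var curRequestSize = 0L
  for ((blockId, size) <- blocks if size > 0) {
    curBlocks += ((blockId, size))
    curRequestSize += size
    if (curRequestSize >= targetRequestSize) {
      requests += curBlocks.toSeq  // close out the current request
      curBlocks = new ArrayBuffer[(String, Long)]
      curRequestSize = 0
    }
  }
  if (curBlocks.nonEmpty) requests += curBlocks.toSeq  // final partial request
  requests.toSeq
}
```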
core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@@ -46,11 +46,12 @@ private[spark] class BlockManager(
     val master: BlockManagerMaster,
     val defaultSerializer: Serializer,
     maxMemory: Long,
-    val conf: SparkConf,
+    val _conf: SparkConf,
     securityManager: SecurityManager,
     mapOutputTracker: MapOutputTracker)
   extends Logging {
 
+  def conf = _conf
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
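The `_conf` indirection appears to exist so that `conf` is an overridable method rather than a constructor `val`: the new `BlockManagerSuite` test below stubs it with Mockito via `when(blockManager.conf).thenReturn(...)` to hand a different `spark.storage.memoryMapThreshold` to each `DiskStore` under test.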
core/src/main/scala/org/apache/spark/storage/DiskStore.scala (13 additions, 3 deletions)

@@ -33,6 +33,8 @@ import org.apache.spark.util.Utils
 private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
   extends BlockStore(blockManager) with Logging {
 
+  val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
+
   override def getSize(blockId: BlockId): Long = {
     diskManager.getBlockLocation(blockId).length
   }
@@ -94,12 +96,20 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
     val segment = diskManager.getBlockLocation(blockId)
     val channel = new RandomAccessFile(segment.file, "r").getChannel()
-    val buffer = try {
-      channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
+
+    try {
+      // For small files, directly read rather than memory map
+      if (segment.length < minMemoryMapBytes) {
+        val buf = ByteBuffer.allocate(segment.length.toInt)
+        channel.read(buf, segment.offset)
+        buf.flip()
+        Some(buf)
+      } else {
+        Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
+      }
     } finally {
       channel.close()
     }
-    Some(buffer)
   }
 
   override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
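The rationale, per the issue title and the `docs/configuration.md` entry below: each memory-mapped region carries fixed per-map overhead in the OS and JVM, so mapping many blocks near or below the page size wastes memory and can trigger allocation failures, whereas a plain `ByteBuffer.allocate` plus `read` is cheap at those sizes.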
core/src/main/scala/org/apache/spark/util/Utils.scala (1 addition, 2 deletions)

@@ -553,8 +553,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return the string to tell how long has passed in seconds. The passing parameter should be in
-   * millisecond.
+   * Return the string to tell how long has passed in milliseconds.
    */
   def getUsedTimeMs(startTimeMs: Long): String = {
     " " + (System.currentTimeMillis - startTimeMs) + " ms"
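Note that `getUsedTimeMs` returns its result with a leading space (e.g. `" 42 ms"`), which is why the updated log line in `BlockFetcherIterator` above concatenates it directly after `"remote fetches in"` with no separator.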
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

@@ -17,12 +17,15 @@
 
 package org.apache.spark.storage
 
-import java.nio.ByteBuffer
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.Arrays
 
 import akka.actor._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.FunSuite
-import org.scalatest.PrivateMethodTester
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers._
@@ -785,6 +788,53 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
     }
   }
 
+  test("reads of memory-mapped and non memory-mapped files are equivalent") {
+    val confKey = "spark.storage.memoryMapThreshold"
+
+    // Create a non-trivial (not all zeros) byte array
+    var counter = 0.toByte
+    def incr = {counter = (counter + 1).toByte; counter;}
+    val bytes = Array.fill[Byte](1000)(incr)
+    val byteBuffer = ByteBuffer.wrap(bytes)
+
+    val blockId = BlockId("rdd_1_2")
+
+    // This sequence of mocks makes these tests fairly brittle. It would
+    // be nice to refactor classes involved in disk storage in a way that
+    // allows for easier testing.
+    val blockManager = mock(classOf[BlockManager])
+    val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
+    when(shuffleBlockManager.conf).thenReturn(conf)
+    val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
+      System.getProperty("java.io.tmpdir"))
+
+    when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
+    val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
+    diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
+    val mapped = diskStoreMapped.getBytes(blockId).get
+
+    when(blockManager.conf).thenReturn(conf.clone.set(confKey, (1000 * 1000).toString))
+    val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
+    diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
+    val notMapped = diskStoreNotMapped.getBytes(blockId).get
+
+    // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
+    assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+      "Expected HeapByteBuffer for un-mapped read")
+    assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
+
+    def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
+      val array = new Array[Byte](in.remaining())
+      in.get(array)
+      array
+    }
+
+    val mappedAsArray = arrayFromByteBuffer(mapped)
+    val notMappedAsArray = arrayFromByteBuffer(notMapped)
+    assert(Arrays.equals(mappedAsArray, bytes))
+    assert(Arrays.equals(notMappedAsArray, bytes))
+  }
+
   test("updated block statuses") {
     store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
       securityMgr, mapOutputTracker)
docs/configuration.md (9 additions)

@@ -131,6 +131,15 @@ Apart from these, the following properties are also available, and may be useful
   <code>spark.storage.memoryFraction</code>.
   </td>
 </tr>
+<tr>
+  <td>spark.storage.memoryMapThreshold</td>
+  <td>8192</td>
+  <td>
+    Size of a block, in bytes, above which Spark memory maps when reading a block from disk.
+    This prevents Spark from memory mapping very small blocks. In general, memory
+    mapping has high overhead for blocks close to or below the page size of the operating system.
+  </td>
+</tr>
 <tr>
   <td>spark.tachyonStore.baseDir</td>
   <td>System.getProperty("java.io.tmpdir")</td>
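A usage sketch for the new property (hypothetical application code, not part of the commit):

```scala
import org.apache.spark.SparkConf

// Hypothetical example: read blocks smaller than 64 KB into plain heap
// buffers and only memory map blocks of 64 KB or more.
val conf = new SparkConf()
  .setAppName("MemoryMapThresholdExample")
  .set("spark.storage.memoryMapThreshold", (64 * 1024).toString)
```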
