From d238b885d6974283f3b96c26e26693074ff557e6 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 26 Feb 2014 15:44:18 -0800 Subject: [PATCH 1/6] Some logging and clean-up --- .../spark/storage/BlockFetcherIterator.scala | 22 ++++++++++--------- .../scala/org/apache/spark/util/Utils.scala | 3 +-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 925022e7fe6fb..962b79f46c78a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -146,6 +146,12 @@ object BlockFetcherIterator { } protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = { + // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them + // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 + // nodes, rather than blocking on reading output from one node. + val maxRequestSize = math.max(maxBytesInFlight / 5, 1L) + logInfo("maxBytesInFlight: " + maxBytesInFlight + ", maxRequestSize: " + maxRequestSize) + // Split local and remote blocks. Remote blocks are further split into FetchRequests of size // at most maxBytesInFlight in order to limit the amount of data in flight. val remoteRequests = new ArrayBuffer[FetchRequest] @@ -157,11 +163,6 @@ object BlockFetcherIterator { _numBlocksToFetch += localBlocksToFetch.size } else { numRemote += blockInfos.size - // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them - // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 - // nodes, rather than blocking on reading output from one node. 
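The splitting rule these comments describe can be seen in isolation: for each remote node, walk its (blockId, size) pairs and cut a new FetchRequest whenever the accumulated size reaches maxBytesInFlight / 5, so that fetches from up to five nodes can be outstanding at once instead of blocking on one node. Below is a minimal standalone reduction of the splitLocalRemoteBlocks loop to that core idea; FetchRequest is shrunk to a plain case class and the helper name splitIntoRequests is illustrative, not taken from the patch.

    import scala.collection.mutable.ArrayBuffer

    object FetchRequestSplitSketch {
      case class FetchRequest(address: String, blocks: Seq[(String, Long)])

      /** Split one node's blocks into requests of roughly targetRequestSize bytes each. */
      def splitIntoRequests(
          address: String,
          blocks: Seq[(String, Long)],      // (blockId, size in bytes)
          maxBytesInFlight: Long): Seq[FetchRequest] = {
        // Keep each request well under maxBytesInFlight so several can run in parallel.
        val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
        val requests = new ArrayBuffer[FetchRequest]
        var curBlocks = new ArrayBuffer[(String, Long)]
        var curRequestSize = 0L
        for ((blockId, size) <- blocks if size > 0) {  // zero-sized blocks are skipped
          curBlocks += ((blockId, size))
          curRequestSize += size
          if (curRequestSize >= targetRequestSize) {
            requests += FetchRequest(address, curBlocks.toSeq)
            curBlocks = new ArrayBuffer[(String, Long)]
            curRequestSize = 0L
          }
        }
        if (curBlocks.nonEmpty) {
          requests += FetchRequest(address, curBlocks.toSeq)  // final, possibly undersized request
        }
        requests.toSeq
      }
    }

With, say, maxBytesInFlight of 48 MB the target request size is about 9.6 MB, so a node holding 30 MB of map output is fetched in roughly three or four requests rather than one large blocking transfer.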
- val minRequestSize = math.max(maxBytesInFlight / 5, 1L) - logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize) val iterator = blockInfos.iterator var curRequestSize = 0L var curBlocks = new ArrayBuffer[(BlockId, Long)] @@ -176,11 +177,12 @@ object BlockFetcherIterator { } else if (size < 0) { throw new BlockException(blockId, "Negative block size " + size) } - if (curRequestSize >= minRequestSize) { + if (curRequestSize >= maxRequestSize) { // Add this FetchRequest remoteRequests += new FetchRequest(address, curBlocks) curRequestSize = 0 curBlocks = new ArrayBuffer[(BlockId, Long)] + logDebug(s"Creating fetch request of $curRequestSize at $address") } } // Add in the final request @@ -189,7 +191,7 @@ object BlockFetcherIterator { } } } - logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " + + logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " + totalBlocks + " blocks") remoteRequests } @@ -224,8 +226,8 @@ object BlockFetcherIterator { sendRequest(fetchRequests.dequeue()) } - val numGets = remoteRequests.size - fetchRequests.size - logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime)) + val numFetches = remoteRequests.size - fetchRequests.size + logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime)) // Get Local Blocks startTime = System.currentTimeMillis @@ -325,7 +327,7 @@ object BlockFetcherIterator { } copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6)) - logInfo("Started " + fetchRequestsSync.size + " remote gets in " + + logInfo("Started " + fetchRequestsSync.size + " remote fetches in " + Utils.getUsedTimeMs(startTime)) // Get Local Blocks diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 8e69f1d3351b5..bd760c8d90335 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -463,8 +463,7 @@ private[spark] object Utils extends Logging { } /** - * Return the string to tell how long has passed in seconds. The passing parameter should be in - * millisecond. + * Return the string to tell how long has passed in milliseconds. 
*/ def getUsedTimeMs(startTimeMs: Long): String = { return " " + (System.currentTimeMillis - startTimeMs) + " ms" From 4e1514e666624d2285b190ff1419a9fae2ae929f Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 26 Feb 2014 18:34:08 -0800 Subject: [PATCH 2/6] Don't memory map for small files --- .../org/apache/spark/storage/DiskStore.scala | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index d1f07ddb24bb2..4e0594ab81faf 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -84,12 +84,27 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage override def getBytes(blockId: BlockId): Option[ByteBuffer] = { val segment = diskManager.getBlockLocation(blockId) val channel = new RandomAccessFile(segment.file, "r").getChannel() - val buffer = try { - channel.map(MapMode.READ_ONLY, segment.offset, segment.length) - } finally { - channel.close() + + val buffer = + // For small files, directly read rather than memory map + if (segment.length < 2 * 4096) { + val buf = ByteBuffer.allocate(segment.length.toInt) + try { + channel.read(buf, segment.offset) + } + finally { + channel.close() + } + Some(buf) + } else { + val buf = try { + channel.map(MapMode.READ_ONLY, segment.offset, segment.length) + } finally { + channel.close() + } + Some(buf) } - Some(buffer) + buffer } override def getValues(blockId: BlockId): Option[Iterator[Any]] = { From b76b95ff5654523745c1390ab58b1d4dfd366b98 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 23 Apr 2014 01:23:18 -0700 Subject: [PATCH 3/6] Review feedback --- .../org/apache/spark/storage/BlockFetcherIterator.scala | 6 +++--- .../main/scala/org/apache/spark/storage/DiskStore.scala | 4 +++- docs/configuration.md | 9 +++++++++ 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 962b79f46c78a..09736dfadac54 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -149,8 +149,8 @@ object BlockFetcherIterator { // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 // nodes, rather than blocking on reading output from one node. - val maxRequestSize = math.max(maxBytesInFlight / 5, 1L) - logInfo("maxBytesInFlight: " + maxBytesInFlight + ", maxRequestSize: " + maxRequestSize) + val targetRequestSize = math.max(maxBytesInFlight / 5, 1L) + logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize) // Split local and remote blocks. Remote blocks are further split into FetchRequests of size // at most maxBytesInFlight in order to limit the amount of data in flight. 
@@ -177,7 +177,7 @@ object BlockFetcherIterator { } else if (size < 0) { throw new BlockException(blockId, "Negative block size " + size) } - if (curRequestSize >= maxRequestSize) { + if (curRequestSize >= targetRequestSize) { // Add this FetchRequest remoteRequests += new FetchRequest(address, curBlocks) curRequestSize = 0 diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 4e0594ab81faf..aa623edc2f8d7 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -33,6 +33,8 @@ import org.apache.spark.util.Utils private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager) extends BlockStore(blockManager) with Logging { + val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L) + override def getSize(blockId: BlockId): Long = { diskManager.getBlockLocation(blockId).length } @@ -87,7 +89,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage val buffer = // For small files, directly read rather than memory map - if (segment.length < 2 * 4096) { + if (segment.length < minMemoryMapBytes) { val buf = ByteBuffer.allocate(segment.length.toInt) try { channel.read(buf, segment.offset) diff --git a/docs/configuration.md b/docs/configuration.md index 8e4c48c81f8be..d123512c41ae9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -122,6 +122,15 @@ Apart from these, the following properties are also available, and may be useful spark.storage.memoryFraction. + + spark.storage.memoryMapThreshold + 2 * 4096 + + Size of a block, in bytes, above which Spark memory maps when reading a block from disk. + This prevents Spark from memory mapping very small blocks. In general, memory + mapping has high overhead for blocks close to or below the page size of the operating system. + + spark.mesos.coarse false From a637a1828c21dc4d5678933e6bef0d07cafdb158 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 23 Apr 2014 14:26:45 -0700 Subject: [PATCH 4/6] Review feedback and adding rewind() when reading byte buffers. 
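What the rewind (and, in the next patch, flip) is for: FileChannel.read(buf, offset) leaves the destination buffer's position at the number of bytes just read, so handing the buffer out as-is gives the caller remaining() == 0. The buffer has to be rewound before it is readable. The following is a minimal sketch of the read path the DiskStore changes converge on, assuming a single read fills the buffer (as the patch does) and using a hard-coded stand-in for the spark.storage.memoryMapThreshold value that the patch reads from SparkConf.

    import java.io.RandomAccessFile
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel.MapMode

    object ReadSegmentSketch {
      // Stand-in for spark.storage.memoryMapThreshold; in the patch this comes from SparkConf.
      val minMemoryMapBytes: Long = 2 * 4096L

      /** Read `length` bytes at `offset`, memory-mapping only segments above the threshold. */
      def readSegment(path: String, offset: Long, length: Long): ByteBuffer = {
        val channel = new RandomAccessFile(path, "r").getChannel()
        try {
          if (length < minMemoryMapBytes) {
            val buf = ByteBuffer.allocate(length.toInt)
            channel.read(buf, offset) // position now sits at the end of the data just read
            buf.flip()                // limit = position, position = 0: data is now readable
            buf
          } else {
            // A mapped buffer starts at position 0 with limit = length, so no flip is needed.
            channel.map(MapMode.READ_ONLY, offset, length)
          }
        } finally {
          channel.close()
        }
      }
    }

Because the buffer is allocated to exactly the segment length, rewind() and flip() behave the same once the read fills it; flip(), adopted in the following patch, is the safer habit since it also caps the limit at whatever was actually read. The cutoff itself is tunable, e.g. conf.set("spark.storage.memoryMapThreshold", "8192").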
--- .../org/apache/spark/storage/DiskStore.scala | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index aa623edc2f8d7..5ff92527b862d 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -88,24 +88,19 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage val channel = new RandomAccessFile(segment.file, "r").getChannel() val buffer = - // For small files, directly read rather than memory map - if (segment.length < minMemoryMapBytes) { - val buf = ByteBuffer.allocate(segment.length.toInt) - try { + try { + // For small files, directly read rather than memory map + if (segment.length < minMemoryMapBytes) { + val buf = ByteBuffer.allocate(segment.length.toInt) channel.read(buf, segment.offset) + buf.rewind() + Some(buf) + } else { + Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) } - finally { - channel.close() - } - Some(buf) - } else { - val buf = try { - channel.map(MapMode.READ_ONLY, segment.offset, segment.length) - } finally { - channel.close() - } - Some(buf) - } + } finally { + channel.close() + } buffer } From 49433511399933abbb91152c7b0ad0c2d8e83ba2 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 27 Apr 2014 12:55:14 -0700 Subject: [PATCH 5/6] Added a test and feedback on mateis review --- .../apache/spark/storage/BlockManager.scala | 3 +- .../org/apache/spark/storage/DiskStore.scala | 26 ++++---- .../spark/storage/BlockManagerSuite.scala | 62 ++++++++++++++++--- docs/configuration.md | 2 +- 4 files changed, 69 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a734ddc1ef702..d49819125fb12 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -41,9 +41,10 @@ private[spark] class BlockManager( val master: BlockManagerMaster, val defaultSerializer: Serializer, maxMemory: Long, - val conf: SparkConf) + _conf: SparkConf) extends Logging { + def conf = _conf val shuffleBlockManager = new ShuffleBlockManager(this) val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 5ff92527b862d..74d7b5b82f357 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -87,21 +87,19 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage val segment = diskManager.getBlockLocation(blockId) val channel = new RandomAccessFile(segment.file, "r").getChannel() - val buffer = - try { - // For small files, directly read rather than memory map - if (segment.length < minMemoryMapBytes) { - val buf = ByteBuffer.allocate(segment.length.toInt) - channel.read(buf, segment.offset) - buf.rewind() - Some(buf) - } else { - Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) - } - } finally { - channel.close() + try { + // For small files, directly read rather than memory map + if (segment.length < minMemoryMapBytes) { + val buf = 
ByteBuffer.allocate(segment.length.toInt) + channel.read(buf, segment.offset) + buf.flip() + Some(buf) + } else { + Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) } - buffer + } finally { + channel.close() + } } override def getValues(blockId: BlockId): Option[Iterator[Any]] = { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 9f011d9c8d132..b4ab99dbd61c5 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -17,21 +17,20 @@ package org.apache.spark.storage -import java.nio.ByteBuffer +import java.nio.{ByteBuffer, MappedByteBuffer} +import java.util.Arrays import akka.actor._ -import org.scalatest.BeforeAndAfter -import org.scalatest.FunSuite -import org.scalatest.PrivateMethodTester +import org.apache.spark.SparkConf +import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} +import org.mockito.Mockito.{mock, when} +import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ import org.scalatest.matchers.ShouldMatchers._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} -import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} - class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { private val conf = new SparkConf(false) var store: BlockManager = null @@ -657,4 +656,51 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a1") == None, "a1 should not be in store") } } + + test("reads of memory-mapped and non memory-mapped files are equivalent") { + val confKey = "spark.storage.memoryMapThreshold" + + // Create a non-trivial (not all zeros) byte array + var counter = 0.toByte + def incr = {counter = (counter + 1).toByte; counter;} + val bytes = Array.fill[Byte](1000)(incr) + val byteBuffer = ByteBuffer.wrap(bytes) + + val blockId = BlockId("rdd_1_2") + + // This sequence of mocks makes these tests fairly brittle. It would + // be nice to refactor classes involved in disk storage in a way that + // allows for easier testing. 
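+ // Concretely, the only stubbed behaviour is the conf getter on each mock: blockManager.conf
+ // is what DiskStore consults for the memory-map threshold, and shuffleBlockManager.conf is
+ // what DiskBlockManager needs when constructed below; nothing else on the mocks is used here.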
+ val blockManager = mock(classOf[BlockManager]) + val shuffleBlockManager = mock(classOf[ShuffleBlockManager]) + when(shuffleBlockManager.conf).thenReturn(conf) + val diskBlockManager = new DiskBlockManager(shuffleBlockManager, + System.getProperty("java.io.tmpdir")) + + when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString)) + val diskStoreMapped = new DiskStore(blockManager, diskBlockManager) + diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY) + val mapped = diskStoreMapped.getBytes(blockId).get + + when(blockManager.conf).thenReturn(conf.clone.set(confKey, (1000 * 1000).toString)) + val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager) + diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY) + val notMapped = diskStoreNotMapped.getBytes(blockId).get + + // Not possible to do isInstanceOf due to visibility of HeapByteBuffer + assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"), + "Expected HeapByteBuffer for un-mapped read") + assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read") + + def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = { + val array = new Array[Byte](in.remaining()) + in.get(array) + array + } + + val mappedAsArray = arrayFromByteBuffer(mapped) + val notMappedAsArray = arrayFromByteBuffer(notMapped) + assert(Arrays.equals(mappedAsArray, bytes)) + assert(Arrays.equals(notMappedAsArray, bytes)) + } } diff --git a/docs/configuration.md b/docs/configuration.md index d123512c41ae9..38dde4f8057b0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -124,7 +124,7 @@ Apart from these, the following properties are also available, and may be useful spark.storage.memoryMapThreshold - 2 * 4096 + 8192 Size of a block, in bytes, above which Spark memory maps when reading a block from disk. This prevents Spark from memory mapping very small blocks. In general, memory From 1cff512b5324254e1b49e63419eed0ac76f9f2ee Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 27 Apr 2014 16:22:45 -0700 Subject: [PATCH 6/6] Small issue from merge. --- .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6f039260d1376..00deecc1c3ca9 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -833,6 +833,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val notMappedAsArray = arrayFromByteBuffer(notMapped) assert(Arrays.equals(mappedAsArray, bytes)) assert(Arrays.equals(notMappedAsArray, bytes)) + } test("updated block statuses") { store = new BlockManager("", actorSystem, master, serializer, 1200, conf,