Commit 883b7e9

[SPARK-6076][Block Manager] Fix a potential OOM issue when StorageLevel is MEMORY_AND_DISK_SER

In https://github.com/apache/spark/blob/dcd1e42d6b6ac08d2c0736bf61a15f515a1f222b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L538, when the StorageLevel is `MEMORY_AND_DISK_SER`, the block's contents are copied from the file into memory and then put into the MemoryStore:
```scala
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
```
However, if the file is bigger than the free memory, the copy itself triggers an OOM. A better approach is to first test whether enough memory is available; if not, `copyForMemory` should never be created, since caching the bytes in memory is only an optional optimization here.
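
To make that check-then-allocate contract concrete, here is a minimal, runnable sketch under assumed names (`PutBytesSketch` and its `freeMemory` counter are hypothetical stand-ins, not the real MemoryStore):

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in for the fix: the caller hands over the block's size
// and a thunk; the store compares the size against free memory *before*
// invoking the thunk, so an oversized copy is never allocated.
object PutBytesSketch {
  private var freeMemory: Long = 12000

  def putBytes(size: Long, makeBytes: () => ByteBuffer): Boolean = {
    if (size <= freeMemory) {
      val bytes = makeBytes() // safe: we already know it fits
      freeMemory -= size
      true
    } else {
      false // thunk never invoked, so the copy cannot OOM
    }
  }

  def main(args: Array[String]): Unit = {
    println(putBytes(13000, () => sys.error("must not be allocated"))) // false
    println(putBytes(10000, () => ByteBuffer.allocate(10000)))         // true
  }
}
```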

Author: zsxwing <[email protected]>

Closes apache#4827 from zsxwing/SPARK-6076 and squashes the following commits:

7d25545 [zsxwing] Add alias for tryToPut and dropFromMemory
1100a54 [zsxwing] Replace call-by-name with () => T
0cc0257 [zsxwing] Fix a potential OOM issue when StorageLevel is MEMORY_AND_DISK_SER
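
A note on the `1100a54` entry above: one plausible reason for preferring `() => T` over a by-name parameter, sketched below with hypothetical `drop` methods, is that Scala forbids overloads that differ only in a parameter's by-name-ness, while an eager overload can coexist with a `() => T` overload — exactly the pair `dropFromMemory` ends up with in this patch.

```scala
object ByNameVsFunction0 {
  // These two would NOT compile together - Scala rejects overloading on
  // by-name-ness alone:
  //   def drop(data: Either[Int, String]): Unit = ???
  //   def drop(data: => Either[Int, String]): Unit = ???

  // An eager overload plus an explicit Function0 overload is legal, which is
  // the shape dropFromMemory takes after this patch.
  def drop(data: Either[Int, String]): Unit = println(s"eager: $data")
  def drop(data: () => Either[Int, String]): Unit = println(s"lazy: ${data()}")

  def main(args: Array[String]): Unit = {
    drop(Left(1))                           // resolves to the eager overload
    drop(() => Right("created on demand"))  // resolves to the lazy overload
  }
}
```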
zsxwing authored and Andrew Or committed Mar 25, 2015
1 parent 968408b commit 883b7e9
Showing 3 changed files with 85 additions and 15 deletions.
23 changes: 18 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
```diff
@@ -535,9 +535,14 @@ private[spark] class BlockManager(
         /* We'll store the bytes in memory if the block's storage level includes
          * "memory serialized", or if it should be cached as objects in memory
          * but we only requested its serialized bytes. */
-        val copyForMemory = ByteBuffer.allocate(bytes.limit)
-        copyForMemory.put(bytes)
-        memoryStore.putBytes(blockId, copyForMemory, level)
+        memoryStore.putBytes(blockId, bytes.limit, () => {
+          // https://issues.apache.org/jira/browse/SPARK-6076
+          // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+          // put it into MemoryStore, copyForMemory should not be created. That's why this
+          // action is put into a `() => ByteBuffer` and created lazily.
+          val copyForMemory = ByteBuffer.allocate(bytes.limit)
+          copyForMemory.put(bytes)
+        })
         bytes.rewind()
       }
       if (!asBlockResult) {
@@ -991,15 +996,23 @@ private[spark] class BlockManager(
     putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
+  def dropFromMemory(
+      blockId: BlockId,
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+    dropFromMemory(blockId, () => data)
+  }
+
   /**
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
    *
+   * If `data` is not put on disk, it won't be created.
+   *
    * Return the block status if the given block has been updated, else None.
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
@@ -1023,7 +1036,7 @@ private[spark] class BlockManager(
         // Drop to disk, if storage level requires
         if (level.useDisk && !diskStore.contains(blockId)) {
           logInfo(s"Writing block $blockId to disk")
-          data match {
+          data() match {
             case Left(elements) =>
               diskStore.putArray(blockId, elements, level, returnValues = false)
             case Right(bytes) =>
```
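
The new `data: () => Either[...]` signature guarantees that a dropped block's payload is materialized only when the drop-to-disk branch actually runs. A minimal sketch of that invariant, with hypothetical, simplified types (not the real BlockManager):

```scala
// Hypothetical, simplified dropFromMemory: because `data` is a thunk, the
// potentially huge payload is built only if we really write it to disk.
object DropSketch {
  def dropFromMemory(useDisk: Boolean, data: () => Either[Array[Any], Array[Byte]]): Unit = {
    if (useDisk) {
      data() match {
        case Left(elements) => println(s"writing ${elements.length} objects to disk")
        case Right(bytes)   => println(s"writing ${bytes.length} bytes to disk")
      }
    }
    // when useDisk is false, data() is never invoked and nothing big is allocated
  }

  def main(args: Array[String]): Unit = {
    dropFromMemory(useDisk = false, () => sys.error("should never be materialized"))
    dropFromMemory(useDisk = true, () => Right(Array.fill[Byte](8)(0)))
  }
}
```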
43 changes: 37 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
```diff
@@ -98,6 +98,26 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
+  /**
+   * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
+   * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
+   *
+   * The caller should guarantee that `size` is correct.
+   */
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+    // Work on a duplicate - since the original input might be used elsewhere.
+    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
+    val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
+    val data =
+      if (putAttempt.success) {
+        assert(bytes.limit == size)
+        Right(bytes.duplicate())
+      } else {
+        null
+      }
+    PutResult(size, data, putAttempt.droppedBlocks)
+  }
+
   override def putArray(
       blockId: BlockId,
       values: Array[Any],
@@ -312,11 +332,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     blockId.asRDDId.map(_.rddId)
   }
 
+  private def tryToPut(
+      blockId: BlockId,
+      value: Any,
+      size: Long,
+      deserialized: Boolean): ResultWithDroppedBlocks = {
+    tryToPut(blockId, () => value, size, deserialized)
+  }
+
   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
    * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
    * must also be passed by the caller.
    *
+   * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
+   * created to avoid OOM since it may be a big ByteBuffer.
+   *
    * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
    * dropping is done by only on thread at a time. Otherwise while one thread is dropping
    * blocks to free memory for one block, another thread may use up the freed space for
@@ -326,7 +357,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    */
   private def tryToPut(
       blockId: BlockId,
-      value: Any,
+      value: () => Any,
       size: Long,
       deserialized: Boolean): ResultWithDroppedBlocks = {
 
@@ -345,7 +376,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       droppedBlocks ++= freeSpaceResult.droppedBlocks
 
       if (enoughFreeSpace) {
-        val entry = new MemoryEntry(value, size, deserialized)
+        val entry = new MemoryEntry(value(), size, deserialized)
         entries.synchronized {
           entries.put(blockId, entry)
           currentMemory += size
@@ -357,12 +388,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       } else {
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
-        val data = if (deserialized) {
-          Left(value.asInstanceOf[Array[Any]])
+        lazy val data = if (deserialized) {
+          Left(value().asInstanceOf[Array[Any]])
         } else {
-          Right(value.asInstanceOf[ByteBuffer].duplicate())
+          Right(value().asInstanceOf[ByteBuffer].duplicate())
         }
-        val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+        val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
       // Release the unroll memory used because we no longer need the underlying Array
```
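
The `lazy val bytes = _bytes()...` plus `() => bytes` pairing in the new `putBytes` does double duty: the buffer is created at most once no matter how many times the thunk fires, and not at all if `tryToPut` never needs it. A tiny standalone sketch of those semantics (hypothetical names):

```scala
object LazyOnce {
  private var allocations = 0
  private def expensiveBuffer(): Array[Byte] = { allocations += 1; new Array[Byte](16) }

  def main(args: Array[String]): Unit = {
    lazy val bytes = expensiveBuffer()        // nothing allocated yet
    val thunk: () => Array[Byte] = () => bytes
    thunk()                                   // first call allocates
    thunk()                                   // second call reuses the same buffer
    println(allocations)                      // prints 1
  }
}
```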
34 changes: 30 additions & 4 deletions core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
```diff
@@ -170,8 +170,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemory("a1", null)
-    store.dropFromMemory("a2", null)
+    store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -413,8 +413,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     t2.join()
     t3.join()
 
-    store.dropFromMemory("a1", null)
-    store.dropFromMemory("a2", null)
+    store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
     store.waitForAsyncReregister()
   }
 }
@@ -1223,4 +1223,30 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    val blockId = BlockId("rdd_3_10")
+    val result = memoryStore.putBytes(blockId, 13000, () => {
+      fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
+    })
+    assert(result.size === 13000)
+    assert(result.data === null)
+    assert(result.droppedBlocks === Nil)
+  }
+
+  test("put a small ByteBuffer to MemoryStore") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    val blockId = BlockId("rdd_3_10")
+    var bytes: ByteBuffer = null
+    val result = memoryStore.putBytes(blockId, 10000, () => {
+      bytes = ByteBuffer.allocate(10000)
+      bytes
+    })
+    assert(result.size === 10000)
+    assert(result.data === Right(bytes))
+    assert(result.droppedBlocks === Nil)
+  }
 }
```
