Skip to content

Commit

Permalink
Fix a potential OOM issue when StorageLevel is MEMORY_AND_DISK_SER
Browse files — browse the repository at this point in the history
  • Loading branch information
zsxwing committed Feb 28, 2015
1 parent e0e64ba commit 0cc0257
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -535,9 +535,10 @@ private[spark] class BlockManager(
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
memoryStore.putBytes(blockId, bytes.limit) {
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
}
bytes.rewind()
}
if (!asBlockResult) {
Expand Down Expand Up @@ -999,7 +1000,7 @@ private[spark] class BlockManager(
*/
def dropFromMemory(
blockId: BlockId,
data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
data: => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {

logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
Expand Down
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

/**
 * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
 * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
 *
 * The by-name `_bytes` argument combined with the `lazy val` below is the crux of the OOM fix:
 * the (potentially large) buffer is materialized at most once, and only if `tryToPut` accepts
 * the put based on `size` alone. On rejection no allocation happens at all.
 *
 * @param blockId id of the block being stored
 * @param size    expected size in bytes, used for the free-space check before any allocation
 * @param _bytes  by-name producer of the serialized data; evaluated only on a successful put
 * @return a PutResult whose `data` is `Right(buffer)` on success, or `null` if the put failed
 */
def putBytes(blockId: BlockId, size: Long)(_bytes: => ByteBuffer): PutResult = {
  // Work on a duplicate - since the original input might be used elsewhere.
  // Note: rewind() is declared to return java.nio.Buffer, hence the downcast back to ByteBuffer.
  lazy val bytes = _bytes.duplicate().rewind().asInstanceOf[ByteBuffer]
  // `bytes` is passed by-name into tryToPut, so it is still not forced here.
  val putAttempt = tryToPut(blockId, bytes, size, deserialized = false)
  // Only expose the buffer when the put succeeded; on failure the producer never ran.
  val data = if (putAttempt.success) Right(bytes.duplicate()) else null
  PutResult(size, data, putAttempt.droppedBlocks)
}

override def putArray(
blockId: BlockId,
values: Array[Any],
Expand Down Expand Up @@ -314,7 +326,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
*/
private def tryToPut(
blockId: BlockId,
value: Any,
value: => Any,
size: Long,
deserialized: Boolean): ResultWithDroppedBlocks = {

Expand Down Expand Up @@ -345,7 +357,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
lazy val data = if (deserialized) {
Left(value.asInstanceOf[Array[Any]])
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1221,4 +1221,30 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
}

test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") {
  store = makeBlockManager(12000)
  // Request more bytes (13000) than the store can hold (12000). The by-name block must
  // therefore never be evaluated; if it runs, fail() aborts the test immediately.
  val putResult = store.memoryStore.putBytes(BlockId("rdd_3_10"), 13000) {
    fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
  }
  // The reported size echoes the requested size even though nothing was stored.
  assert(putResult.size === 13000)
  assert(putResult.data === null)
  assert(putResult.droppedBlocks === Nil)
}

test("put a small ByteBuffer to MemoryStore") {
  store = makeBlockManager(12000)
  // Captures the buffer built inside the by-name block so we can compare identities below.
  var allocated: ByteBuffer = null
  // 10000 bytes fit within the 12000-byte store, so the producer runs exactly once.
  val putResult = store.memoryStore.putBytes(BlockId("rdd_3_10"), 10000) {
    allocated = ByteBuffer.allocate(10000)
    allocated
  }
  assert(putResult.size === 10000)
  // A successful put surfaces the stored buffer as Right(...).
  assert(putResult.data === Right(allocated))
  assert(putResult.droppedBlocks === Nil)
}
}

0 comments on commit 0cc0257

Please sign in to comment.