Skip to content

Commit

Permalink
Add alias for tryToPut and dropFromMemory
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Mar 25, 2015
1 parent 1100a54 commit 7d25545
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,18 @@ private[spark] class BlockManager(
putIterator(blockId, Iterator(value), level, tellMaster)
}

/**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*
* Return the block status if the given block has been updated, else None.
*/
def dropFromMemory(
blockId: BlockId,
data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
dropFromMemory(blockId, () => data)
}

/**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
Expand Down
28 changes: 24 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val values = blockManager.dataDeserialize(blockId, bytes)
putIterator(blockId, values, level, returnValues = true)
} else {
val putAttempt = tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
}
}
Expand Down Expand Up @@ -117,11 +117,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
returnValues: Boolean): PutResult = {
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
val putAttempt = tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true)
PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks)
} else {
val bytes = blockManager.dataSerialize(blockId, values.iterator)
val putAttempt = tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
}
}
Expand Down Expand Up @@ -320,6 +320,26 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
blockId.asRDDId.map(_.rddId)
}

/**
* Try to put in a set of values, if we can free up enough space. The value should either be
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
* must also be passed by the caller.
*
* Synchronize on `accountingLock` to ensure that all the put requests and its associated block
* dropping is done by only on thread at a time. Otherwise while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
*
* Return whether put was successful, along with the blocks dropped in the process.
*/
private def tryToPut(
blockId: BlockId,
value: Any,
size: Long,
deserialized: Boolean): ResultWithDroppedBlocks = {
tryToPut(blockId, () => value, size, deserialized)
}

/**
* Try to put in a set of values, if we can free up enough space. The value should either be
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
Expand Down Expand Up @@ -439,7 +459,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
assert(master.getLocations("a3").size === 0, "master was told about a3")

// Drop a1 and a2 from memory; this should be reported back to the master
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.getLocations("a1").size === 0, "master did not remove a1")
Expand Down Expand Up @@ -413,8 +413,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
t2.join()
t3.join()

store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
store.waitForAsyncReregister()
}
}
Expand Down

0 comments on commit 7d25545

Please sign in to comment.