Never store shuffle blocks in BlockManager
After the BlockId refactor (PR alteryx#114), it became very clear that ShuffleBlocks are of no use
within BlockManager (they had a no-arg constructor!). This patch completely eliminates
them, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.
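For context, a rough sketch of the shuffle write path after this change. It is illustrative only: ShuffleBlockId, diskBlockManager.getFile, and the write() call are assumed from the surrounding Spark code of this era and are not part of this commit; getDiskWriter, commit, close, and fileSegment do appear in the diff below.

// Hypothetical sketch of writing one shuffle partition after this patch.
// No BlockInfo is registered for the block; BlockManager never tracks it.
val blockId = ShuffleBlockId(shuffleId, mapId, reduceId)        // BlockId subtype from PR alteryx#114 (assumed)
val file = diskBlockManager.getFile(blockId)                    // assumed helper resolving the on-disk location
val writer = blockManager.getDiskWriter(blockId, file, serializer, bufferSize)

writer.open()
partitionIterator.foreach(elem => writer.write(elem))           // write() assumed on BlockObjectWriter
writer.commit()
writer.close()                                                  // no close handler puts a ShuffleBlockInfo anymore

// Per-block bookkeeping reduces to the written segment's length,
// presumably the flat 8 bytes the message above refers to.
val bytesWritten: Long = writer.fileSegment().length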
aarondav committed Nov 5, 2013
1 parent 07b3f01 commit 7eaa461
Showing 3 changed files with 4 additions and 36 deletions.
18 changes: 1 addition & 17 deletions core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.util.concurrent.ConcurrentHashMap
 
-private[storage] trait BlockInfo {
-  def level: StorageLevel
-  def tellMaster: Boolean
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
   // To save space, 'pending' and 'failed' are encoded as special sizes:
   @volatile var size: Long = BlockInfo.BLOCK_PENDING
   private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
@@ -81,17 +79,3 @@ private object BlockInfo {
   private val BLOCK_PENDING: Long = -1L
   private val BLOCK_FAILED: Long = -2L
 }
-
-// All shuffle blocks have the same `level` and `tellMaster` properties,
-// so we can save space by not storing them in each instance:
-private[storage] class ShuffleBlockInfo extends BlockInfo {
-  // These need to be defined using 'def' instead of 'val' in order for
-  // the compiler to eliminate the fields:
-  def level: StorageLevel = StorageLevel.DISK_ONLY
-  def tellMaster: Boolean = false
-}
-
-private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
-  extends BlockInfo {
-  // Intentionally left blank
-}
10 changes: 2 additions & 8 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -465,13 +465,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
-    writer.registerCloseEventHandler(() => {
-      val myInfo = new ShuffleBlockInfo()
-      blockInfo.put(blockId, myInfo)
-      myInfo.markReady(writer.fileSegment().length)
-    })
-    writer
+    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
   }
 
   /**
@@ -501,7 +495,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
+      val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
 
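A minimal usage sketch of the consolidated BlockInfo on the put path, mirroring the hunk above; markReady is taken from the removed close handler, while waitForReady is assumed from the unchanged remainder of BlockInfo.scala.

// Illustrative put-path handshake with the consolidated BlockInfo (sketch, not the actual doPut code).
val info = new BlockInfo(StorageLevel.MEMORY_AND_DISK, tellMaster = true)

blockInfo.putIfAbsent(blockId, info) match {
  case None =>
    // We won the race: store the data, then publish the final size so readers can proceed.
    info.markReady(bytesStored)
  case Some(existing) =>
    // Another thread is already writing this block; wait until it is marked ready (or failed).
    existing.waitForReady()                                     // assumed API on BlockInfo
}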
12 changes: 1 addition & 11 deletions core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -34,20 +34,12 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  */
 abstract class BlockObjectWriter(val blockId: BlockId) {
 
-  var closeEventHandler: () => Unit = _
-
   def open(): BlockObjectWriter
 
-  def close() {
-    closeEventHandler()
-  }
+  def close()
 
   def isOpen: Boolean
 
-  def registerCloseEventHandler(handler: () => Unit) {
-    closeEventHandler = handler
-  }
-
   /**
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
@@ -146,8 +138,6 @@ class DiskBlockObjectWriter(
       ts = null
       objOut = null
     }
-    // Invoke the close callback handler.
-    super.close()
   }
 
   override def isOpen: Boolean = objOut != null
