diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala index dbe0bda61589c..c8f397609a0b4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala @@ -19,9 +19,7 @@ package org.apache.spark.storage import java.util.concurrent.ConcurrentHashMap -private[storage] trait BlockInfo { - def level: StorageLevel - def tellMaster: Boolean +private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { // To save space, 'pending' and 'failed' are encoded as special sizes: @volatile var size: Long = BlockInfo.BLOCK_PENDING private def pending: Boolean = size == BlockInfo.BLOCK_PENDING @@ -81,17 +79,3 @@ private object BlockInfo { private val BLOCK_PENDING: Long = -1L private val BLOCK_FAILED: Long = -2L } - -// All shuffle blocks have the same `level` and `tellMaster` properties, -// so we can save space by not storing them in each instance: -private[storage] class ShuffleBlockInfo extends BlockInfo { - // These need to be defined using 'def' instead of 'val' in order for - // the compiler to eliminate the fields: - def level: StorageLevel = StorageLevel.DISK_ONLY - def tellMaster: Boolean = false -} - -private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean) - extends BlockInfo { - // Intentionally left blank -} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index fbedfbc446021..a34c95b6f07b6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -465,13 +465,7 @@ private[spark] class BlockManager( def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int) : BlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) - val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) - writer.registerCloseEventHandler(() => { - val myInfo = new ShuffleBlockInfo() - blockInfo.put(blockId, myInfo) - myInfo.markReady(writer.fileSegment().length) - }) - writer + new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) } /** @@ -501,7 +495,7 @@ private[spark] class BlockManager( // to be dropped right after it got put into memory. Note, however, that other threads will // not be able to get() this block until we call markReady on its BlockInfo. val myInfo = { - val tinfo = new BlockInfoImpl(level, tellMaster) + val tinfo = new BlockInfo(level, tellMaster) // Do atomically ! val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index e49c191c70a11..469e68fed74bb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -34,20 +34,12 @@ import org.apache.spark.serializer.{SerializationStream, Serializer} */ abstract class BlockObjectWriter(val blockId: BlockId) { - var closeEventHandler: () => Unit = _ - def open(): BlockObjectWriter - def close() { - closeEventHandler() - } + def close() def isOpen: Boolean - def registerCloseEventHandler(handler: () => Unit) { - closeEventHandler = handler - } - /** * Flush the partial writes and commit them as a single atomic block. Return the * number of bytes written for this commit. @@ -146,8 +138,6 @@ class DiskBlockObjectWriter( ts = null objOut = null } - // Invoke the close callback handler. - super.close() } override def isOpen: Boolean = objOut != null