solve review comments
zhzhan committed May 1, 2015
1 parent ffb8e00 commit a54132c
Showing 21 changed files with 302 additions and 257 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -247,11 +247,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

- // Generate the random name for a temp folder in OffHeap
+ // Generate the random name for a temp folder in external block store
// Add a timestamp as the suffix here to make it more safe
- val extBlkFolderName = "spark-" + randomUUID.toString()
- @deprecated("Use extBlkFolderName instead.", "1.4.0")
- val tachyonFolderName = extBlkFolderName
+ val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
+ @deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
+ val tachyonFolderName = externalBlockStoreFolderName

def isLocal: Boolean = (master == "local" || master.startsWith("local["))

@@ -388,7 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

_conf.set("spark.extBlkStore.folderName", extBlkFolderName)
_conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

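The user-visible effect of this hunk is the renamed configuration key. A minimal stand-alone sketch of the new naming (the `FolderNameSketch` object and the plain `Map` are stand-ins for illustration, not Spark's `SparkConf` API):

```scala
import java.util.UUID

object FolderNameSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the renamed field: a random folder name under the "spark-" prefix.
    val externalBlockStoreFolderName = "spark-" + UUID.randomUUID().toString

    // The configuration key moves from "spark.extBlkStore.folderName"
    // to "spark.externalBlockStore.folderName".
    val conf = Map("spark.externalBlockStore.folderName" -> externalBlockStoreFolderName)
    println(conf("spark.externalBlockStore.folderName"))
  }
}
```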
@@ -33,11 +33,11 @@ object ExecutorExitCode {
/** DiskStore failed to create a local temporary directory after many attempts. */
val DISK_STORE_FAILED_TO_CREATE_DIR = 53

- /** ExtBlkStore failed to initialize after many attempts. */
- val ExtBlk_STORE_FAILED_TO_INITIALIZE = 54
+ /** ExternalBlockStore failed to initialize after many attempts. */
+ val EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE = 54

- /** ExtBlkStore failed to create a local temporary directory after many attempts. */
- val ExtBlk_STORE_FAILED_TO_CREATE_DIR = 55
+ /** ExternalBlockStore failed to create a local temporary directory after many attempts. */
+ val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55

def explainExitCode(exitCode: Int): String = {
exitCode match {
@@ -46,11 +46,11 @@ object ExecutorExitCode {
case OOM => "OutOfMemoryError"
case DISK_STORE_FAILED_TO_CREATE_DIR =>
"Failed to create local directory (bad spark.local.dir?)"
- // TODO: replace external block store with concreate implementation desc
- case ExtBlk_STORE_FAILED_TO_INITIALIZE => "External Block Store failed to initialize."
- // TODO: replace external block store with concreate implementation desc
- case ExtBlk_STORE_FAILED_TO_CREATE_DIR =>
- "External Block Store failed to create a local temporary directory."
+ // TODO: replace external block store with concrete implementation name
+ case EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE => "ExternalBlockStore failed to initialize."
+ // TODO: replace external block store with concrete implementation name
+ case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
+ "ExternalBlockStore failed to create a local temporary directory."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
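For illustration, a self-contained sketch of how the renamed constants resolve through the exit-code explainer (constants and messages copied from the hunk above; the `ExitCodeSketch` wrapper object is hypothetical):

```scala
object ExitCodeSketch {
  val EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE = 54
  val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55

  // Same shape as ExecutorExitCode.explainExitCode, reduced to the two renamed cases.
  def explainExitCode(exitCode: Int): String = exitCode match {
    case EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE =>
      "ExternalBlockStore failed to initialize."
    case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
      "ExternalBlockStore failed to create a local temporary directory."
    case other =>
      "Unknown executor exit code (" + other + ")"
  }

  def main(args: Array[String]): Unit = {
    println(explainExitCode(54)) // ExternalBlockStore failed to initialize.
    println(explainExitCode(55)) // ExternalBlockStore failed to create a local temporary directory.
  }
}
```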
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag](

val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
" CachedPartitions: %d; MemorySize: %s; ExtBlkStoreSize: %s; DiskSize: %s".format(
" CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
info.numCachedPartitions, bytesToString(info.memSize),
- bytesToString(info.extBlkStoreSize), bytesToString(info.diskSize)))
+ bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))

s"$rdd [$persistence]" +: storageInfo
}
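The renamed label lands in `RDD.toDebugString` output. A runnable sketch of that formatting, with a simplified stand-in for `Utils.bytesToString` (Spark's real helper covers more units):

```scala
object StorageInfoSketch {
  // Simplified stand-in for org.apache.spark.util.Utils.bytesToString.
  def bytesToString(size: Long): String =
    if (size >= 1L << 20) "%.1f MB".format(size.toDouble / (1L << 20))
    else if (size >= 1L << 10) "%.1f KB".format(size.toDouble / (1L << 10))
    else s"$size B"

  def main(args: Array[String]): Unit = {
    // Same format string as the hunk above, with example sizes.
    val line = " CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s"
      .format(4, bytesToString(2L << 20), bytesToString(2L << 10), bytesToString(512L))
    println(line)
    // -> " CachedPartitions: 4; MemorySize: 2.0 MB; ExternalBlockStoreSize: 2.0 KB; DiskSize: 512 B"
  }
}
```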
55 changes: 29 additions & 26 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -78,10 +78,11 @@ private[spark] class BlockManager(
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

// Actual storage of where blocks are kept
- private var extBlkStoreInitialized = false
+ private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
- private[spark] lazy val extBlkStore: ExtBlockStore = new ExtBlockStore(this, executorId)
+ private[spark] lazy val externalBlockStore: ExternalBlockStore =
+ new ExternalBlockStore(this, executorId)

private[spark]
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -311,13 +312,13 @@ private[spark] class BlockManager(

/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
- * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
+ * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfo.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
- // Assume that block is not in Tachyon
+ // Assume that block is not in external block store
BlockStatus(info.level, memSize, diskSize, 0L)
}
}
@@ -367,10 +368,10 @@ private[spark] class BlockManager(
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
- val inExtBlkStoreSize = status.extBlkStoreSize
+ val inExternalBlockStoreSize = status.externalBlockStoreSize
val onDiskSize = status.diskSize
master.updateBlockInfo(
- blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExtBlkStoreSize)
+ blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
} else {
true
}
@@ -388,15 +389,17 @@ private[spark] class BlockManager(
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
- val inExtBlkStore = level.useOffHeap && extBlkStore.contains(blockId)
+ val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
- val replication = if (inMem || inExtBlkStore || onDisk) level.replication else 1
- val storageLevel = StorageLevel(onDisk, inMem, inExtBlkStore, deserialized, replication)
+ val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
+ val storageLevel =
+ StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
- val extBlkStoreSize = if (inExtBlkStore) extBlkStore.getSize(blockId) else 0L
+ val externalBlockStoreSize =
+ if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
- BlockStatus(storageLevel, memSize, diskSize, extBlkStoreSize)
+ BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
}
}
}
@@ -476,11 +479,11 @@ private[spark] class BlockManager(
}
}

- // Look for the block in external blk store
+ // Look for the block in external block store
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExtBlkStore")
if (extBlkStore.contains(blockId)) {
extBlkStore.getBytes(blockId) match {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
externalBlockStore.getBytes(blockId) match {
case Some(bytes) =>
if (!asBlockResult) {
return Some(bytes)
@@ -489,7 +492,7 @@ private[spark] class BlockManager(
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
}
case None =>
logDebug(s"Block $blockId not found in extBlkStore")
logDebug(s"Block $blockId not found in externalBlockStore")
}
}
}
@@ -757,8 +760,8 @@ private[spark] class BlockManager(
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (putLevel.useOffHeap) {
- // Use external blk storage
- (false, extBlkStore)
+ // Use external block store
+ (false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
@@ -793,7 +796,7 @@ private[spark] class BlockManager(

val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
- // Now that the block is in either the memory, extBlkStore, or disk store,
+ // Now that the block is in either the memory, externalBlockStore, or disk store,
// let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
@@ -1090,11 +1093,11 @@ private[spark] class BlockManager(
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
- val removedFromExtBlkStore =
- if (extBlkStoreInitialized) extBlkStore.remove(blockId) else false
- if (!removedFromMemory && !removedFromDisk && !removedFromExtBlkStore) {
+ val removedFromExternalBlockStore =
+ if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
+ if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external blk store")
"the disk, memory, or external block store")
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
Expand Down Expand Up @@ -1128,7 +1131,7 @@ private[spark] class BlockManager(
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
- if (level.useOffHeap) { extBlkStore.remove(id) }
+ if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
@@ -1208,8 +1211,8 @@ private[spark] class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
- if (extBlkStoreInitialized) {
- extBlkStore.clear()
+ if (externalBlockStoreInitialized) {
+ externalBlockStore.clear()
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
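Behaviorally, these hunks keep the three-tier lookup the old names encoded: memory first, then the external block store when the level allows off-heap, then disk. A minimal sketch of that order with hand-rolled maps (stand-ins, not Spark's store classes, and without the per-store level gating the real `BlockManager` applies):

```scala
object LookupOrderSketch {
  val memoryStore = Map("rdd_0_0" -> "mem-bytes")
  val externalBlockStore = Map("rdd_0_1" -> "ext-bytes")
  val diskStore = Map("rdd_0_2" -> "disk-bytes")

  // Memory, then external block store (only if the level allows off-heap), then disk.
  def getLocal(blockId: String, useOffHeap: Boolean): Option[String] =
    memoryStore.get(blockId)
      .orElse(if (useOffHeap) externalBlockStore.get(blockId) else None)
      .orElse(diskStore.get(blockId))

  def main(args: Array[String]): Unit = {
    println(getLocal("rdd_0_1", useOffHeap = true))  // Some(ext-bytes)
    println(getLocal("rdd_0_1", useOffHeap = false)) // None: off-heap skipped, block not on disk
  }
}
```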
@@ -56,7 +56,8 @@ class BlockManagerMaster(
diskSize: Long,
externalBlockStoreSize: Long): Boolean = {
val res = driverEndpoint.askWithRetry[Boolean](
- UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, externalBlockStoreSize))
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel,
+ memSize, diskSize, externalBlockStoreSize))
logDebug(s"Updated info of block $blockId")
res
}
@@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint(
context.reply(true)

case UpdateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, extBlkStoreSize) =>
+ blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
context.reply(updateBlockInfo(
- blockManagerId, blockId, storageLevel, deserializedSize, size, extBlkStoreSize))
+ blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))

case GetLocations(blockId) =>
context.reply(getLocations(blockId))
@@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- extBlkStoreSize: Long): Boolean = {
+ externalBlockStoreSize: Long): Boolean = {

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
@@ -332,7 +332,7 @@
}

blockManagerInfo(blockManagerId).updateBlockInfo(
- blockId, storageLevel, memSize, diskSize, extBlkStoreSize)
+ blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)

var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
@@ -396,8 +396,8 @@ case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- extBlkStoreSize: Long) {
- def isCached: Boolean = memSize + diskSize + extBlkStoreSize > 0
+ externalBlockStoreSize: Long) {
+ def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
}

@DeveloperApi
@@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- extBlkStoreSize: Long) {
+ externalBlockStoreSize: Long) {

updateLastSeenMs()

@@ -445,9 +445,9 @@
}

if (storageLevel.isValid) {
- /* isValid means it is either stored in-memory, on-disk or on-extBlkStore.
+ /* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
* The memSize here indicates the data size in or dropped from memory,
- * extBlkStoreSize here indicates the data size in or dropped from extBlkStore,
+ * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
* and the diskSize here indicates the data size in or dropped to disk.
* They can be both larger than 0, when a block is dropped from memory to disk.
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
@@ -464,9 +464,9 @@
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
if (storageLevel.useOffHeap) {
- _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, extBlkStoreSize))
- logInfo("Added %s on ExtBlkStore on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(extBlkStoreSize)))
+ _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
+ logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
+ blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
@@ -482,8 +482,9 @@
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
if (blockStatus.storageLevel.useOffHeap) {
logInfo("Removed %s on %s on extBlkStore (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.extBlkStoreSize)))
logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
blockId, blockManagerId.hostPort,
Utils.bytesToString(blockStatus.externalBlockStoreSize)))
}
}
}
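The comment block above describes the per-store accounting rule that the rename preserves. A small sketch of that rule under the new names (simplified case class and helper, not Spark's `BlockStatus` or `BlockManagerInfo`):

```scala
object AccountingSketch {
  case class Status(memSize: Long, diskSize: Long, externalBlockStoreSize: Long) {
    def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
  }

  // Record a size only for the stores the level actually uses, so each entry
  // stays in an "accurate mode" even when a block moves between stores.
  def status(useMemory: Boolean, useDisk: Boolean, useOffHeap: Boolean,
             mem: Long, disk: Long, ext: Long): Status =
    Status(if (useMemory) mem else 0L,
           if (useDisk) disk else 0L,
           if (useOffHeap) ext else 0L)

  def main(args: Array[String]): Unit = {
    // OFF_HEAP-style level: only the external block store size counts.
    val s = status(useMemory = false, useDisk = false, useOffHeap = true, 0L, 0L, 4096L)
    println(s)          // Status(0,0,4096)
    println(s.isCached) // true
  }
}
```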
@@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages {
var storageLevel: StorageLevel,
var memSize: Long,
var diskSize: Long,
- var extBlkStoreSize: Long)
+ var externalBlockStoreSize: Long)
extends ToBlockManagerMaster
with Externalizable {

@@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages {
storageLevel.writeExternal(out)
out.writeLong(memSize)
out.writeLong(diskSize)
- out.writeLong(extBlkStoreSize)
+ out.writeLong(externalBlockStoreSize)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages {
storageLevel = StorageLevel(in)
memSize = in.readLong()
diskSize = in.readLong()
- extBlkStoreSize = in.readLong()
+ externalBlockStoreSize = in.readLong()
}
}

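Since `UpdateBlockInfo` is hand-serialized, the rename must keep write and read order aligned. A self-contained sketch of that symmetry (`SizesMessage` is a simplified hypothetical message type, not Spark's wire format):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, Externalizable,
  ObjectInput, ObjectInputStream, ObjectOutput, ObjectOutputStream}

class SizesMessage(var memSize: Long, var diskSize: Long,
    var externalBlockStoreSize: Long) extends Externalizable {
  def this() = this(0L, 0L, 0L) // Externalizable requires a no-arg constructor

  // Field order must match readExternal exactly; the renamed field stays last.
  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeLong(memSize)
    out.writeLong(diskSize)
    out.writeLong(externalBlockStoreSize)
  }

  override def readExternal(in: ObjectInput): Unit = {
    memSize = in.readLong()
    diskSize = in.readLong()
    externalBlockStoreSize = in.readLong()
  }
}

object RoundTripSketch {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    new SizesMessage(10L, 20L, 30L).writeExternal(out)
    out.flush()

    val msg = new SizesMessage()
    msg.readExternal(new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray)))
    println(msg.externalBlockStoreSize) // 30
  }
}
```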