Skip to content

Commit

Permalink
[SPARK-6479] [BLOCK MANAGER] Create off-heap block storage API
Browse files Browse the repository at this point in the history
This is the classes for creating off-heap block storage API. It also includes the migration for Tachyon. The diff seems to be big, but it mainly just rename tachyon to offheap. New implementation for hdfs will be submit for review in spark-6112.

Author: Zhan Zhang <[email protected]>

Closes apache#5430 from zhzhan/SPARK-6479 and squashes the following commits:

60acd84 [Zhan Zhang] minor change to kickoff the test
12f54c9 [Zhan Zhang] solve merge conflicts
a54132c [Zhan Zhang] solve review comments
ffb8e00 [Zhan Zhang] rebase to sparkcontext change
6e121e0 [Zhan Zhang] resolve review comments and restructure blockmanasger code
a7aed6c [Zhan Zhang] add Tachyon migration code
186de31 [Zhan Zhang] initial commit for off-heap block storage api
  • Loading branch information
zhzhan authored and jeanlyn committed Jun 12, 2015
1 parent 7e7bbf0 commit 80d3286
Show file tree
Hide file tree
Showing 22 changed files with 536 additions and 331 deletions.
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

// Generate the random name for a temp folder in Tachyon
// Generate the random name for a temp folder in external block store.
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
@deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
val tachyonFolderName = externalBlockStoreFolderName

def isLocal: Boolean = (master == "local" || master.startsWith("local["))

Expand Down Expand Up @@ -386,7 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

_conf.set("spark.tachyonStore.folderName", tachyonFolderName)
_conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ object ExecutorExitCode {
/** DiskStore failed to create a local temporary directory after many attempts. */
val DISK_STORE_FAILED_TO_CREATE_DIR = 53

/** TachyonStore failed to initialize after many attempts. */
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
/** ExternalBlockStore failed to initialize after many attempts. */
val EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE = 54

/** TachyonStore failed to create a local temporary directory after many attempts. */
val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
/** ExternalBlockStore failed to create a local temporary directory after many attempts. */
val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55

def explainExitCode(exitCode: Int): String = {
exitCode match {
Expand All @@ -46,9 +46,11 @@ object ExecutorExitCode {
case OOM => "OutOfMemoryError"
case DISK_STORE_FAILED_TO_CREATE_DIR =>
"Failed to create local directory (bad spark.local.dir?)"
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
"TachyonStore failed to create a local temporary directory."
// TODO: replace external block store with concrete implementation name
case EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE => "ExternalBlockStore failed to initialize."
// TODO: replace external block store with concrete implementation name
case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
"ExternalBlockStore failed to create a local temporary directory."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag](

val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
" CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
" CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
info.numCachedPartitions, bytesToString(info.memSize),
bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))

s"$rdd [$persistence]" +: storageInfo
}
Expand Down
63 changes: 29 additions & 34 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,11 @@ private[spark] class BlockManager(
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

// Actual storage of where blocks are kept
private var tachyonInitialized = false
private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
private[spark] lazy val tachyonStore: TachyonStore = {
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
val appFolderName = conf.get("spark.tachyonStore.folderName")
val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
val tachyonBlockManager =
new TachyonBlockManager(this, tachyonStorePath, tachyonMaster)
tachyonInitialized = true
new TachyonStore(this, tachyonBlockManager)
}
private[spark] lazy val externalBlockStore: ExternalBlockStore =
new ExternalBlockStore(this, executorId)

private[spark]
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
Expand Down Expand Up @@ -320,13 +312,13 @@ private[spark] class BlockManager(

/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
* NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
* NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfo.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
// Assume that block is not in Tachyon
// Assume that block is not in external block store
BlockStatus(info.level, memSize, diskSize, 0L)
}
}
Expand Down Expand Up @@ -376,10 +368,10 @@ private[spark] class BlockManager(
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val inTachyonSize = status.tachyonSize
val inExternalBlockStoreSize = status.externalBlockStoreSize
val onDiskSize = status.diskSize
master.updateBlockInfo(
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
} else {
true
}
Expand All @@ -397,15 +389,17 @@ private[spark] class BlockManager(
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || inTachyon || onDisk) level.replication else 1
val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
val storageLevel =
StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
val externalBlockStoreSize =
if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
}
}
}
Expand Down Expand Up @@ -485,11 +479,11 @@ private[spark] class BlockManager(
}
}

// Look for the block in Tachyon
// Look for the block in external block store
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from tachyon")
if (tachyonStore.contains(blockId)) {
tachyonStore.getBytes(blockId) match {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
externalBlockStore.getBytes(blockId) match {
case Some(bytes) =>
if (!asBlockResult) {
return Some(bytes)
Expand All @@ -498,7 +492,7 @@ private[spark] class BlockManager(
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
}
case None =>
logDebug(s"Block $blockId not found in tachyon")
logDebug(s"Block $blockId not found in externalBlockStore")
}
}
}
Expand Down Expand Up @@ -766,8 +760,8 @@ private[spark] class BlockManager(
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (putLevel.useOffHeap) {
// Use tachyon for off-heap storage
(false, tachyonStore)
// Use external block store
(false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
Expand Down Expand Up @@ -802,7 +796,7 @@ private[spark] class BlockManager(

val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// Now that the block is in either the memory, tachyon, or disk store,
// Now that the block is in either the memory, externalBlockStore, or disk store,
// let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
Expand Down Expand Up @@ -1099,10 +1093,11 @@ private[spark] class BlockManager(
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
val removedFromExternalBlockStore =
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or tachyon store")
"the disk, memory, or external block store")
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
Expand Down Expand Up @@ -1136,7 +1131,7 @@ private[spark] class BlockManager(
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { tachyonStore.remove(id) }
if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
Expand Down Expand Up @@ -1216,8 +1211,8 @@ private[spark] class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
if (tachyonInitialized) {
tachyonStore.clear()
if (externalBlockStoreInitialized) {
externalBlockStore.clear()
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ class BlockManagerMaster(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
tachyonSize: Long): Boolean = {
externalBlockStoreSize: Long): Boolean = {
val res = driverEndpoint.askWithRetry[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
UpdateBlockInfo(blockManagerId, blockId, storageLevel,
memSize, diskSize, externalBlockStoreSize))
logDebug(s"Updated info of block $blockId")
res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint(
context.reply(true)

case UpdateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
context.reply(updateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize))
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))

case GetLocations(blockId) =>
context.reply(getLocations(blockId))
Expand Down Expand Up @@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
tachyonSize: Long): Boolean = {
externalBlockStoreSize: Long): Boolean = {

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
Expand All @@ -332,7 +332,7 @@ class BlockManagerMasterEndpoint(
}

blockManagerInfo(blockManagerId).updateBlockInfo(
blockId, storageLevel, memSize, diskSize, tachyonSize)
blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)

var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
Expand Down Expand Up @@ -396,8 +396,8 @@ case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
tachyonSize: Long) {
def isCached: Boolean = memSize + diskSize + tachyonSize > 0
externalBlockStoreSize: Long) {
def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
}

@DeveloperApi
Expand Down Expand Up @@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
tachyonSize: Long) {
externalBlockStoreSize: Long) {

updateLastSeenMs()

Expand All @@ -445,9 +445,9 @@ private[spark] class BlockManagerInfo(
}

if (storageLevel.isValid) {
/* isValid means it is either stored in-memory, on-disk or on-Tachyon.
/* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
* The memSize here indicates the data size in or dropped from memory,
* tachyonSize here indicates the data size in or dropped from Tachyon,
* externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
* and the diskSize here indicates the data size in or dropped to disk.
* They can be both larger than 0, when a block is dropped from memory to disk.
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
Expand All @@ -464,9 +464,9 @@ private[spark] class BlockManagerInfo(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
if (storageLevel.useOffHeap) {
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
logInfo("Added %s on tachyon on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
Expand All @@ -482,8 +482,9 @@ private[spark] class BlockManagerInfo(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
if (blockStatus.storageLevel.useOffHeap) {
logInfo("Removed %s on %s on tachyon (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
blockId, blockManagerId.hostPort,
Utils.bytesToString(blockStatus.externalBlockStoreSize)))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages {
var storageLevel: StorageLevel,
var memSize: Long,
var diskSize: Long,
var tachyonSize: Long)
var externalBlockStoreSize: Long)
extends ToBlockManagerMaster
with Externalizable {

Expand All @@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages {
storageLevel.writeExternal(out)
out.writeLong(memSize)
out.writeLong(diskSize)
out.writeLong(tachyonSize)
out.writeLong(externalBlockStoreSize)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
Expand All @@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages {
storageLevel = StorageLevel(in)
memSize = in.readLong()
diskSize = in.readLong()
tachyonSize = in.readLong()
externalBlockStoreSize = in.readLong()
}
}

Expand Down
Loading

0 comments on commit 80d3286

Please sign in to comment.