initial commit for off-heap block storage api
zhzhan committed May 1, 2015
1 parent 473552f commit 186de31
Showing 2 changed files with 252 additions and 0 deletions.
110 changes: 110 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/OffHeapBlockManager.scala
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import java.nio.ByteBuffer

import scala.util.control.NonFatal

import org.apache.spark.Logging


trait OffHeapBlockManager {

  /**
   * A short description of the underlying implementation.
   */
  def desc(): String = "OffHeap"

  /**
   * Initialize the concrete block manager implementation.
   *
   * @throws java.io.IOException if the underlying file system fails to initialize.
   */
  def init(blockManager: BlockManager, executorId: String): Unit

  /**
   * Remove the cached block from the off-heap store.
   *
   * @throws java.io.IOException if the underlying file system fails to remove the file.
   */
  def removeFile(blockId: BlockId): Boolean

  /**
   * Check whether the block exists in the off-heap store.
   *
   * @throws java.io.IOException if the underlying file system fails during the existence check.
   */
  def fileExists(blockId: BlockId): Boolean

  /**
   * Save the block bytes to the off-heap store.
   *
   * @throws java.io.IOException if the underlying file system fails to put the block.
   */
  def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit

  /**
   * Retrieve the block bytes from the off-heap store.
   *
   * @throws java.io.IOException if the underlying file system fails to get the block.
   */
  def getBytes(blockId: BlockId): Option[ByteBuffer]

  /**
   * Retrieve the size of the block.
   *
   * @throws java.io.IOException if the underlying file system fails to get the block size.
   */
  def getSize(blockId: BlockId): Long

  /**
   * Register cleanup to run at shutdown.
   */
  def addShutdownHook(): Unit

final def setup(blockManager: BlockManager, executorId: String): Unit = {
init(blockManager, executorId)
addShutdownHook()
}
}

object OffHeapBlockManager extends Logging {
val MAX_DIR_CREATION_ATTEMPTS = 10
val subDirsPerDir = 64
def create(blockManager: BlockManager,
executorId: String): Option[OffHeapBlockManager] = {
val sNames = blockManager.conf.getOption("spark.offHeapStore.blockManager")
sNames match {
case Some(name) =>
try {
val instance = Class.forName(name)
.newInstance()
.asInstanceOf[OffHeapBlockManager]
instance.setup(blockManager, executorId)
Some(instance)
} catch {
          case NonFatal(t) =>
            logError(s"Cannot initialize off-heap store $name", t)
            None
}
case None => None
}
}
}
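For context, the trait above defines the pluggable contract, and the companion object loads a concrete implementation reflectively from the `spark.offHeapStore.blockManager` setting (so the class needs a no-arg constructor). Below is a minimal sketch of what such a backend could look like, assuming a hypothetical implementation that stores each block as a file under a per-executor temp directory; the class name `LocalFileOffHeapBlockManager`, the directory layout, and the cleanup strategy are illustrative assumptions, not part of this commit.

package org.apache.spark.storage

import java.io.File
import java.nio.ByteBuffer
import java.nio.file.Files

// Hypothetical backend: one file per block under a per-executor temp directory.
class LocalFileOffHeapBlockManager extends OffHeapBlockManager {

  private var rootDir: File = _

  override def desc(): String = "LocalFile"

  override def init(blockManager: BlockManager, executorId: String): Unit = {
    rootDir = new File(System.getProperty("java.io.tmpdir"), s"offheap-blocks-$executorId")
    rootDir.mkdirs()
  }

  private def fileFor(blockId: BlockId): File = new File(rootDir, blockId.name)

  override def removeFile(blockId: BlockId): Boolean = fileFor(blockId).delete()

  override def fileExists(blockId: BlockId): Boolean = fileFor(blockId).exists()

  override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
    // Copy out of a duplicate so the caller's buffer position is left untouched.
    val buf = bytes.duplicate()
    buf.rewind()
    val data = new Array[Byte](buf.remaining())
    buf.get(data)
    Files.write(fileFor(blockId).toPath, data)
  }

  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
    val file = fileFor(blockId)
    if (file.exists()) Some(ByteBuffer.wrap(Files.readAllBytes(file.toPath))) else None
  }

  override def getSize(blockId: BlockId): Long = fileFor(blockId).length()

  override def addShutdownHook(): Unit = {
    // Best-effort cleanup of the block directory on JVM exit.
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      override def run(): Unit = {
        Option(rootDir).foreach { dir =>
          Option(dir.listFiles()).foreach(_.foreach(_.delete()))
          dir.delete()
        }
      }
    }))
  }
}

Such a backend would be enabled by setting `spark.offHeapStore.blockManager` to its fully qualified class name; `OffHeapBlockManager.create` then instantiates it reflectively and wires it in through `setup`.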
142 changes: 142 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/OffHeapStore.scala
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import java.nio.ByteBuffer

import scala.util.control.NonFatal

import org.apache.spark.Logging
import org.apache.spark.util.Utils


/**
 * Stores BlockManager blocks in an off-heap store.
 * Any exception thrown by the underlying implementation is caught
 * and the expected failure value is returned instead.
 */
private[spark] class OffHeapStore(blockManager: BlockManager, executorId: String)
  extends BlockStore(blockManager) with Logging {

lazy val offHeapManager: Option[OffHeapBlockManager] =
OffHeapBlockManager.create(blockManager, executorId)

logInfo("OffHeap started")

override def getSize(blockId: BlockId): Long = {
try {
      offHeapManager.map(_.getSize(blockId)).getOrElse(0L)
    } catch {
      case NonFatal(t) =>
        logError(s"Error getting size of block $blockId", t)
        0L
}
}

override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
putIntoOffHeapStore(blockId, bytes, returnValues = true)
}

override def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
putIterator(blockId, values.toIterator, level, returnValues)
}

override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
logDebug(s"Attempting to write values for block $blockId")
val bytes = blockManager.dataSerialize(blockId, values)
putIntoOffHeapStore(blockId, bytes, returnValues)
}

private def putIntoOffHeapStore(
blockId: BlockId,
bytes: ByteBuffer,
returnValues: Boolean): PutResult = {

    // Work on a duplicate so that we do not modify the caller's position and offsets.
    // duplicate() does not copy the underlying buffer, so it is inexpensive.
    val byteBuffer = bytes.duplicate()
    byteBuffer.rewind()
    logDebug(s"Attempting to put block $blockId into OffHeap store")
    val startTime = System.currentTimeMillis
    // We should never get here if offHeapManager is None, but handle it anyway for safety.
    try {
      if (offHeapManager.isDefined) {
        offHeapManager.get.putBytes(blockId, byteBuffer)
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file in OffHeap store in %d ms".format(
blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))

if (returnValues) {
PutResult(bytes.limit(), Right(bytes.duplicate()))
} else {
PutResult(bytes.limit(), null)
}
      } else {
        logError(s"Error in putBytes for block $blockId: no off-heap block manager is available")
        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
}
    } catch {
      case NonFatal(t) =>
        logError(s"Error in putBytes for block $blockId", t)
        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
}
}

  // We assume the block is removed even if an exception is thrown.
  override def remove(blockId: BlockId): Boolean = {
    try {
      offHeapManager.map(_.removeFile(blockId)).getOrElse(true)
    } catch {
      case NonFatal(t) =>
        logError(s"Error removing block $blockId", t)
        true
}
}

override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
}


override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
try {
offHeapManager.flatMap(_.getBytes(blockId))
} catch {
      case NonFatal(t) =>
        logError(s"Error in getBytes for block $blockId", t)
        None
}
}

override def contains(blockId: BlockId): Boolean = {
try {
      val ret = offHeapManager.map(_.fileExists(blockId)).getOrElse(false)
      if (!ret) {
        logInfo(s"Block $blockId not found in OffHeap store, removing it from the block manager")
        blockManager.removeBlock(blockId, tellMaster = true)
      }
      ret
    } catch {
      case NonFatal(t) =>
        logError(s"Error checking existence of block $blockId", t)
        false
}
}
}
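The duplicate-and-rewind step in putIntoOffHeapStore relies on standard ByteBuffer.duplicate() semantics: the duplicate shares the underlying data but keeps its own position and limit, so reading from it leaves the caller's buffer untouched. A small stand-alone sketch of that behavior, illustrative only and not part of this commit:

import java.nio.ByteBuffer

object DuplicateRewindDemo {
  def main(args: Array[String]): Unit = {
    val original = ByteBuffer.wrap("block-payload".getBytes("UTF-8"))
    original.position(6)                  // caller has already consumed part of the buffer

    val copy = original.duplicate()       // shares data, independent position and limit
    copy.rewind()                         // resets only the duplicate's position to 0

    val bytes = new Array[Byte](copy.remaining())
    copy.get(bytes)                       // reads the full payload from the duplicate

    println(new String(bytes, "UTF-8"))   // block-payload
    println(original.position())          // 6 -- the caller's position is unchanged
  }
}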
