package org.apache.spark.storage

import java.nio.ByteBuffer

import scala.util.control.NonFatal

import org.apache.spark.Logging

/**
 * Pluggable interface for storing BlockManager blocks off-heap.
 *
 * Implementations are loaded reflectively via the
 * `spark.offHeapStore.blockManager` configuration key (see
 * [[OffHeapBlockManager.create]]) and must provide a no-arg constructor.
 */
trait OffHeapBlockManager {

  /** Short human-readable description of the implementation. */
  def desc(): String = "OffHeap"

  /**
   * Initialize a concrete block manager implementation.
   *
   * @throws java.io.IOException on FS init failure.
   */
  def init(blockManager: BlockManager, executorId: String): Unit

  /**
   * Remove the cached block from off-heap storage.
   *
   * @return true if the backing file was removed
   * @throws java.io.IOException on FS failure while removing the file.
   */
  def removeFile(blockId: BlockId): Boolean

  /**
   * Check the existence of the block cache.
   *
   * @throws java.io.IOException on FS failure while checking existence.
   */
  def fileExists(blockId: BlockId): Boolean

  /**
   * Save the block's bytes to off-heap storage.
   *
   * @throws java.io.IOException on FS failure while putting the block.
   */
  def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit

  /**
   * Retrieve the block's bytes from off-heap storage.
   *
   * @return Some(buffer) if present, None otherwise
   * @throws java.io.IOException on FS failure while getting the block.
   */
  def getBytes(blockId: BlockId): Option[ByteBuffer]

  /**
   * Retrieve the size in bytes of the cached block.
   *
   * @throws java.io.IOException on FS failure while getting the block size.
   */
  def getSize(blockId: BlockId): Long

  /** Register cleanup to run on JVM shutdown. */
  def addShutdownHook(): Unit

  /**
   * Template method: initialize the implementation, then register its
   * shutdown cleanup. Final so every implementation follows the same
   * two-step lifecycle.
   */
  final def setup(blockManager: BlockManager, executorId: String): Unit = {
    init(blockManager, executorId)
    addShutdownHook()
  }
}

object OffHeapBlockManager extends Logging {
  val MAX_DIR_CREATION_ATTEMPTS = 10
  val subDirsPerDir = 64

  /**
   * Reflectively instantiate and set up the [[OffHeapBlockManager]]
   * implementation named by `spark.offHeapStore.blockManager`.
   *
   * @return Some(manager) on success; None when the key is unset or when
   *         instantiation/setup fails (the failure is logged, not thrown).
   */
  def create(
      blockManager: BlockManager,
      executorId: String): Option[OffHeapBlockManager] = {
    blockManager.conf.getOption("spark.offHeapStore.blockManager").flatMap { name =>
      try {
        val instance = Class.forName(name)
          .newInstance()
          .asInstanceOf[OffHeapBlockManager]
        instance.setup(blockManager, executorId)
        Some(instance)
      } catch {
        case NonFatal(t) =>
          // Pass the throwable to the logger so the root cause (bad class
          // name, failing init, etc.) is diagnosable instead of swallowed.
          logError("Cannot initialize offHeap store", t)
          None
      }
    }
  }
}
package org.apache.spark.storage

import java.nio.ByteBuffer

import scala.util.control.NonFatal

import org.apache.spark.Logging
import org.apache.spark.util.Utils

/**
 * Stores BlockManager blocks off-heap via a pluggable [[OffHeapBlockManager]].
 *
 * Any exception from the underlying implementation is caught (and logged with
 * its cause) and mapped to the operation's expected failure value, so a flaky
 * off-heap backend degrades gracefully instead of crashing the executor.
 */
private[spark] class OffHeapStore(blockManager: BlockManager, executorId: String)
  extends BlockStore(blockManager: BlockManager) with Logging {

  lazy val offHeapManager: Option[OffHeapBlockManager] =
    OffHeapBlockManager.create(blockManager, executorId)

  logInfo("OffHeap started")

  /** Size in bytes of the stored block; 0 when absent, unconfigured, or on error. */
  override def getSize(blockId: BlockId): Long = {
    try {
      // 0L: avoid relying on Int-to-Long widening of the default.
      offHeapManager.map(_.getSize(blockId)).getOrElse(0L)
    } catch {
      case NonFatal(t) =>
        logError(s"error in getSize from $blockId", t)
        0L
    }
  }

  override def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult = {
    putIntoOffHeapStore(blockId, bytes, returnValues = true)
  }

  override def putArray(
      blockId: BlockId,
      values: Array[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {
    putIterator(blockId, values.toIterator, level, returnValues)
  }

  override def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {
    logDebug(s"Attempting to write values for block $blockId")
    val bytes = blockManager.dataSerialize(blockId, values)
    putIntoOffHeapStore(blockId, bytes, returnValues)
  }

  /**
   * Write `bytes` to the off-heap backend and build the [[PutResult]].
   *
   * On failure (or when no off-heap manager is configured) returns a
   * PutResult that marks the block as dropped (BlockStatus.empty).
   */
  private def putIntoOffHeapStore(
      blockId: BlockId,
      bytes: ByteBuffer,
      returnValues: Boolean): PutResult = {

    // Duplicate so we never disturb the caller's position/limit; duplicate()
    // shares the underlying storage, so this is inexpensive.
    val byteBuffer = bytes.duplicate()
    byteBuffer.rewind()
    logDebug(s"Attempting to put block $blockId into OffHeap store")
    val startTime = System.currentTimeMillis
    // We should never hit here if offHeapManager is None. Handle it anyway for safety.
    try {
      if (offHeapManager.isDefined) {
        // BUG FIX: store the rewound duplicate, not the original `bytes` —
        // the original may sit at a non-zero position, which would persist
        // a truncated block (and mutate the caller's buffer state).
        offHeapManager.get.putBytes(blockId, byteBuffer)
        val finishTime = System.currentTimeMillis
        logDebug("Block %s stored as %s file in OffHeap store in %d ms".format(
          blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))

        if (returnValues) {
          PutResult(bytes.limit(), Right(bytes.duplicate()))
        } else {
          PutResult(bytes.limit(), null)
        }
      } else {
        logError(s"error in putBytes $blockId")
        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
      }
    } catch {
      case NonFatal(t) =>
        logError(s"error in putBytes $blockId", t)
        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
    }
  }

  // We assume the block is removed even if an exception is thrown.
  override def remove(blockId: BlockId): Boolean = {
    try {
      offHeapManager.map(_.removeFile(blockId)).getOrElse(true)
    } catch {
      case NonFatal(t) =>
        logError(s"error in removing $blockId", t)
        true
    }
  }

  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
  }

  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
    try {
      offHeapManager.flatMap(_.getBytes(blockId))
    } catch {
      case NonFatal(t) =>
        logError(s"error in getBytes from $blockId", t)
        None
    }
  }

  override def contains(blockId: BlockId): Boolean = {
    try {
      val ret = offHeapManager.map(_.fileExists(blockId)).getOrElse(false)
      if (!ret) {
        // Off-heap copy is gone: drop the stale BlockManager entry so the
        // master's view stays consistent.
        logInfo(s"remove block $blockId")
        blockManager.removeBlock(blockId, true)
      }
      ret
    } catch {
      case NonFatal(t) =>
        // Fixed copy-pasted message: this is existence checking, not getBytes.
        logError(s"error in checking existence of $blockId", t)
        false
    }
  }
}