Merge branch 'master' of git://github.com/apache/spark
yaron committed Dec 24, 2015
2 parents 825f3f3 + ea4aab7 commit 99b5bde
Showing 56 changed files with 453 additions and 241 deletions.
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2073,8 +2073,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// its own local file system, which is incorrect because the checkpoint files
// are actually on the executor machines.
if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
logWarning("Checkpoint directory must be non-local " +
"if Spark is running on a cluster: " + directory)
logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
s"must not be on the local filesystem. Directory '$directory' " +
"appears to be on the local filesystem.")
}

checkpointDir = Option(directory).map { dir =>
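The reworded warning above fires when a cluster-mode driver is handed a checkpoint directory that exists only on its local filesystem. A minimal usage sketch of the correct setup (the HDFS URL is a hypothetical placeholder):

```scala
// Assuming an existing cluster-mode SparkContext `sc`: the checkpoint
// directory must live on storage every executor can reach, because the
// checkpoint files are written on the executor machines.
sc.setCheckpointDir("hdfs://namenode:8020/user/spark/checkpoints")

// A driver-local path such as "/tmp/checkpoints" would log the warning above.
```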
135 changes: 104 additions & 31 deletions core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -26,13 +26,17 @@ import scala.util.control.NonFatal

import com.google.common.io.ByteStreams

import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
import tachyon.{Constants, TachyonURI}
import tachyon.client.ClientContext
import tachyon.client.file.{TachyonFile, TachyonFileSystem}
import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory
import tachyon.client.file.options.DeleteOptions
import tachyon.conf.TachyonConf
import tachyon.TachyonURI
import tachyon.exception.{FileAlreadyExistsException, FileDoesNotExistException}

import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.util.Utils


/**
@@ -44,15 +48,15 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log

var rootDirs: String = _
var master: String = _
var client: tachyon.client.TachyonFS = _
var client: TachyonFileSystem = _
private var subDirsPerTachyonDir: Int = _

// Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
// then, inside this directory, create multiple subdirectories that we will hash files into,
// in order to avoid having really large inodes at the top level in Tachyon.
private var tachyonDirs: Array[TachyonFile] = _
private var subDirs: Array[Array[tachyon.client.TachyonFile]] = _

private var subDirs: Array[Array[TachyonFile]] = _
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()

override def init(blockManager: BlockManager, executorId: String): Unit = {
super.init(blockManager, executorId)
@@ -62,7 +66,10 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
rootDirs = s"$storeDir/$appFolderName/$executorId"
master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
client = if (master != null && master != "") {
TachyonFS.get(new TachyonURI(master), new TachyonConf())
val tachyonConf = new TachyonConf()
tachyonConf.set(Constants.MASTER_ADDRESS, master)
ClientContext.reset(tachyonConf)
TachyonFileSystemFactory.get
} else {
null
}
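The hunk above replaces the single-call `TachyonFS.get(...)` bootstrap with the newer Tachyon client API (the 0.8-line API, judging by the imports). The same bootstrap as a standalone sketch, reusing the default master URL from the diff:

```scala
import tachyon.Constants
import tachyon.client.ClientContext
import tachyon.client.file.TachyonFileSystem
import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory
import tachyon.conf.TachyonConf

// New-style bootstrap: set the master address on a TachyonConf, reset the
// process-wide ClientContext with it, then obtain the filesystem handle from
// the factory instead of TachyonFS.get(...).
val tachyonConf = new TachyonConf()
tachyonConf.set(Constants.MASTER_ADDRESS, "tachyon://localhost:19998")
ClientContext.reset(tachyonConf)
val client: TachyonFileSystem = TachyonFileSystemFactory.get
```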
@@ -80,7 +87,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
// in order to avoid having really large inodes at the top level in Tachyon.
tachyonDirs = createTachyonDirs()
subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
tachyonDirs.foreach(tachyonDir => ShutdownHookManager.registerShutdownDeleteDir(tachyonDir))
tachyonDirs.foreach(registerShutdownDeleteDir)
}

override def toString: String = {"ExternalBlockStore-Tachyon"}
@@ -89,6 +96,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
val file = getFile(blockId)
if (fileExists(file)) {
removeFile(file)
true
} else {
false
}
@@ -101,7 +109,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log

override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
val file = getFile(blockId)
val os = file.getOutStream(WriteType.TRY_CACHE)
val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
try {
Utils.writeByteBuffer(bytes, os)
} catch {
@@ -115,7 +123,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log

override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
val file = getFile(blockId)
val os = file.getOutStream(WriteType.TRY_CACHE)
val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
try {
blockManager.dataSerializeStream(blockId, os, values)
} catch {
@@ -129,12 +137,17 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val file = getFile(blockId)
if (file == null || file.getLocationHosts.size == 0) {
if (file == null) {
return None
}
val is = file.getInStream(ReadType.CACHE)
val is = try {
client.getInStream(file)
} catch {
case _: FileDoesNotExistException =>
return None
}
try {
val size = file.length
val size = client.getInfo(file).length
val bs = new Array[Byte](size.asInstanceOf[Int])
ByteStreams.readFully(is, bs)
Some(ByteBuffer.wrap(bs))
@@ -149,25 +162,37 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log

override def getValues(blockId: BlockId): Option[Iterator[_]] = {
val file = getFile(blockId)
if (file == null || file.getLocationHosts().size() == 0) {
if (file == null) {
return None
}
val is = file.getInStream(ReadType.CACHE)
Option(is).map { is =>
blockManager.dataDeserializeStream(blockId, is)
val is = try {
client.getInStream(file)
} catch {
case _: FileDoesNotExistException =>
return None
}
try {
Some(blockManager.dataDeserializeStream(blockId, is))
} finally {
is.close()
}
}
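The four I/O methods above all switch from file-level streams (`file.getOutStream(WriteType.TRY_CACHE)`, `file.getInStream(ReadType.CACHE)`) to client-level streams, and a missing file is now signalled by `FileDoesNotExistException` rather than an empty location list. A condensed sketch of both directions, assuming `client` and `file` are in scope as in the diff:

```scala
import com.google.common.io.ByteStreams
import tachyon.TachyonURI
import tachyon.exception.FileDoesNotExistException

// Write path: resolve the file's URI through its metadata, then open an
// output stream on the client. The diff adds error handling around this and
// uses Utils.writeByteBuffer for ByteBuffer payloads.
def writeAll(payload: Array[Byte]): Unit = {
  val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
  try os.write(payload) finally os.close()
}

// Read path: a missing file surfaces as FileDoesNotExistException, and the
// size now comes from the file's metadata rather than file.length.
def readAll(): Option[Array[Byte]] = {
  val is =
    try client.getInStream(file)
    catch { case _: FileDoesNotExistException => return None }
  try {
    val buf = new Array[Byte](client.getInfo(file).length.toInt)
    ByteStreams.readFully(is, buf)
    Some(buf)
  } finally is.close()
}
```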

override def getSize(blockId: BlockId): Long = {
getFile(blockId.name).length
client.getInfo(getFile(blockId.name)).length
}

def removeFile(file: TachyonFile): Boolean = {
client.delete(new TachyonURI(file.getPath()), false)
def removeFile(file: TachyonFile): Unit = {
client.delete(file)
}

def fileExists(file: TachyonFile): Boolean = {
client.exist(new TachyonURI(file.getPath()))
try {
client.getInfo(file)
true
} catch {
case _: FileDoesNotExistException => false
}
}

def getFile(filename: String): TachyonFile = {
@@ -186,18 +211,18 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
} else {
val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
client.mkdir(path)
val newDir = client.getFile(path)
val newDir = client.loadMetadata(path)
subDirs(dirId)(subDirId) = newDir
newDir
}
}
}
val filePath = new TachyonURI(s"$subDir/$filename")
if(!client.exist(filePath)) {
client.createFile(filePath)
try {
client.create(filePath)
} catch {
case _: FileAlreadyExistsException => client.loadMetadata(filePath)
}
val file = client.getFile(filePath)
file
}
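The `dirId` and `subDirId` indices used above come from a hash of the file name; those lines are collapsed in this view, so the sketch below is an assumption based on the scheme Spark's `DiskBlockManager` uses for local directories:

```scala
// Assumed hashing scheme (the actual lines are collapsed in this diff view):
// spread files across the top-level tachyon dirs and their subdirectories so
// that no single directory accumulates an oversized set of inodes.
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % tachyonDirs.length
val subDirId = (hash / tachyonDirs.length) % subDirsPerTachyonDir
```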

def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name)
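Existence checks change shape in the same way: instead of guarding with `client.exist(path)`, the new API attempts the operation and treats `FileAlreadyExistsException` as "already created". The create-or-load idiom shared by `getFile` and `createTachyonDirs`, condensed:

```scala
import tachyon.TachyonURI
import tachyon.client.file.TachyonFile
import tachyon.exception.FileAlreadyExistsException

// Create the file if absent; if another writer got there first, load its
// metadata instead of failing. Return types follow the usage in the diff.
def createOrLoad(uri: TachyonURI): TachyonFile =
  try client.create(uri)
  catch { case _: FileAlreadyExistsException => client.loadMetadata(uri) }
```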
@@ -217,9 +242,11 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
try {
tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
if (!client.exist(path)) {
try {
foundLocalDir = client.mkdir(path)
tachyonDir = client.getFile(path)
tachyonDir = client.loadMetadata(path)
} catch {
case _: FileAlreadyExistsException => // continue
}
} catch {
case NonFatal(e) =>
@@ -240,14 +267,60 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(tachyonDir)) {
Utils.deleteRecursively(tachyonDir, client)
if (!hasRootAsShutdownDeleteDir(tachyonDir)) {
deleteRecursively(tachyonDir, client)
}
} catch {
case NonFatal(e) =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
}
client.close()
}

/**
* Delete a file or directory and its contents recursively.
*/
private def deleteRecursively(dir: TachyonFile, client: TachyonFileSystem) {
client.delete(dir, new DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build())
}

// Register the tachyon path to be deleted via shutdown hook
private def registerShutdownDeleteDir(file: TachyonFile) {
val absolutePath = client.getInfo(file).getPath
shutdownDeleteTachyonPaths.synchronized {
shutdownDeleteTachyonPaths += absolutePath
}
}

// Remove the tachyon path to be deleted via shutdown hook
private def removeShutdownDeleteDir(file: TachyonFile) {
val absolutePath = client.getInfo(file).getPath
shutdownDeleteTachyonPaths.synchronized {
shutdownDeleteTachyonPaths -= absolutePath
}
}

// Is the path already registered to be deleted via a shutdown hook ?
private def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
val absolutePath = client.getInfo(file).getPath
shutdownDeleteTachyonPaths.synchronized {
shutdownDeleteTachyonPaths.contains(absolutePath)
}
}

// Note: if file is child of some registered path, while not equal to it, then return true;
// else false. This is to ensure that two shutdown hooks do not try to delete each others
// paths - resulting in Exception and incomplete cleanup.
private def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
val absolutePath = client.getInfo(file).getPath
val hasRoot = shutdownDeleteTachyonPaths.synchronized {
shutdownDeleteTachyonPaths.exists(
path => !absolutePath.equals(path) && absolutePath.startsWith(path))
}
if (hasRoot) {
logInfo(s"path = $absolutePath, already present as root for deletion.")
}
hasRoot
}

}
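The shutdown-delete bookkeeping that moved into this class exists so that two shutdown hooks never race to delete the same subtree; a self-contained illustration of the prefix test in `hasRootAsShutdownDeleteDir` (paths are hypothetical):

```scala
// If some registered path is a strict prefix of this one, another hook owns
// an enclosing directory and will remove this path along with it.
val registered = Set("/spark-tachyon/app-1")
def hasRootRegistered(p: String): Boolean =
  registered.exists(root => p != root && p.startsWith(root))

assert(hasRootRegistered("/spark-tachyon/app-1/sub-00")) // child: skipped, parent's hook cleans it up
assert(!hasRootRegistered("/spark-tachyon/app-1"))       // the registered dir itself: deleted once
```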
core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.util.PriorityQueue

import scala.util.{Failure, Success, Try}
import tachyon.client.TachyonFile

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.Logging
@@ -52,7 +51,6 @@ private[spark] object ShutdownHookManager extends Logging {
}

private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()

// Add a shutdown hook to delete the temp dirs when the JVM exits
addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
@@ -77,14 +75,6 @@ private[spark] object ShutdownHookManager extends Logging {
}
}

// Register the tachyon path to be deleted via shutdown hook
def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
val absolutePath = tachyonfile.getPath()
shutdownDeleteTachyonPaths.synchronized {
shutdownDeleteTachyonPaths += absolutePath
}
}

// Remove the path to be deleted via shutdown hook
def removeShutdownDeleteDir(file: File) {
val absolutePath = file.getAbsolutePath()
@@ -93,14 +83,6 @@ private[spark] object ShutdownHookManager extends Logging {
}
}

// Remove the tachyon path to be deleted via shutdown hook
def removeShutdownDeleteDir(tachyonfile: TachyonFile) {
val absolutePath = tachyonfile.getPath()
shutdownDeleteTachyonPaths.synchronized {
shutdownDeleteTachyonPaths.remove(absolutePath)
}
}

// Is the path already registered to be deleted via a shutdown hook ?
def hasShutdownDeleteDir(file: File): Boolean = {
val absolutePath = file.getAbsolutePath()
@@ -109,14 +91,6 @@ private[spark] object ShutdownHookManager extends Logging {
}
}

// Is the path already registered to be deleted via a shutdown hook ?
def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
val absolutePath = file.getPath()
shutdownDeleteTachyonPaths.synchronized {
shutdownDeleteTachyonPaths.contains(absolutePath)
}
}

// Note: if file is child of some registered path, while not equal to it, then return true;
// else false. This is to ensure that two shutdown hooks do not try to delete each others
// paths - resulting in IOException and incomplete cleanup.
@@ -133,22 +107,6 @@ private[spark] object ShutdownHookManager extends Logging {
retval
}

// Note: if file is child of some registered path, while not equal to it, then return true;
// else false. This is to ensure that two shutdown hooks do not try to delete each others
// paths - resulting in Exception and incomplete cleanup.
def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
val absolutePath = file.getPath()
val retval = shutdownDeleteTachyonPaths.synchronized {
shutdownDeleteTachyonPaths.exists { path =>
!absolutePath.equals(path) && absolutePath.startsWith(path)
}
}
if (retval) {
logInfo("path = " + file + ", already present as root for deletion.")
}
retval
}

/**
* Detect whether this thread might be executing a shutdown hook. Will always return true if
the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
11 changes: 0 additions & 11 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -44,8 +44,6 @@ import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._
import org.slf4j.Logger
import tachyon.TachyonURI
import tachyon.client.{TachyonFS, TachyonFile}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -946,15 +944,6 @@ private[spark] object Utils extends Logging {
}
}

/**
* Delete a file or directory and its contents recursively.
*/
def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
if (!client.delete(new TachyonURI(dir.getPath()), true)) {
throw new IOException("Failed to delete the tachyon dir: " + dir)
}
}

/**
* Check to see if file is a symbolic link.
*/
@@ -42,6 +42,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
private val conf: SparkConf = new SparkConf(loadDefaults = false)

override def beforeEach(): Unit = {
super.beforeEach()
tempDir = Utils.createTempDir()
MockitoAnnotations.initMocks(this)

@@ -55,7 +56,11 @@
}

override def afterEach(): Unit = {
Utils.deleteRecursively(tempDir)
try {
Utils.deleteRecursively(tempDir)
} finally {
super.afterEach()
}
}

test("commit shuffle files multiple times") {
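The suite changes above follow the usual ScalaTest `BeforeAndAfterEach` convention: parent setup runs first on the way in, and parent teardown runs last, inside a `finally`, on the way out. The full pattern as a minimal sketch:

```scala
override def beforeEach(): Unit = {
  super.beforeEach()                  // parent setup first
  tempDir = Utils.createTempDir()
}

override def afterEach(): Unit = {
  try {
    Utils.deleteRecursively(tempDir)  // local cleanup may throw
  } finally {
    super.afterEach()                 // parent teardown must still run
  }
}
```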