[SPARK-6144] [core] Fix addFile when source files are on "hdfs:"
The code failed in two modes: it complained when it tried to re-create a directory that already existed, and it placed some files in the wrong parent directory. The patch fixes both issues.
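The first failure came from using File.mkdir() as both create and existence check: mkdir() returns false when the directory is already present, so re-fetching into an existing target directory threw spuriously. A minimal sketch of the guard as the patch applies it (ensureDir is a hypothetical name for illustration; the real code inlines this check in fetchHcfsFile):

    import java.io.{File, IOException}

    def ensureDir(targetDir: File): Unit = {
      // Only treat a failed mkdir() as fatal when the directory is truly absent;
      // a pre-existing directory is fine and the fetch can proceed into it.
      if (!targetDir.exists() && !targetDir.mkdir()) {
        throw new IOException(s"Failed to create directory ${targetDir.getPath}")
      }
    }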

Author: Marcelo Vanzin <[email protected]>
Author: trystanleftwich <[email protected]>

Closes apache#4894 from vanzin/SPARK-6144 and squashes the following commits:

100b3a1 [Marcelo Vanzin] Style fix.
58266aa [Marcelo Vanzin] Fix fetchHcfsFile for directories.
91733b7 [trystanleftwich] [SPARK-6144] When in cluster mode, using ADD JAR with an hdfs:// sourced jar will fail
Marcelo Vanzin authored and Andrew Or committed Mar 4, 2015
1 parent f6773ed commit 3a35a0d
Showing 2 changed files with 63 additions and 50 deletions.
28 changes: 16 additions & 12 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -624,7 +624,8 @@ private[spark] object Utils extends Logging {
       case _ =>
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val path = new Path(uri)
-        fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
+        fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
+          filename = Some(filename))
     }
   }

@@ -639,19 +640,22 @@ private[spark] object Utils extends Logging {
       fs: FileSystem,
       conf: SparkConf,
       hadoopConf: Configuration,
-      fileOverwrite: Boolean): Unit = {
-    if (!targetDir.mkdir()) {
+      fileOverwrite: Boolean,
+      filename: Option[String] = None): Unit = {
+    if (!targetDir.exists() && !targetDir.mkdir()) {
       throw new IOException(s"Failed to create directory ${targetDir.getPath}")
     }
-    fs.listStatus(path).foreach { fileStatus =>
-      val innerPath = fileStatus.getPath
-      if (fileStatus.isDir) {
-        fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
-          fileOverwrite)
-      } else {
-        val in = fs.open(innerPath)
-        val targetFile = new File(targetDir, innerPath.getName)
-        downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
-      }
-    }
-  }
+    val dest = new File(targetDir, filename.getOrElse(path.getName))
+    if (fs.isFile(path)) {
+      val in = fs.open(path)
+      try {
+        downloadFile(path.toString, in, dest, fileOverwrite)
+      } finally {
+        in.close()
+      }
+    } else {
+      fs.listStatus(path).foreach { fileStatus =>
+        fetchHcfsFile(fileStatus.getPath(), dest, fs, conf, hadoopConf, fileOverwrite)
+      }
+    }
+  }
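The wrong-parent bug showed up when the source was a single file: the old code always called fs.listStatus(path) and joined the listing's names onto a target that was already suffixed with path.getName, so fetching a hypothetical hdfs:///libs/app.jar into /work produced /work/app.jar/app.jar. The rewritten method checks fs.isFile(path) first. A simplified sketch of that dispatch (fetchTo is a hypothetical stand-in; Hadoop's copyToLocalFile replaces Spark's downloadFile, which additionally honors the fileOverwrite flag):

    import java.io.File
    import org.apache.hadoop.fs.{FileSystem, Path}

    def fetchTo(path: Path, targetDir: File, fs: FileSystem): Unit = {
      val dest = new File(targetDir, path.getName)
      if (fs.isFile(path)) {
        // A single file lands directly under targetDir, not inside a
        // spurious directory named after the file.
        fs.copyToLocalFile(path, new Path(dest.getAbsolutePath))
      } else {
        // A directory is recreated locally, then its children are fetched into it.
        dest.mkdirs()
        fs.listStatus(path).foreach(status => fetchTo(status.getPath, dest, fs))
      }
    }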
85 changes: 47 additions & 38 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -208,18 +208,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     child1.setLastModified(System.currentTimeMillis() - (1000 * 30))

     // although child1 is old, child2 is still new so return true
     assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

     child2.setLastModified(System.currentTimeMillis - (1000 * 30))
     assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

     parent.setLastModified(System.currentTimeMillis - (1000 * 30))
     // although parent and its immediate children are new, child3 is still old
     // we expect a full recursive search for new files.
     assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

     child3.setLastModified(System.currentTimeMillis - (1000 * 30))
     assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
   }

   test("resolveURI") {
@@ -339,21 +339,21 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(!tempDir1.exists())

     val tempDir2 = Utils.createTempDir()
-    val tempFile1 = new File(tempDir2, "foo.txt")
-    Files.touch(tempFile1)
-    assert(tempFile1.exists())
-    Utils.deleteRecursively(tempFile1)
-    assert(!tempFile1.exists())
+    val sourceFile1 = new File(tempDir2, "foo.txt")
+    Files.touch(sourceFile1)
+    assert(sourceFile1.exists())
+    Utils.deleteRecursively(sourceFile1)
+    assert(!sourceFile1.exists())

     val tempDir3 = new File(tempDir2, "subdir")
     assert(tempDir3.mkdir())
-    val tempFile2 = new File(tempDir3, "bar.txt")
-    Files.touch(tempFile2)
-    assert(tempFile2.exists())
+    val sourceFile2 = new File(tempDir3, "bar.txt")
+    Files.touch(sourceFile2)
+    assert(sourceFile2.exists())
     Utils.deleteRecursively(tempDir2)
     assert(!tempDir2.exists())
     assert(!tempDir3.exists())
-    assert(!tempFile2.exists())
+    assert(!sourceFile2.exists())
   }

   test("loading properties from file") {
@@ -386,30 +386,39 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
   }

   test("fetch hcfs dir") {
-    val tempDir = Utils.createTempDir()
-    val innerTempDir = Utils.createTempDir(tempDir.getPath)
-    val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
-    val targetDir = new File("target-dir")
-    Files.write("some text", tempFile, UTF_8)
-
-    try {
-      val path = new Path("file://" + tempDir.getAbsolutePath)
-      val conf = new Configuration()
-      val fs = Utils.getHadoopFileSystem(path.toString, conf)
-      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
-      assert(targetDir.exists())
-      assert(targetDir.isDirectory())
-      val newInnerDir = new File(targetDir, innerTempDir.getName)
-      println("inner temp dir: " + innerTempDir.getName)
-      targetDir.listFiles().map(_.getName).foreach(println)
-      assert(newInnerDir.exists())
-      assert(newInnerDir.isDirectory())
-      val newInnerFile = new File(newInnerDir, tempFile.getName)
-      assert(newInnerFile.exists())
-      assert(newInnerFile.isFile())
-    } finally {
-      Utils.deleteRecursively(tempDir)
-      Utils.deleteRecursively(targetDir)
-    }
+    val sourceDir = Utils.createTempDir()
+    val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
+    val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
+    val targetDir = new File(Utils.createTempDir(), "target-dir")
+    Files.write("some text", sourceFile, UTF_8)
+
+    val path = new Path("file://" + sourceDir.getAbsolutePath)
+    val conf = new Configuration()
+    val fs = Utils.getHadoopFileSystem(path.toString, conf)
+
+    assert(!targetDir.isDirectory())
+    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+    assert(targetDir.isDirectory())
+
+    // Copy again to make sure it doesn't error if the dir already exists.
+    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+
+    val destDir = new File(targetDir, sourceDir.getName())
+    assert(destDir.isDirectory())
+
+    val destInnerDir = new File(destDir, innerSourceDir.getName)
+    assert(destInnerDir.isDirectory())
+
+    val destInnerFile = new File(destInnerDir, sourceFile.getName)
+    assert(destInnerFile.isFile())
+
+    val filePath = new Path("file://" + sourceFile.getAbsolutePath)
+    val testFileDir = new File("test-filename")
+    val testFileName = "testFName"
+    val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
+    Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
+      conf, false, Some(testFileName))
+    val newFileName = new File(testFileDir, testFileName)
+    assert(newFileName.isFile())
   }
 }
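To exercise the new assertions, one plausible invocation of just this suite (assuming the sbt launcher shipped in the repository's build/ directory; the task spelling varies across sbt versions) is:

    build/sbt "core/test-only org.apache.spark.util.UtilsSuite"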
