Skip to content

Commit

Permalink
[SPARK-4896] don’t redundantly overwrite executor JAR deps
Browse files Browse the repository at this point in the history
Author: Ryan Williams <[email protected]>

Closes apache#2848 from ryan-williams/fetch-file and squashes the following commits:

c14daff [Ryan Williams] Fix copy that was changed to a move inadvertently
8e39c16 [Ryan Williams] code review feedback
788ed41 [Ryan Williams] don’t redundantly overwrite executor JAR deps
  • Loading branch information
ryan-williams authored and JoshRosen committed Dec 19, 2014
1 parent cdb2c64 commit 7981f96
Showing 1 changed file with 107 additions and 63 deletions.
170 changes: 107 additions & 63 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,16 +385,12 @@ private[spark] object Utils extends Logging {
} finally {
lock.release()
}
if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
if (conf.getBoolean("spark.files.overwrite", false)) {
targetFile.delete()
logInfo((s"File $targetFile exists and does not match contents of $url, " +
s"replacing it with $url"))
} else {
throw new SparkException(s"File $targetFile exists and does not match contents of $url")
}
}
Files.copy(cachedFile, targetFile)
copyFile(
url,
cachedFile,
targetFile,
conf.getBoolean("spark.files.overwrite", false)
)
} else {
doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
}
Expand All @@ -411,6 +407,104 @@ private[spark] object Utils extends Logging {
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
}

/**
* Download `in` to `tempFile`, then move it to `destFile`.
*
* If `destFile` already exists:
* - no-op if its contents equal those of `sourceFile`,
* - throw an exception if `fileOverwrite` is false,
* - attempt to overwrite it otherwise.
*
* @param url URL that `sourceFile` originated from, for logging purposes.
* @param in InputStream to download.
* @param tempFile File path to download `in` to.
* @param destFile File path to move `tempFile` to.
* @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
* `sourceFile`
*/
private def downloadFile(
url: String,
in: InputStream,
tempFile: File,
destFile: File,
fileOverwrite: Boolean): Unit = {

try {
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, closeStreams = true)
copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true)
} finally {
// Catch-all for the couple of cases where for some reason we didn't move `tempFile` to
// `destFile`.
if (tempFile.exists()) {
tempFile.delete()
}
}
}

/**
* Copy `sourceFile` to `destFile`.
*
* If `destFile` already exists:
* - no-op if its contents equal those of `sourceFile`,
* - throw an exception if `fileOverwrite` is false,
* - attempt to overwrite it otherwise.
*
* @param url URL that `sourceFile` originated from, for logging purposes.
* @param sourceFile File path to copy/move from.
* @param destFile File path to copy/move to.
* @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
* `sourceFile`
* @param removeSourceFile Whether to remove `sourceFile` after / as part of moving/copying it to
* `destFile`.
*/
private def copyFile(
url: String,
sourceFile: File,
destFile: File,
fileOverwrite: Boolean,
removeSourceFile: Boolean = false): Unit = {

if (destFile.exists) {
if (!Files.equal(sourceFile, destFile)) {
if (fileOverwrite) {
logInfo(
s"File $destFile exists and does not match contents of $url, replacing it with $url"
)
if (!destFile.delete()) {
throw new SparkException(
"Failed to delete %s while attempting to overwrite it with %s".format(
destFile.getAbsolutePath,
sourceFile.getAbsolutePath
)
)
}
} else {
throw new SparkException(
s"File $destFile exists and does not match contents of $url")
}
} else {
// Do nothing if the file contents are the same, i.e. this file has been copied
// previously.
logInfo(
"%s has been previously copied to %s".format(
sourceFile.getAbsolutePath,
destFile.getAbsolutePath
)
)
return
}
}

// The file does not exist in the target directory. Copy or move it there.
if (removeSourceFile) {
Files.move(sourceFile, destFile)
} else {
logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
Files.copy(sourceFile, destFile)
}
}

/**
* Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
Expand Down Expand Up @@ -449,67 +543,17 @@ private[spark] object Utils extends Logging {
uc.setReadTimeout(timeout)
uc.connect()
val in = uc.getInputStream()
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, closeStreams = true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
if (fileOverwrite) {
targetFile.delete()
logInfo(("File %s exists and does not match contents of %s, " +
"replacing it with %s").format(targetFile, url, url))
} else {
tempFile.delete()
throw new SparkException(
"File " + targetFile + " exists and does not match contents of" + " " + url)
}
}
Files.move(tempFile, targetFile)
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
case "file" =>
// In the case of a local file, copy the local file to the target directory.
// Note the difference between uri vs url.
val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
var shouldCopy = true
if (targetFile.exists) {
if (!Files.equal(sourceFile, targetFile)) {
if (fileOverwrite) {
targetFile.delete()
logInfo(("File %s exists and does not match contents of %s, " +
"replacing it with %s").format(targetFile, url, url))
} else {
throw new SparkException(
"File " + targetFile + " exists and does not match contents of" + " " + url)
}
} else {
// Do nothing if the file contents are the same, i.e. this file has been copied
// previously.
logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
+ targetFile.getAbsolutePath)
shouldCopy = false
}
}

if (shouldCopy) {
// The file does not exist in the target directory. Copy it there.
logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
Files.copy(sourceFile, targetFile)
}
copyFile(url, sourceFile, targetFile, fileOverwrite)
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, closeStreams = true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
if (fileOverwrite) {
targetFile.delete()
logInfo(("File %s exists and does not match contents of %s, " +
"replacing it with %s").format(targetFile, url, url))
} else {
tempFile.delete()
throw new SparkException(
"File " + targetFile + " exists and does not match contents of" + " " + url)
}
}
Files.move(tempFile, targetFile)
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
}
}

Expand Down

0 comments on commit 7981f96

Please sign in to comment.