Skip to content

Commit

Permalink
rework
Browse files Browse the repository at this point in the history
  • Loading branch information
tgravescs committed Apr 28, 2014
1 parent d8b6620 commit e471d8e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ private[spark] class EventLoggingListener(
private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
val logDir = logBaseDir + "/" + name
val LOG_FILE_PERMISSIONS: FsPermission =
FsPermission.createImmutable(Integer.parseInt("770", 8).toShort: Short)

private val logger =
new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
Expand All @@ -67,11 +65,10 @@ private[spark] class EventLoggingListener(
logInfo("Logging events to %s".format(logDir))
if (shouldCompress) {
val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
logger.newFile(COMPRESSION_CODEC_PREFIX + codec, Some(LOG_FILE_PERMISSIONS))
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
}
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION,
Some(LOG_FILE_PERMISSIONS))
logger.newFile(LOG_PREFIX + logger.fileIndex, Some(LOG_FILE_PERMISSIONS))
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
logger.newFile(LOG_PREFIX + logger.fileIndex)
}

/** Log the event as JSON. */
Expand Down Expand Up @@ -118,7 +115,7 @@ private[spark] class EventLoggingListener(
* In addition, create an empty special file to indicate application completion.
*/
def stop() = {
logger.newFile(APPLICATION_COMPLETE, Some(LOG_FILE_PERMISSIONS))
logger.newFile(APPLICATION_COMPLETE)
logger.stop()
}
}
Expand All @@ -128,6 +125,9 @@ private[spark] object EventLoggingListener extends Logging {
val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
val LOG_FILE_PERMISSIONS: FsPermission =
FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)


// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new mutable.HashMap[String, CompressionCodec]
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ private[spark] class FileLogger(
}
if (dirPermissions.isDefined) {
val fsStatus = fileSystem.getFileStatus(path)
if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort()) {
fileSystem.setPermission(path, dirPermissions.get);
if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
fileSystem.setPermission(path, dirPermissions.get)
}
}
}

/**
 * Create a new writer for the file identified by the given path.
 * If permissions are not passed in, it will default to using the permissions
 * (dirPermissions) that were used when the class was instantiated.
 */
private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
val logPath = logDir + "/" + fileName
Expand All @@ -110,7 +112,7 @@ private[spark] class FileLogger(
hadoopDataStream.get
}

perms.foreach {p => fileSystem.setPermission(path, p)}
perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
val bstream = new BufferedOutputStream(dstream, outputBufferSize)
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
new PrintWriter(cstream)
Expand Down

0 comments on commit e471d8e

Please sign in to comment.