diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 042b2a563aac4..d822a8e55111a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -52,8 +52,6 @@ private[spark] class EventLoggingListener(
   private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
-  val LOG_FILE_PERMISSIONS: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort: Short)
 
   private val logger =
     new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
@@ -67,11 +65,10 @@ private[spark] class EventLoggingListener(
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
       val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
-      logger.newFile(COMPRESSION_CODEC_PREFIX + codec, Some(LOG_FILE_PERMISSIONS))
+      logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
     }
-    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION,
-      Some(LOG_FILE_PERMISSIONS))
-    logger.newFile(LOG_PREFIX + logger.fileIndex, Some(LOG_FILE_PERMISSIONS))
+    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
+    logger.newFile(LOG_PREFIX + logger.fileIndex)
   }
 
   /** Log the event as JSON. */
@@ -118,7 +115,7 @@ private[spark] class EventLoggingListener(
    * In addition, create an empty special file to indicate application completion.
    */
   def stop() = {
-    logger.newFile(APPLICATION_COMPLETE, Some(LOG_FILE_PERMISSIONS))
+    logger.newFile(APPLICATION_COMPLETE)
    logger.stop()
   }
 }
@@ -128,6 +125,9 @@ private[spark] object EventLoggingListener extends Logging {
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+  val LOG_FILE_PERMISSIONS: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
+
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
 
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index f33017e298d87..581c03f1332d3 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -83,14 +83,16 @@
     }
     if (dirPermissions.isDefined) {
       val fsStatus = fileSystem.getFileStatus(path)
-      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort()) {
-        fileSystem.setPermission(path, dirPermissions.get);
+      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+        fileSystem.setPermission(path, dirPermissions.get)
       }
     }
   }
 
   /**
    * Create a new writer for the file identified by the given path.
+   * If the permissions are not passed in, it will default to the permissions
+   * (dirPermissions) used when the class was instantiated.
    */
   private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
@@ -110,7 +112,7 @@ private[spark] class FileLogger(
         hadoopDataStream.get
       }
 
-    perms.foreach {p => fileSystem.setPermission(path, p)}
+    perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
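
Not part of the patch: a minimal, self-contained Scala sketch of the two behaviors the change relies on, i.e. building the immutable 770 `FsPermission` from an octal string, and the `perms.orElse(dirPermissions)` fallback that lets `createWriter` default to the permissions supplied when the `FileLogger` was constructed. The object and helper names below are made up for illustration; only the Hadoop client's `FsPermission` API is assumed.

```scala
import org.apache.hadoop.fs.permission.FsPermission

object PermissionFallbackSketch {
  // "770" parsed as octal (504 decimal) -> rwx for owner and group, none for others.
  val logFilePermissions: FsPermission =
    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)

  // Hypothetical helper mirroring the patched createWriter logic: an explicit
  // per-file permission wins; otherwise fall back to the directory-level one.
  def effectivePerms(
      perms: Option[FsPermission],
      dirPermissions: Option[FsPermission]): Option[FsPermission] = {
    perms.orElse(dirPermissions)
  }

  def main(args: Array[String]): Unit = {
    val dirPerms = Some(logFilePermissions)
    // No per-file override -> the directory permissions (770) apply.
    println(effectivePerms(None, dirPerms))           // Some(rwxrwx---)
    // Explicit override -> it takes precedence over dirPermissions.
    val readOnly = FsPermission.createImmutable(Integer.parseInt("440", 8).toShort)
    println(effectivePerms(Some(readOnly), dirPerms)) // Some(r--r-----)
  }
}
```

This fallback is why the `EventLoggingListener` call sites above can drop their explicit `Some(LOG_FILE_PERMISSIONS)` arguments: the permissions are passed once to the `FileLogger` constructor and applied to every file it creates.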