Modified SparkContext and EventLoggingListener so that the directory for EventLogs is named same for Application ID
sarutak committed Sep 25, 2014
1 parent 203634e commit 28d4d93
Showing 2 changed files with 17 additions and 17 deletions.
26 changes: 13 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -233,19 +233,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
-  // Optionally log Spark events
-  private[spark] val eventLogger: Option[EventLoggingListener] = {
-    if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
-      logger.start()
-      listenerBus.addListener(logger)
-      Some(logger)
-    } else None
-  }
-
-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
   val startTime = System.currentTimeMillis()
 
   // Add each JAR given through the constructor
@@ -318,6 +305,19 @@ class SparkContext(config: SparkConf) extends Logging {
   metricsSystem.registerSinks()
   metricsSystem.start()
 
+  // Optionally log Spark events
+  private[spark] val eventLogger: Option[EventLoggingListener] = {
+    if (conf.getBoolean("spark.eventLog.enabled", false)) {
+      val logger = new EventLoggingListener(appId, conf, hadoopConfiguration)
+      logger.start()
+      listenerBus.addListener(logger)
+      Some(logger)
+    } else None
+  }
+
+  // At this point, all relevant SparkListeners have been registered, so begin releasing events
+  listenerBus.start()
+
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
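The two SparkContext hunks above move the event-logging block from before startTime down past the metrics-system setup and change its constructor argument from appName to appId; the rule spelled out in the moved comment still holds, namely that every listener must be registered before listenerBus.start() begins releasing events (presumably the block was relocated so that it runs once the application ID is available). A minimal self-contained Scala sketch of that ordering, using hypothetical SimpleListener and SimpleBus stand-ins rather than Spark's own listener classes:

// Hypothetical stand-ins (SimpleListener, SimpleBus), not Spark's SparkListener or
// LiveListenerBus, used only to illustrate the registration order preserved above.
trait SimpleListener {
  def onEvent(event: String): Unit
}

class SimpleBus {
  private val listeners = scala.collection.mutable.ArrayBuffer.empty[SimpleListener]
  private var started = false

  def addListener(listener: SimpleListener): Unit = listeners += listener

  // Events posted before start() are dropped here, which is why every listener
  // has to be registered before the bus is started.
  def start(): Unit = started = true

  def post(event: String): Unit = if (started) listeners.foreach(_.onEvent(event))
}

object ListenerWiring {
  def main(args: Array[String]): Unit = {
    val eventLogEnabled = true                  // stands in for spark.eventLog.enabled
    val appId = "app-20140925121530-0001"       // example application ID

    val bus = new SimpleBus
    val eventLogger: Option[SimpleListener] =
      if (eventLogEnabled) {
        val logger = new SimpleListener {
          def onEvent(event: String): Unit = println(s"[$appId] $event")
        }
        bus.addListener(logger)
        Some(logger)
      } else None

    // Only after all listeners are registered does the bus begin releasing events.
    bus.start()
    bus.post("SparkListenerApplicationStart")
  }
}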
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -43,22 +43,22 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
  * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
  */
 private[spark] class EventLoggingListener(
-    appName: String,
+    appId: String,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  def this(appName: String, sparkConf: SparkConf) =
-    this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+  def this(appId: String, sparkConf: SparkConf) =
+    this(appId, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
+  private val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
     .toLowerCase + "-" + System.currentTimeMillis
   val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
 
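The EventLoggingListener hunk only swaps the input of the directory-naming expression from appName to appId. Below is a plain-Scala sketch, with no Spark dependency, that copies that sanitization from the diff; the sample inputs are illustrative values, not taken from the commit:

// Standalone sketch of the directory naming touched above; the sanitization is
// copied verbatim from the diff, only the input switches from name to ID.
object EventLogDirNaming {
  // Spaces, colons and slashes become "-", shell-special characters become "_",
  // and the result is lower-cased.
  private def sanitize(s: String): String =
    s.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase

  // Directory name as computed after this commit: sanitized app ID plus a timestamp.
  def logDirName(appId: String): String =
    sanitize(appId) + "-" + System.currentTimeMillis

  def main(args: Array[String]): Unit = {
    println(logDirName("Spark Pi"))                  // old-style input (application name)
    println(logDirName("app-20140925121530-0001"))   // new-style input (example application ID)
  }
}

With the application name as input, two runs of an application called "Spark Pi" produce directories that differ only by timestamp; with the ID as input, the directory carries the identifier assigned to that run, which is what the commit message asks for.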
