[SPARK-3377] [SPARK-3610] Metrics can be accidentally aggregated / History server log name should not be based on user input

This PR is an alternative solution for apache#2250

I'm using the Codahale-based MetricsSystem of Spark with JMX or Graphite, and I saw the following two problems.

(1) When applications with the same spark.app.name run on a cluster at the same time, their metric names get mixed up. For instance, if two or more applications run on the cluster simultaneously, each one emits an identically named metric such as "SparkPi.DAGScheduler.stage.failedStages", and Graphite cannot tell which application a given metric belongs to.

(2) When two or more executors run on the same machine, the JVM metrics of each executor get mixed up. For instance, executors running on the same node emit the same named metric "jvm.memory", and Graphite cannot tell which executor a given metric comes from.

There is also a similar issue: the directory for event logs is named using the application name.
The application name is defined by the user and can include characters that are illegal in path names.
Furthermore, the directory name consists of the application name plus System.currentTimeMillis, even though each application has a unique application ID, so when jobs with the same name are run it is difficult to identify which directory belongs to which application.
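
For illustration, the naming change looks roughly like this (the app and executor IDs below are hypothetical):

Before: two concurrent apps named "SparkPi" collide on one Graphite key, and JVM metrics from co-located executors collide as well:
  SparkPi.DAGScheduler.stage.failedStages
  jvm.memory.heap.used
After: names are prefixed with the unique app ID and the driver/executor ID:
  app-20141003011808-0001.driver.DAGScheduler.stage.failedStages
  app-20141003011808-0002.driver.DAGScheduler.stage.failedStages
  app-20141003011808-0001.executor.1.jvm.memory.heap.used
  app-20141003011808-0001.executor.2.jvm.memory.heap.used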

Closes apache#2250
Closes apache#1067

Author: Kousuke Saruta <[email protected]>

Closes apache#2432 from sarutak/metrics-structure-improvement2 and squashes the following commits:

3288b2b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
39169e4 [Kousuke Saruta] Fixed style
6570494 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
817e4f0 [Kousuke Saruta] Simplified MetricsSystem#buildRegistryName
67fa5eb [Kousuke Saruta] Unified MetricsSystem#registerSources and registerSinks in start
10be654 [Kousuke Saruta] Fixed style.
990c078 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
f0c7fba [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
59cc2cd [Kousuke Saruta] Modified SparkContextSchedulerCreationSuite
f9b6fb3 [Kousuke Saruta] Modified style.
2cf8a0f [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
389090d [Kousuke Saruta] Replaced taskScheduler.applicationId() with getApplicationId in SparkContext#postApplicationStart
ff45c89 [Kousuke Saruta] Added some test cases to MetricsSystemSuite
69c46a6 [Kousuke Saruta] Added warning logging logic to MetricsSystem#buildRegistryName
5cca0d2 [Kousuke Saruta] Added Javadoc comment to SparkContext#getApplicationId
16a9f01 [Kousuke Saruta] Added explicit return types to some methods
6434b06 [Kousuke Saruta] Reverted changes related to ApplicationId
0413b90 [Kousuke Saruta] Deleted ApplicationId.java and ApplicationIdSuite.java
a42300c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
0fc1b09 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
42bea55 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
248935d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
f6af132 [Kousuke Saruta] Modified SchedulerBackend and TaskScheduler to return System.currentTimeMillis as a unique application ID
1b8b53e [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
97cb85c [Kousuke Saruta] Resolved a conflict in MimaExcludes
2cdd009 [Kousuke Saruta] Modified default implementation of applicationId
9aadb0b [Kousuke Saruta] Modified NetworkReceiverSuite to ensure "executor.start()" is finished in test "network receiver life cycle"
3011efc [Kousuke Saruta] Added ApplicationIdSuite.scala
d009c55 [Kousuke Saruta] Modified ApplicationId#equals to compare appIds
dfc83fd [Kousuke Saruta] Modified ApplicationId to implement Serializable
9ff4851 [Kousuke Saruta] Modified MimaExcludes.scala to ignore createTaskScheduler method in SparkContext
4567ffc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
6a91b14 [Kousuke Saruta] Modified SparkContextSchedulerCreationSuite, ExecutorRunnerTest and EventLoggingListenerSuite
0325caf [Kousuke Saruta] Added ApplicationId.scala
0a2fc14 [Kousuke Saruta] Modified style
eabda80 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
0f890e6 [Kousuke Saruta] Modified SparkDeploySchedulerBackend and Master to pass baseLogDir instead of eventLogDir
bcf25bf [Kousuke Saruta] Modified directory name for EventLogs
28d4d93 [Kousuke Saruta] Modified SparkContext and EventLoggingListener so that the directory for EventLogs is named using the application ID
203634e [Kousuke Saruta] Modified comment in SchedulerBackend#applicationId and TaskScheduler#applicationId
424fea4 [Kousuke Saruta] Modified the subclasses of TaskScheduler and SchedulerBackend so that they can return a non-optional unique application ID
b311806 [Kousuke Saruta] Swapped last 2 arguments passed to CoarseGrainedExecutorBackend
8a2b6ec [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
086ee25 [Kousuke Saruta] Merge branch 'metrics-structure-improvement2' of github.com:sarutak/spark into metrics-structure-improvement2
e705386 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
36d2f7a [Kousuke Saruta] Added a warning message for the situation where we cannot get an application ID for the metrics name prefix
eea6e19 [Kousuke Saruta] Modified CoarseGrainedMesosSchedulerBackend and MesosSchedulerBackend so that we can get Application ID
c229fbe [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
e719c39 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
4a93c7f [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
4776f9e [Kousuke Saruta] Modified MetricsSystemSuite.scala
efcb6e1 [Kousuke Saruta] Modified to add application id to metrics name
2ec848a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
3ea7896 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
ead8966 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
08e627e [Kousuke Saruta] Revert "tmp"
7b67f5a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
45bd33d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
93e263a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
848819c [Kousuke Saruta] Merge branch 'metrics-structure-improvement' of github.com:sarutak/spark into metrics-structure-improvement
912a637 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
e4a4593 [Kousuke Saruta] tmp
3e098d8 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
4603a39 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
fa7175b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
15f88a3 [Kousuke Saruta] Modified MetricsSystem#buildRegistryName because conf.get does not return null when the corresponding entry is absent
6f7dcd4 [Kousuke Saruta] Modified constructor of DAGSchedulerSource and BlockManagerSource because the instance of SparkContext is no longer used
6fc5560 [Kousuke Saruta] Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource
4e057c9 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
85ffc02 [Kousuke Saruta] Revert "Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource"
868e326 [Kousuke Saruta] Modified MetricsSystem to set registry name with unique application-id and driver/executor-id
71609f5 [Kousuke Saruta] Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource
55debab [Kousuke Saruta] Modified SparkContext and Executor to set spark.executor.id to identifiers
4180993 [Kousuke Saruta] Modified SparkContext to retain spark.unique.app.name property in SparkConf
sarutak authored and andrewor14 committed Oct 3, 2014
1 parent 1eb8389 commit 79e45c9
Showing 29 changed files with 331 additions and 85 deletions.
52 changes: 36 additions & 16 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")

private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
private[spark] val eventLogDir: Option[String] = {
if (isEventLogEnabled) {
Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
} else {
None
}
}

// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val listenerBus = new LiveListenerBus

// Create the Spark execution environment (cache, map output tracker, etc)
conf.set("spark.executor.id", "driver")
private[spark] val env = SparkEnv.create(
conf,
"<driver>",
@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
}

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

val startTime = System.currentTimeMillis()

// Add each JAR given through the constructor
@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
// constructor
taskScheduler.start()

val applicationId: String = taskScheduler.applicationId()
conf.set("spark.app.id", applicationId)

val metricsSystem = env.metricsSystem

// The driver's metrics system needs spark.app.id to be set to the application ID,
// so it should start only after we get the app ID from the task scheduler and set spark.app.id.
metricsSystem.start()

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
}

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

private[spark] val cleaner: Option[ContextCleaner] = {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
// Post init
taskScheduler.postStartHook()

private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)

private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
private def postApplicationStart() {
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser))
}

8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -259,11 +259,15 @@ object SparkEnv extends Logging {
}

val metricsSystem = if (isDriver) {
// Don't start the metrics system for the driver right now.
// We need to wait for the task scheduler to give us an app ID;
// then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
MetricsSystem.createMetricsSystem("executor", conf, securityManager)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
metricsSystem.start()

// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
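
For context, a simplified sketch of the driver-side ordering this enables (steps condensed from the SparkContext hunk above):

// 1. SparkEnv.create(...)   creates the driver's MetricsSystem without starting it
// 2. taskScheduler.start()
// 3. conf.set("spark.app.id", taskScheduler.applicationId())
// 4. env.metricsSystem.start()   // registry names can now include the app ID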
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,8 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
SparkHadoopUtil}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -693,16 +693,18 @@ private[spark] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
val fileSystem = Utils.getHadoopFileSystem(eventLogDir,

val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id)
val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
SparkHadoopUtil.get.newConfiguration(conf))
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec

if (eventLogPaths.isEmpty) {
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
var msg = s"No event logs found for application $appName in $eventLogDir."
var msg = s"No event logs found for application $appName in $appEventLogDir."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -106,6 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String]) {

SignalLogger.register(log)
@@ -122,7 +123,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val driver = fetcher.actorSelection(driverUrl)
val timeout = AkkaUtils.askTimeout(executorConf)
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]
val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()

// Create a new ActorSystem using driver's Spark properties to run the backend.
@@ -144,16 +146,16 @@

def main(args: Array[String]) {
args.length match {
case x if x < 4 =>
case x if x < 5 =>
System.err.println(
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
"<cores> [<workerUrl>]")
"<cores> <appid> [<workerUrl>] ")
System.exit(1)
case 4 =>
run(args(0), args(1), args(2), args(3).toInt, None)
case x if x > 4 =>
run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
case 5 =>
run(args(0), args(1), args(2), args(3).toInt, args(4), None)
case x if x > 5 =>
run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
}
}
}
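
With the new required appid argument, an invocation now looks like this (all values illustrative; the trailing worker URL remains optional and is only used in standalone mode):

CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> <appid> [<workerUrl>]
e.g. CoarseGrainedExecutorBackend akka.tcp://sparkDriver@10.0.0.1:5555/user/CoarseGrainedScheduler 1 exec-host 4 app-20141003011808-0001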
core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,6 +74,7 @@ private[spark] class Executor(
val executorSource = new ExecutorSource(this, executorId)

// Initialize Spark environment (using system properties read above)
conf.set("spark.executor.id", "executor." + executorId)
private val env = {
if (!isLocal) {
val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
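
Together with conf.set("spark.executor.id", "driver") in the SparkContext hunk above, this gives MetricsSystem#buildRegistryName a uniform identifier to work with (a sketch; the executor ID value is illustrative):

// driver:    spark.executor.id = "driver"
// executors: spark.executor.id = "executor.<executorId>", e.g. "executor.2"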
core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)

override val metricRegistry = new MetricRegistry()

// TODO: It would be nice to pass the application name here
override val sourceName = "executor.%s".format(executorId)
override val sourceName = "executor"

// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -52,7 +52,8 @@ private[spark] class MesosExecutorBackend
slaveInfo: SlaveInfo) {
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
executor = new Executor(
executorInfo.getExecutorId.getValue,
slaveInfo.getHostname,
40 changes: 35 additions & 5 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -83,10 +83,10 @@ private[spark] class MetricsSystem private (
def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())

metricsConfig.initialize()
registerSources()
registerSinks()

def start() {
registerSources()
registerSinks()
sinks.foreach(_.start)
}

@@ -98,19 +98,49 @@
sinks.foreach(_.report())
}

/**
* Build a name that uniquely identifies each metric source.
* The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
* If either ID is not available, this defaults to just using <source name>.
*
* @param source Metric source to be named by this method.
* @return A unique metric name for each combination of
* application, executor/driver and metric source.
*/
def buildRegistryName(source: Source): String = {
val appId = conf.getOption("spark.app.id")
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)

if (instance == "driver" || instance == "executor") {
if (appId.isDefined && executorId.isDefined) {
MetricRegistry.name(appId.get, executorId.get, source.sourceName)
} else {
// Only the driver and executors have spark.app.id and spark.executor.id set.
// Other instances, such as Master and Worker, are not associated with a specific application.
val warningMsg = s"Using default name $defaultName for source because %s is not set."
if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
defaultName
}
} else { defaultName }
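
As a sketch of the resulting registry names (app and executor IDs hypothetical):

// Driver (instance == "driver", both properties set):
//   buildRegistryName(dagSchedulerSource) => "app-20141003011808-0001.driver.DAGScheduler"
// Executor (spark.executor.id == "executor.2", sourceName == "executor"):
//   buildRegistryName(executorSource)     => "app-20141003011808-0001.executor.2.executor"
// Master/Worker (properties unset): falls back to the bare source name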
}

def registerSource(source: Source) {
sources += source
try {
registry.register(source.sourceName, source.metricRegistry)
val regName = buildRegistryName(source)
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}

def removeSource(source: Source) {
sources -= source
val regName = buildRegistryName(source)
registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
})
}

@@ -125,7 +155,7 @@
val source = Class.forName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source

private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "%s.DAGScheduler".format(sc.appName)
override val sourceName = "DAGScheduler"

metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failedStages.size
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -43,38 +43,29 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
*/
private[spark] class EventLoggingListener(
appName: String,
appId: String,
logBaseDir: String,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {

import EventLoggingListener._

def this(appName: String, sparkConf: SparkConf) =
this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))

private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
.toLowerCase + "-" + System.currentTimeMillis
val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")

val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId)
val logDirName: String = logDir.split("/").last
protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))

// For testing. Keep track of all JSON serialized events that have been logged.
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

/**
* Return only the unique application directory without the base directory.
*/
def getApplicationLogDir(): String = {
name
}

/**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
@@ -184,6 +175,18 @@ private[spark] object EventLoggingListener extends Logging {
} else ""
}

/**
* Return a file-system-safe path to the log directory for the given application.
*
* @param logBaseDir A base directory for the log directory of the given application.
* @param appId A unique app ID.
* @return A path which consists of file-system-safe characters.
*/
def getLogDirPath(logBaseDir: String, appId: String): String = {
val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
}
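
For example (hypothetical values), an app ID containing path-unsafe characters is sanitized before use:

EventLoggingListener.getLogDirPath("/tmp/spark-events", "Spark Pi:local/test")
// ' ', ':' and '/' become '-', and the result is lower-cased
// => "file:/tmp/spark-events/spark-pi-local-test"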

/**
* Parse the event logging information associated with the logs in the given directory.
*
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -23,6 +23,8 @@ package org.apache.spark.scheduler
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
private val appId = "spark-application-" + System.currentTimeMillis

def start(): Unit
def stop(): Unit
def reviveOffers(): Unit
@@ -33,10 +35,10 @@ private[spark] trait SchedulerBackend {
def isReady(): Boolean = true

/**
* The application ID associated with the job, if any.
* Get an application ID associated with the job.
*
* @return The application ID, or None if the backend does not provide an ID.
* @return An application ID
*/
def applicationId(): Option[String] = None
def applicationId(): String = appId

}
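
Per the commit log above, concrete TaskScheduler and SchedulerBackend implementations override applicationId() to return the cluster manager's real application ID; the timestamp-based value here is only a fallback, e.g. (illustrative):

// applicationId() => "spark-application-1412313600000"   // when no cluster-manager ID is available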
(diff truncated; the remaining changed files are not shown)
