Commit

Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
sarutak committed Sep 4, 2014
2 parents 15f88a3 + a522407 commit fa7175b
Showing 54 changed files with 1,143 additions and 456 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1264,7 +1264,10 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /** Post the application start event */
   private def postApplicationStart() {
-    listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+    // Note: this code assumes that the task scheduler has been initialized and has contacted
+    // the cluster manager to get an application ID (in case the cluster manager provides one).
+    listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+      startTime, sparkUser))
   }
 
   /** Post the application end event */
@@ -1297,7 +1300,7 @@
   */
 object SparkContext extends Logging {
 
-  private[spark] val SPARK_VERSION = "1.0.0"
+  private[spark] val SPARK_VERSION = "1.2.0-SNAPSHOT"
 
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
 
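The hunk above threads an optional application ID from the task scheduler into the start event. A minimal sketch (not part of this commit) of a listener reading the new field:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Illustrative listener, assuming the post-patch event shape: appId is an
// Option[String] because not every cluster manager assigns an application ID.
class AppIdLoggingListener extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart) {
    // Fall back to a placeholder when no ID was provided.
    val id = event.appId.getOrElse("<no application ID>")
    println(s"Application '${event.appName}' started as $id by ${event.sparkUser}")
  }
}
```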
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -34,15 +34,15 @@ private[spark] abstract class ApplicationHistoryProvider {
    *
    * @return List of all known applications.
    */
-  def getListing(): Seq[ApplicationHistoryInfo]
+  def getListing(): Iterable[ApplicationHistoryInfo]
 
   /**
    * Returns the Spark UI for a specific application.
    *
    * @param appId The application ID.
-   * @return The application's UI, or null if application is not found.
+   * @return The application's UI, or None if application is not found.
    */
-  def getAppUI(appId: String): SparkUI
+  def getAppUI(appId: String): Option[SparkUI]
 
   /**
    * Called when the server is shutting down.
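Returning Option[SparkUI] instead of a nullable reference makes the missing-application case explicit in the type. A self-contained sketch of the migration pattern, with hypothetical names standing in for the provider API:

```scala
// lookupUI stands in for ApplicationHistoryProvider.getAppUI; the names and
// data here are illustrative only.
object OptionVsNull {
  def lookupUI(appId: String): Option[String] =
    if (appId == "app-1") Some("ui-for-app-1") else None

  def main(args: Array[String]): Unit = {
    // Callers can no longer forget the missing case; the type forces a choice.
    println(lookupUI("app-1").getOrElse("<not found>")) // ui-for-app-1
    println(lookupUI("app-2").getOrElse("<not found>")) // <not found>
  }
}
```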
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,6 +32,8 @@ import org.apache.spark.util.Utils
 private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   with Logging {
 
+  private val NOT_STARTED = "<Not Started>"
+
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
     conf.getInt("spark.history.updateInterval", 10)) * 1000
@@ -47,8 +49,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTimeMs = -1L
 
-  // List of applications, in order from newest to oldest.
-  @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+  // The modification time of the newest log detected during the last scan. This is used
+  // to ignore logs that are older during subsequent scans, to avoid processing data that
+  // is already known.
+  private var lastModifiedTime = -1L
+
+  // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
+  // into the map in order, so the LinkedHashMap maintains the correct ordering.
+  @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
+    = new mutable.LinkedHashMap()
 
   /**
    * A background thread that periodically checks for event log updates on disk.
@@ -93,15 +102,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
     logCheckingThread.start()
   }
 
-  override def getListing() = appList
+  override def getListing() = applications.values
 
-  override def getAppUI(appId: String): SparkUI = {
+  override def getAppUI(appId: String): Option[SparkUI] = {
     try {
-      val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
-      val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
-      ui
+      applications.get(appId).map { info =>
+        val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
+          new Path(logDir, info.logDir)))
+        val ui = {
+          val conf = this.conf.clone()
+          val appSecManager = new SecurityManager(conf)
+          new SparkUI(conf, appSecManager, replayBus, appId,
+            s"${HistoryServer.UI_PATH_PREFIX}/$appId")
+          // Do not call ui.bind() to avoid creating a new server for each application
+        }
+
+        replayBus.replay()
+
+        ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
+
+        val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+        ui.getSecurityManager.setAcls(uiAclsEnabled)
+        // make sure to set admin acls before view acls so they are properly picked up
+        ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+        ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+          appListener.viewAcls.getOrElse(""))
+        ui
+      }
     } catch {
-      case e: FileNotFoundException => null
+      case e: FileNotFoundException => None
     }
   }
 
@@ -119,84 +148,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
     try {
       val logStatus = fs.listStatus(new Path(resolvedLogDir))
       val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
-      val logInfos = logDirs.filter { dir =>
-        fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
-      }
-
-      val currentApps = Map[String, ApplicationHistoryInfo](
-        appList.map(app => app.id -> app):_*)
-
-      // For any application that either (i) is not listed or (ii) has changed since the last time
-      // the listing was created (defined by the log dir's modification time), load the app's info.
-      // Otherwise just reuse what's already in memory.
-      val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
-      for (dir <- logInfos) {
-        val curr = currentApps.getOrElse(dir.getPath().getName(), null)
-        if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
+      // Load all new logs from the log directory. Only directories that have a modification time
+      // later than the last known log directory will be loaded.
+      var newLastModifiedTime = lastModifiedTime
+      val logInfos = logDirs
+        .filter { dir =>
+          if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
+            val modTime = getModificationTime(dir)
+            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+            modTime > lastModifiedTime
+          } else {
+            false
+          }
+        }
+        .flatMap { dir =>
           try {
-            val (app, _) = loadAppInfo(dir, renderUI = false)
-            newApps += app
+            val (replayBus, appListener) = createReplayBus(dir)
+            replayBus.replay()
+            Some(new FsApplicationHistoryInfo(
+              dir.getPath().getName(),
+              appListener.appId.getOrElse(dir.getPath().getName()),
+              appListener.appName.getOrElse(NOT_STARTED),
+              appListener.startTime.getOrElse(-1L),
+              appListener.endTime.getOrElse(-1L),
+              getModificationTime(dir),
+              appListener.sparkUser.getOrElse(NOT_STARTED)))
          } catch {
-            case e: Exception => logError(s"Failed to load app info from directory $dir.")
+            case e: Exception =>
+              logInfo(s"Failed to load application log data from $dir.", e)
+              None
          }
-        } else {
-          newApps += curr
-        }
-      }
-
-      appList = newApps.sortBy { info => -info.endTime }
+        }
+        .sortBy { info => -info.endTime }
+
+      lastModifiedTime = newLastModifiedTime
+
+      // When there are new logs, merge the new list with the existing one, maintaining
+      // the expected ordering (descending end time). Maintaining the order is important
+      // to avoid having to sort the list every time there is a request for the log list.
+      if (!logInfos.isEmpty) {
+        val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+        def addIfAbsent(info: FsApplicationHistoryInfo) = {
+          if (!newApps.contains(info.id)) {
+            newApps += (info.id -> info)
+          }
+        }
+
+        val newIterator = logInfos.iterator.buffered
+        val oldIterator = applications.values.iterator.buffered
+        while (newIterator.hasNext && oldIterator.hasNext) {
+          if (newIterator.head.endTime > oldIterator.head.endTime) {
+            addIfAbsent(newIterator.next)
+          } else {
+            addIfAbsent(oldIterator.next)
+          }
+        }
+        newIterator.foreach(addIfAbsent)
+        oldIterator.foreach(addIfAbsent)
+
+        applications = newApps
+      }
     } catch {
       case t: Throwable => logError("Exception in checking for event log updates", t)
     }
   }
 
-  /**
-   * Parse the application's logs to find out the information we need to build the
-   * listing page.
-   *
-   * When creating the listing of available apps, there is no need to load the whole UI for the
-   * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
-   * clicks on a specific application.
-   *
-   * @param logDir Directory with application's log files.
-   * @param renderUI Whether to create the SparkUI for the application.
-   * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
-   */
-  private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
-    val path = logDir.getPath
-    val appId = path.getName
+  private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
+    val path = logDir.getPath()
     val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
     val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
     val appListener = new ApplicationEventListener
     replayBus.addListener(appListener)
-
-    val ui: SparkUI = if (renderUI) {
-      val conf = this.conf.clone()
-      val appSecManager = new SecurityManager(conf)
-      new SparkUI(conf, appSecManager, replayBus, appId,
-        HistoryServer.UI_PATH_PREFIX + s"/$appId")
-      // Do not call ui.bind() to avoid creating a new server for each application
-    } else {
-      null
-    }
-
-    replayBus.replay()
-    val appInfo = ApplicationHistoryInfo(
-      appId,
-      appListener.appName,
-      appListener.startTime,
-      appListener.endTime,
-      getModificationTime(logDir),
-      appListener.sparkUser)
-
-    if (ui != null) {
-      val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-      ui.getSecurityManager.setAcls(uiAclsEnabled)
-      // make sure to set admin acls before view acls so properly picked up
-      ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
-      ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
-    }
-    (appInfo, ui)
+    (replayBus, appListener)
   }
 
   /** Return when this directory was last modified. */
@@ -219,3 +243,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
 
 }
+
+private class FsApplicationHistoryInfo(
+    val logDir: String,
+    id: String,
+    name: String,
+    startTime: Long,
+    endTime: Long,
+    lastUpdated: Long,
+    sparkUser: String)
+  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
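The core of the new checkForLogs() is a merge of two sequences that are each already sorted by descending end time, so the combined listing never needs a full re-sort. A self-contained sketch of that merge step (illustrative names and data, not part of the patch):

```scala
import scala.collection.mutable

object MergeByEndTime {
  case class Info(id: String, endTime: Long)

  // Merge two descending-by-endTime sequences into a LinkedHashMap, which
  // preserves insertion order, mirroring the addIfAbsent logic above.
  def merge(newApps: Seq[Info], oldApps: Seq[Info]): mutable.LinkedHashMap[String, Info] = {
    val merged = new mutable.LinkedHashMap[String, Info]()
    def addIfAbsent(info: Info): Unit =
      if (!merged.contains(info.id)) merged += (info.id -> info)

    val newIt = newApps.iterator.buffered
    val oldIt = oldApps.iterator.buffered
    while (newIt.hasNext && oldIt.hasNext) {
      if (newIt.head.endTime > oldIt.head.endTime) addIfAbsent(newIt.next())
      else addIfAbsent(oldIt.next())
    }
    newIt.foreach(addIfAbsent)
    oldIt.foreach(addIfAbsent)
    merged
  }

  def main(args: Array[String]): Unit = {
    val merged = merge(Seq(Info("c", 30), Info("a", 10)), Seq(Info("b", 20)))
    println(merged.keys.mkString(", ")) // c, b, a
  }
}
```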
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,10 +52,7 @@ class HistoryServer(
 
   private val appLoader = new CacheLoader[String, SparkUI] {
     override def load(key: String): SparkUI = {
-      val ui = provider.getAppUI(key)
-      if (ui == null) {
-        throw new NoSuchElementException()
-      }
+      val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
       attachSparkUI(ui)
       ui
     }
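The one-liner works because throw is an expression of type Nothing, a subtype of every type, so getOrElse still yields the element type. A minimal demonstration:

```scala
object GetOrElseThrow {
  def main(args: Array[String]): Unit = {
    val maybeUI: Option[String] = Some("app-ui")
    // The else branch has type Nothing, so the result is still a String.
    val ui: String = maybeUI.getOrElse(throw new NoSuchElementException())
    println(ui)
  }
}
```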
core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -24,38 +24,31 @@ package org.apache.spark.scheduler
  * from multiple applications are seen, the behavior is unspecified.
  */
 private[spark] class ApplicationEventListener extends SparkListener {
-  var appName = "<Not Started>"
-  var sparkUser = "<Not Started>"
-  var startTime = -1L
-  var endTime = -1L
-  var viewAcls = ""
-  var adminAcls = ""
-
-  def applicationStarted = startTime != -1
-
-  def applicationCompleted = endTime != -1
-
-  def applicationDuration: Long = {
-    val difference = endTime - startTime
-    if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
-  }
+  var appName: Option[String] = None
+  var appId: Option[String] = None
+  var sparkUser: Option[String] = None
+  var startTime: Option[Long] = None
+  var endTime: Option[Long] = None
+  var viewAcls: Option[String] = None
+  var adminAcls: Option[String] = None
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
-    appName = applicationStart.appName
-    startTime = applicationStart.time
-    sparkUser = applicationStart.sparkUser
+    appName = Some(applicationStart.appName)
+    appId = applicationStart.appId
+    startTime = Some(applicationStart.time)
+    sparkUser = Some(applicationStart.sparkUser)
   }
 
   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-    endTime = applicationEnd.time
+    endTime = Some(applicationEnd.time)
   }
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
       val environmentDetails = environmentUpdate.environmentDetails
       val allProperties = environmentDetails("Spark Properties").toMap
-      viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
-      adminAcls = allProperties.getOrElse("spark.admin.acls", "")
+      viewAcls = allProperties.get("spark.ui.view.acls")
+      adminAcls = allProperties.get("spark.admin.acls")
     }
   }
 }
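The patch drops the applicationDuration helper together with the -1L sentinels. Were it still needed, it could be expressed over the new Option fields; a hypothetical sketch:

```scala
object DurationSketch {
  // None unless both timestamps exist and the difference is positive,
  // replacing the old -1L sentinel convention.
  def applicationDuration(start: Option[Long], end: Option[Long]): Option[Long] =
    for (s <- start; e <- end; if e - s > 0) yield e - s

  def main(args: Array[String]): Unit = {
    println(applicationDuration(Some(100L), Some(250L))) // Some(150)
    println(applicationDuration(Some(100L), None))       // None
  }
}
```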
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -31,4 +31,12 @@ private[spark] trait SchedulerBackend {
   def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
     throw new UnsupportedOperationException
   def isReady(): Boolean = true
+
+  /**
+   * The application ID associated with the job, if any.
+   *
+   * @return The application ID, or None if the backend does not provide an ID.
+   */
+  def applicationId(): Option[String] = None
+
 }
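Backends whose cluster manager assigns an ID override the new hook; all others inherit the None default. A hypothetical stub (SchedulerBackend is private[spark], so this assumes code living in the org.apache.spark.scheduler package):

```scala
package org.apache.spark.scheduler

// Illustrative stub only; the first four members satisfy the trait's
// abstract methods.
private[spark] class StubSchedulerBackend extends SchedulerBackend {
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def reviveOffers(): Unit = {}
  override def defaultParallelism(): Int = 2

  // This hypothetical cluster manager assigns IDs, so surface one instead of None.
  override def applicationId(): Option[String] = Some("app-stub-0001")
}
```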
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -67,11 +67,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
+case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
+case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
   extends SparkListenerEvent
 
 @DeveloperApi
@@ -89,8 +89,8 @@ case class SparkListenerExecutorMetricsUpdate(
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
-  extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
+    sparkUser: String) extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -64,4 +64,12 @@ private[spark] trait TaskScheduler {
    */
   def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
     blockManagerId: BlockManagerId): Boolean
+
+  /**
+   * The application ID associated with the job, if any.
+   *
+   * @return The application ID, or None if the backend does not provide an ID.
+   */
+  def applicationId(): Option[String] = None
+
 }
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -491,6 +491,9 @@ private[spark] class TaskSchedulerImpl(
       }
     }
   }
+
+  override def applicationId(): Option[String] = backend.applicationId()
+
 }
 
 
@@ -535,4 +538,5 @@ private[spark] object TaskSchedulerImpl {
 
     retval.toList
   }
+
 }