- <ul class="unstyled">
- <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
- </ul>
{
- if (parent.appIdToInfo.size > 0) {
+ if (apps.size > 0) {
<h4>
- Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
- Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+ Showing {apps.size}/{apps.size}
+ Completed Application{if (apps.size > 1) "s" else ""}
</h4> ++
appTable
} else {
@@ -56,18 +53,16 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Completed",
"Duration",
"Spark User",
- "Log Directory",
"Last Updated")
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val appName = if (info.started) info.name else info.logDirPath.getName
- val uiAddress = parent.getAddress + info.ui.basePath
+ val appName = if (info.started) info.name else info.id
+ val uiAddress = "/history/" + info.id
val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
- val logDirectory = info.logDirPath.getName
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{appName}</a></td>
@@ -75,7 +70,6 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<td>{endTime}</td>
<td>{duration}</td>
<td>{sparkUser}</td>
- <td>{logDirectory}</td>
<td>{lastUpdated}</td>
</tr>
}
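For context, `render` (outside these hunks) now builds its rows from the server's listing rather than from `parent.appIdToInfo`. A minimal sketch of the assumed call site, where `getApplicationList(0, -1)` (full listing, no limit) is an assumption about the exact arguments:

```scala
// Sketch of the page-side call, assuming render() fetches the full listing:
def render(request: HttpServletRequest): Seq[Node] = {
  val apps = parent.getApplicationList(0, -1)  // offset 0, no limit
  val appTable = UIUtils.listingTable(appHeader, appRow, apps)
  UIUtils.basicSparkPage(<div class="row-fluid">{appTable}</div>, "History Server")
}
```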
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index a9c11dca5678e..c8e1dfa6fa220 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,14 +17,15 @@
package org.apache.spark.deploy.history
-import scala.collection.mutable
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import com.google.common.cache._
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler._
-import org.apache.spark.ui.{WebUI, SparkUI}
+import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -38,56 +39,76 @@ import org.apache.spark.util.Utils
* application's event logs are maintained in the application's own sub-directory. This
* is the same structure as maintained in the event log write code path in
* EventLoggingListener.
- *
- * @param baseLogDir The base directory in which event logs are found
*/
class HistoryServer(
- val baseLogDir: String,
+ conf: SparkConf,
+ provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
- conf: SparkConf)
- extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
+ port: Int)
+ extends WebUI(securityManager, port, conf) with Logging {
+
+ // How many applications to retain
+ private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
- import HistoryServer._
+ // Whether to enable view ACLs for all applications
+ private val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
- private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
private val localHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
- // A timestamp of when the disk was last accessed to check for log updates
- private var lastLogCheckTime = -1L
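+ // Cache loader: on first access to an application, fetch its info (including the
+ // rendered SparkUI) from the provider, apply the app's view ACLs, and attach the UI here.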
+ private val appLoader = new CacheLoader[String, SparkUI] {
+ override def load(key: String): SparkUI = {
+ val info = provider.getAppInfo(key)
+ if (info != null) {
+ info.ui.getSecurityManager.setUIAcls(uiAclsEnabled)
+ info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls)
+ attachSparkUI(info.ui)
+ info.ui
+ } else {
+ throw new NoSuchElementException()
+ }
+ }
+ }
- // Number of completed applications found in this directory
- private var numCompletedApplications = 0
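+ // Size-bounded cache of loaded UIs; entries evicted to make room are detached
+ // from the server by the removal listener below.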
+ private val appCache = CacheBuilder.newBuilder()
+ .maximumSize(retainedApplications)
+ .removalListener(new RemovalListener[String, SparkUI] {
+ override def onRemoval(rm: RemovalNotification[String, SparkUI]) = {
+ detachSparkUI(rm.getValue())
+ }
+ })
+ .build(appLoader)
+
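+ // Servlet mounted at /history/*: ensures the requested app's UI is loaded into the
+ // cache (and thus attached), then redirects the client to the URI it asked for.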
+ private val loaderServlet = new HttpServlet {
+ protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ val parts = Option(req.getPathInfo()).getOrElse("").split("/")
+ if (parts.length < 2) {
+ res.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+ return
+ }
- @volatile private var stopped = false
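+ // getPathInfo() has the form "/<appId>[/...]", so parts(1) holds the app ID.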
+ val appId = parts(1)
- /**
- * A background thread that periodically checks for event log updates on disk.
- *
- * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
- * time at which it performs the next log check to maintain the same period as before.
- *
- * TODO: Add a mechanism to update manually.
- */
- private val logCheckingThread = new Thread {
- override def run(): Unit = Utils.logUncaughtExceptions {
- while (!stopped) {
- val now = System.currentTimeMillis
- if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
- checkForLogs()
- Thread.sleep(UPDATE_INTERVAL_MS)
- } else {
- // If the user has manually checked for logs recently, wait until
- // UPDATE_INTERVAL_MS after the last check time
- Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
+ // Note we don't use the UI retrieved from the cache; the cache loader above registers
+ // the app's UI with this server, so all we need to do is redirect the user to the URI
+ // originally requested, at which point the attached UI serves the proper data.
+ try {
+ appCache.get(appId)
+ res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+ } catch {
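+ // Guava wraps exceptions thrown by the cache loader, so match on the cause.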
+ case e: Exception => e.getCause() match {
+ case nsee: NoSuchElementException =>
+ val msg = <div class="row-fluid">Application {appId} not found.</div>
+ res.setStatus(HttpServletResponse.SC_NOT_FOUND)
+ UIUtils.basicSparkPage(msg, "Not Found").foreach(
+ n => res.getWriter().write(n.toString))
+
+ case cause: Exception => throw cause
}
}
}
}
- // A mapping of application ID to its history information, which includes the rendered UI
- val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
-
initialize()
/**
@@ -98,108 +119,23 @@ class HistoryServer(
*/
def initialize() {
attachPage(new HistoryPage(this))
- attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
+ attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+
+ val contextHandler = new ServletContextHandler
+ contextHandler.setContextPath("/history")
+ contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
+ attachHandler(contextHandler)
}
/** Bind to the HTTP server behind this web interface. */
override def bind() {
super.bind()
- logCheckingThread.start()
- }
-
- /**
- * Check for any updates to event logs in the base directory. This is only effective once
- * the server has been bound.
- *
- * If a new completed application is found, the server renders the associated SparkUI
- * from the application's event logs, attaches this UI to itself, and stores metadata
- * information for this application.
- *
- * If the logs for an existing completed application are no longer found, the server
- * removes all associated information and detaches the SparkUI.
- */
- def checkForLogs() = synchronized {
- if (serverInfo.isDefined) {
- lastLogCheckTime = System.currentTimeMillis
- logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
- try {
- val logStatus = fileSystem.listStatus(new Path(baseLogDir))
- val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
- val logInfos = logDirs
- .sortBy { dir => getModificationTime(dir) }
- .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
- .filter { case (dir, info) => info.applicationComplete }
-
- // Logging information for applications that should be retained
- val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS)
- val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName }
-
- // Remove any applications that should no longer be retained
- appIdToInfo.foreach { case (appId, info) =>
- if (!retainedAppIds.contains(appId)) {
- detachSparkUI(info.ui)
- appIdToInfo.remove(appId)
- }
- }
-
- // Render the application's UI if it is not already there
- retainedLogInfos.foreach { case (dir, info) =>
- val appId = dir.getPath.getName
- if (!appIdToInfo.contains(appId)) {
- renderSparkUI(dir, info)
- }
- }
-
- // Track the total number of completed applications observed this round
- numCompletedApplications = logInfos.size
-
- } catch {
- case e: Exception => logError("Exception in checking for event log updates", e)
- }
- } else {
- logWarning("Attempted to check for event log updates before binding the server.")
- }
- }
-
- /**
- * Render a new SparkUI from the event logs if the associated application is completed.
- *
- * HistoryServer looks for a special file that indicates application completion in the given
- * directory. If this file exists, the associated application is regarded to be completed, in
- * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
- */
- private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) {
- val path = logDir.getPath
- val appId = path.getName
- val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec)
- val appListener = new ApplicationEventListener
- replayBus.addListener(appListener)
- val appConf = conf.clone()
- val appSecManager = new SecurityManager(appConf)
- val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
-
- // Do not call ui.bind() to avoid creating a new server for each application
- replayBus.replay()
- if (appListener.applicationStarted) {
- appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED)
- appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
- attachSparkUI(ui)
- val appName = appListener.appName
- val sparkUser = appListener.sparkUser
- val startTime = appListener.startTime
- val endTime = appListener.endTime
- val lastUpdated = getModificationTime(logDir)
- ui.setAppName(appName + " (completed)")
- appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
- lastUpdated, sparkUser, path, ui)
- }
}
/** Stop the server and close the file system. */
override def stop() {
super.stop()
- stopped = true
- fileSystem.close()
+ provider.stop()
}
/** Attach a reconstructed UI to this server. Only valid after bind(). */
@@ -215,27 +151,17 @@ class HistoryServer(
ui.getHandlers.foreach(detachHandler)
}
- /** Return the address of this server. */
- def getAddress: String = "http://" + publicHost + ":" + boundPort
-
- /** Return the number of completed applications found, whether or not the UI is rendered. */
- def getNumApplications: Int = numCompletedApplications
-
- /** Return when this directory was last modified. */
- private def getModificationTime(dir: FileStatus): Long = {
- try {
- val logFiles = fileSystem.listStatus(dir.getPath)
- if (logFiles != null && !logFiles.isEmpty) {
- logFiles.map(_.getModificationTime).max
- } else {
- dir.getModificationTime
- }
- } catch {
- case e: Exception =>
- logError("Exception in accessing modification time of %s".format(dir.getPath), e)
- -1L
- }
+ /**
+  * Returns the list of available applications, in descending order of last-updated time.
+  *
+  * @param offset Offset of the first entry to return.
+  * @param limit Maximum number of entries to return (-1 = no limit).
+  */
+ def getApplicationList(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] = {
+   provider.getListing(offset, limit)
+ }
+
}
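As a usage sketch of the paging parameters (the caller and variable names here are hypothetical):

```scala
// Hypothetical callers of the listing API:
val firstTen = server.getApplicationList(0, 10)   // the ten most recently updated apps
val everything = server.getApplicationList(0, -1) // -1 disables the limit
```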
/**
@@ -251,25 +177,21 @@ class HistoryServer(
object HistoryServer {
private val conf = new SparkConf
- // Interval between each check for event log updates
- val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000
-
- // How many applications to retain
- val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250)
-
- // The port to which the web UI is bound
- val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080)
-
- // set whether to enable or disable view acls for all applications
- val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false)
-
- val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
-
def main(argStrings: Array[String]) {
initSecurity()
- val args = new HistoryServerArguments(argStrings)
+ parse(argStrings.toList)
val securityManager = new SecurityManager(conf)
- val server = new HistoryServer(args.logDir, securityManager, conf)
+
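+ // The provider class is loaded reflectively, so it must have a constructor that
+ // takes a single SparkConf argument.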
+ val providerName = conf.getOption("spark.history.provider")
+ .getOrElse(classOf[FsHistoryProvider].getName())
+ val provider = Class.forName(providerName)
+ .getConstructor(classOf[SparkConf])
+ .newInstance(conf)
+ .asInstanceOf[ApplicationHistoryProvider]
+
+ val port = conf.getInt("spark.history.ui.port", 18080)
+
+ val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
// Wait until the end of the world... or if the HistoryServer process is manually stopped
@@ -290,8 +212,85 @@ object HistoryServer {
}
}
+ private def parse(args: List[String]): Unit = {
+ args match {
+ case ("--dir" | "-d") :: value :: tail =>
+ set("fs.logDirectory", value)
+ parse(tail)
+
+ case ("--port" | "-p") :: value :: tail =>
+ set("ui.port", value)
+ parse(tail)
+
+ case ("-D") :: opt :: value :: tail =>
+ set(opt, value)
+ parse(tail)
+
+ case ("--help" | "-h") :: tail =>
+ printUsageAndExit(0)
+
+ case Nil =>
+
+ case _ =>
+ printUsageAndExit(1)
+ }
+ }
+
+ private def set(name: String, value: String) = {
+ conf.set("spark.history." + name, value)
+ }
+
+ private def printUsageAndExit(exitCode: Int) {
+ System.err.println(
+ """
+ |Usage: HistoryServer [options]
+ |
+ |Options are set by passing "-D option value" command line arguments to the class.
+ |Command line options will override the Spark configuration file and system properties.
+ |History Server options are always available; additional options depend on the provider.
+ |
+ |History Server options:
+ |
+ | ui.port Port where server will listen for connections (default 18080)
+ | ui.acls.enable Whether to enable view acls for all applications (default false)
+ | provider Name of history provider class (defaults to file system-based provider)
+ |
+ |FsHistoryProvider options:
+ |
+ | fs.logDirectory Directory where app logs are stored (required)
+ | fs.updateInterval How often to reload log data from storage (seconds, default 10)
+ |""".stripMargin)
+ System.exit(exitCode)
+ }
+
}
+private[spark] abstract class ApplicationHistoryProvider {
+
+ /**
+ * This method should return the list of applications available for the history server
+ * to show. The listing must be in descending order by last-updated time, so that
+ * offset and limit select a window of the most recent applications.
+ *
+ * @param offset Offset of the first entry to return.
+ * @param limit Maximum number of entries to return (-1 = no limit).
+ */
+ def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo]
+
+ /**
+ * This method should return the application information, including a rendered SparkUI.
+ *
+ * @param appId The application ID.
+ * @return The app info, or null if not found.
+ */
+ def getAppInfo(appId: String): ApplicationHistoryInfo
+
+ /**
+ * Called when the server is shutting down.
+ */
+ def stop(): Unit = { }
+
+}
private[spark] case class ApplicationHistoryInfo(
id: String,
@@ -300,7 +299,7 @@ private[spark] case class ApplicationHistoryInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
- logDirPath: Path,
+ viewAcls: String,
ui: SparkUI) {
def started = startTime != -1
def completed = endTime != -1
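To illustrate the new extension point, a hedged sketch of a minimal custom provider; the class name and its in-memory listing are hypothetical, not part of this patch. `spark.history.provider` would name the class, which must keep its listing in descending `lastUpdated` order and expose a `(SparkConf)` constructor, since `main` instantiates it reflectively:

```scala
// Hypothetical provider, for illustration only.
class InMemoryHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider {
  // Assumed to be maintained in descending lastUpdated order.
  private var apps = Seq.empty[ApplicationHistoryInfo]

  override def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] = {
    val fromOffset = apps.drop(offset)
    if (limit < 0) fromOffset else fromOffset.take(limit)
  }

  // Per the contract above, return null (not an exception) for unknown apps.
  override def getAppInfo(appId: String): ApplicationHistoryInfo =
    apps.find(_.id == appId).orNull
}
```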
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
deleted file mode 100644
index 943c061743dbd..0000000000000
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.history
-
-import java.net.URI
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.util.Utils
-
-/**
- * Command-line parser for the master.
- */
-private[spark] class HistoryServerArguments(args: Array[String]) {
- var logDir = ""
-
- parse(args.toList)
-
- private def parse(args: List[String]): Unit = {
- args match {
- case ("--dir" | "-d") :: value :: tail =>
- logDir = value
- parse(tail)
-
- case ("--help" | "-h") :: tail =>
- printUsageAndExit(0)
-
- case Nil =>
-
- case _ =>
- printUsageAndExit(1)
- }
- validateLogDir()
- }
-
- private def validateLogDir() {
- if (logDir == "") {
- System.err.println("Logging directory must be specified.")
- printUsageAndExit(1)
- }
- val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
- val path = new Path(logDir)
- if (!fileSystem.exists(path)) {
- System.err.println("Logging directory specified does not exist: %s".format(logDir))
- printUsageAndExit(1)
- }
- if (!fileSystem.getFileStatus(path).isDir) {
- System.err.println("Logging directory specified is not a directory: %s".format(logDir))
- printUsageAndExit(1)
- }
- }
-
- private def printUsageAndExit(exitCode: Int) {
- System.err.println(
- "Usage: HistoryServer [options]\n" +
- "\n" +
- "Options:\n" +
- " -d DIR, --dir DIR Location of event log files")
- System.exit(exitCode)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index b08f308fda1dd..856273e1d4e21 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -51,6 +51,7 @@ private[spark] abstract class WebUI(
def getTabs: Seq[WebUITab] = tabs.toSeq
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
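+ /** Exposed so that the history server can set view ACLs on reconstructed app UIs. */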
+ def getSecurityManager: SecurityManager = securityManager
/** Attach a tab to this UI, along with all of its attached pages. */
def attachTab(tab: WebUITab) {