diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7dbceb9c5c1a3..bb6688e552ccd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend +import org.apache.spark.status.AppStateStore import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -215,6 +216,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ + private var _stateStore: AppStateStore = _ /* ------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -428,6 +430,10 @@ class SparkContext(config: SparkConf) extends Logging { _jobProgressListener = new JobProgressListener(_conf) listenerBus.addListener(jobProgressListener) + // Initialize the app state store and listener before SparkEnv is created so that it gets + // all events. + _stateStore = AppStateStore.createTempStore(conf, listenerBus) + // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) @@ -449,8 +455,8 @@ class SparkContext(config: SparkConf) extends Logging { _ui = if (conf.getBoolean("spark.ui.enabled", true)) { - Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener, - _env.securityManager, appName, startTime = startTime)) + Some(SparkUI.create(Some(this), _stateStore, _conf, listenerBus, _env.securityManager, + appName, "", startTime)) } else { // For tests, do not enable the UI None @@ -1939,6 +1945,9 @@ class SparkContext(config: SparkConf) extends Logging { } SparkEnv.set(null) } + if (_stateStore != null) { + _stateStore.close() + } // Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this // `SparkContext` is stopped. localProperties.remove() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala index a370526c46f3d..7ef923170d33a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala @@ -26,6 +26,7 @@ import scala.util.control.NonFatal import com.codahale.metrics.{Counter, MetricRegistry, Timer} import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification} +import com.google.common.util.concurrent.UncheckedExecutionException import org.eclipse.jetty.servlet.FilterHolder import org.apache.spark.internal.Logging @@ -40,11 +41,6 @@ import org.apache.spark.util.Clock * Incompleted applications have their update time checked on every * retrieval; if the cached entry is out of date, it is refreshed. * - * @note there must be only one instance of [[ApplicationCache]] in a - * JVM at a time. 
This is because a static field in [[ApplicationCacheCheckFilterRelay]] - * keeps a reference to the cache so that HTTP requests on the attempt-specific web UIs - * can probe the current cache to see if the attempts have changed. - * * Creating multiple instances will break this routing. * @param operations implementation of record access operations * @param retainedApplications number of retained applications @@ -80,7 +76,7 @@ private[history] class ApplicationCache( metrics.evictionCount.inc() val key = rm.getKey logDebug(s"Evicting entry ${key}") - operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui) + operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui) } } @@ -89,7 +85,7 @@ private[history] class ApplicationCache( * * Tagged as `protected` so as to allow subclasses in tests to access it directly */ - protected val appCache: LoadingCache[CacheKey, CacheEntry] = { + private val appCache: LoadingCache[CacheKey, CacheEntry] = { CacheBuilder.newBuilder() .maximumSize(retainedApplications) .removalListener(removalListener) @@ -101,130 +97,38 @@ private[history] class ApplicationCache( */ val metrics = new CacheMetrics("history.cache") - init() - - /** - * Perform any startup operations. - * - * This includes declaring this instance as the cache to use in the - * [[ApplicationCacheCheckFilterRelay]]. - */ - private def init(): Unit = { - ApplicationCacheCheckFilterRelay.setApplicationCache(this) - } - - /** - * Stop the cache. - * This will reset the relay in [[ApplicationCacheCheckFilterRelay]]. - */ - def stop(): Unit = { - ApplicationCacheCheckFilterRelay.resetApplicationCache() - } - - /** - * Get an entry. - * - * Cache fetch/refresh will have taken place by the time this method returns. - * @param appAndAttempt application to look up in the format needed by the history server web UI, - * `appId/attemptId` or `appId`. - * @return the entry - */ - def get(appAndAttempt: String): SparkUI = { - val parts = splitAppAndAttemptKey(appAndAttempt) - get(parts._1, parts._2) - } - - /** - * Get the Spark UI, converting a lookup failure from an exception to `None`. - * @param appAndAttempt application to look up in the format needed by the history server web UI, - * `appId/attemptId` or `appId`. - * @return the entry - */ - def getSparkUI(appAndAttempt: String): Option[SparkUI] = { + def get(appId: String, attemptId: Option[String] = None): CacheEntry = { try { - val ui = get(appAndAttempt) - Some(ui) + appCache.get(new CacheKey(appId, attemptId)) } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - None - case cause: Exception => throw cause - } + case e: UncheckedExecutionException => + throw Option(e.getCause()).getOrElse(e) } } - /** - * Get the associated spark UI. - * - * Cache fetch/refresh will have taken place by the time this method returns. - * @param appId application ID - * @param attemptId optional attempt ID - * @return the entry - */ - def get(appId: String, attemptId: Option[String]): SparkUI = { - lookupAndUpdate(appId, attemptId)._1.ui - } - - /** - * Look up the entry; update it if needed. 
- * @param appId application ID - * @param attemptId optional attempt ID - * @return the underlying cache entry -which can have its timestamp changed, and a flag to - * indicate that the entry has changed - */ - private def lookupAndUpdate(appId: String, attemptId: Option[String]): (CacheEntry, Boolean) = { - metrics.lookupCount.inc() - val cacheKey = CacheKey(appId, attemptId) - var entry = appCache.getIfPresent(cacheKey) - var updated = false - if (entry == null) { - // no entry, so fetch without any post-fetch probes for out-of-dateness - // this will trigger a callback to loadApplicationEntry() - entry = appCache.get(cacheKey) - } else if (!entry.completed) { - val now = clock.getTimeMillis() - log.debug(s"Probing at time $now for updated application $cacheKey -> $entry") - metrics.updateProbeCount.inc() - updated = time(metrics.updateProbeTimer) { - entry.updateProbe() - } - if (updated) { - logDebug(s"refreshing $cacheKey") - metrics.updateTriggeredCount.inc() - appCache.refresh(cacheKey) - // and repeat the lookup - entry = appCache.get(cacheKey) - } else { - // update the probe timestamp to the current time - entry.probeTime = now + def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + var entry = get(appId, attemptId) + + // If the entry exists, we need to make sure we run the closure with a valid entry. So + // we need to re-try until we can lock a valid entry for read. + entry.loadedUI.lock.readLock().lock() + while (!entry.loadedUI.valid) { + entry.loadedUI.lock.readLock().unlock() + appCache.invalidate(new CacheKey(appId, attemptId)) + entry = get(appId, attemptId) + if (entry == null) { + metrics.lookupFailureCount.inc() + throw new NoSuchElementException() } + metrics.loadCount.inc() + entry.loadedUI.lock.readLock().lock() } - (entry, updated) - } - - /** - * This method is visible for testing. - * - * It looks up the cached entry *and returns a clone of it*. - * This ensures that the cached entries never leak - * @param appId application ID - * @param attemptId optional attempt ID - * @return a new entry with shared SparkUI, but copies of the other fields. - */ - def lookupCacheEntry(appId: String, attemptId: Option[String]): CacheEntry = { - val entry = lookupAndUpdate(appId, attemptId)._1 - new CacheEntry(entry.ui, entry.completed, entry.updateProbe, entry.probeTime) - } - /** - * Probe for an application being updated. 
- * @param appId application ID - * @param attemptId attempt ID - * @return true if an update has been triggered - */ - def checkForUpdates(appId: String, attemptId: Option[String]): Boolean = { - val (entry, updated) = lookupAndUpdate(appId, attemptId) - updated + try { + fn(entry.loadedUI.ui) + } finally { + entry.loadedUI.lock.readLock().unlock() + } } /** @@ -272,25 +176,21 @@ private[history] class ApplicationCache( * @throws NoSuchElementException if there is no matching element */ @throws[NoSuchElementException] - def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { - + private def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { logDebug(s"Loading application Entry $appId/$attemptId") metrics.loadCount.inc() time(metrics.loadTimer) { + metrics.lookupCount.inc() operations.getAppUI(appId, attemptId) match { - case Some(LoadedAppUI(ui, updateState)) => - val completed = ui.getApplicationInfoList.exists(_.attempts.last.completed) - if (completed) { - // completed spark UIs are attached directly - operations.attachSparkUI(appId, attemptId, ui, completed) - } else { + case Some(loadedUI) => + val completed = loadedUI.ui.getApplicationInfoList.exists(_.attempts.last.completed) + if (!completed) { // incomplete UIs have the cache-check filter put in front of them. - ApplicationCacheCheckFilterRelay.registerFilter(ui, appId, attemptId) - operations.attachSparkUI(appId, attemptId, ui, completed) + registerFilter(new CacheKey(appId, attemptId), loadedUI, this) } + operations.attachSparkUI(appId, attemptId, loadedUI.ui, completed) // build the cache entry - val now = clock.getTimeMillis() - val entry = new CacheEntry(ui, completed, updateState, now) + val entry = new CacheEntry(loadedUI, completed) logDebug(s"Loaded application $appId/$attemptId -> $entry") entry case None => @@ -303,32 +203,6 @@ private[history] class ApplicationCache( } } - /** - * Split up an `applicationId/attemptId` or `applicationId` key into the separate pieces. - * - * @param appAndAttempt combined key - * @return a tuple of the application ID and, if present, the attemptID - */ - def splitAppAndAttemptKey(appAndAttempt: String): (String, Option[String]) = { - val parts = appAndAttempt.split("/") - require(parts.length == 1 || parts.length == 2, s"Invalid app key $appAndAttempt") - val appId = parts(0) - val attemptId = if (parts.length > 1) Some(parts(1)) else None - (appId, attemptId) - } - - /** - * Merge an appId and optional attempt Id into a key of the form `applicationId/attemptId`. - * - * If there is an `attemptId`; `applicationId` if not. - * @param appId application ID - * @param attemptId optional attempt ID - * @return a unified string - */ - def mergeAppAndAttemptToKey(appId: String, attemptId: Option[String]): String = { - appId + attemptId.map { id => s"/$id" }.getOrElse("") - } - /** * String operator dumps the cache entries and metrics. 
* @return a string value, primarily for testing and diagnostics @@ -347,6 +221,26 @@ private[history] class ApplicationCache( sb.append("----\n") sb.toString() } + + /** + * Register a filter for the web UI which checks for updates to the given app/attempt + * @param ui Spark UI to attach filters to + * @param appId application ID + * @param attemptId attempt ID + */ + def registerFilter(key: CacheKey, loadedUI: LoadedAppUI, cache: ApplicationCache): Unit = { + require(loadedUI != null) + val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) + val filter = new ApplicationCacheCheckFilter(key, loadedUI, cache) + val holder = new FilterHolder(filter) + require(loadedUI.ui.getHandlers != null, "null handlers") + loadedUI.ui.getHandlers.foreach { handler => + handler.addFilter(holder, "/*", enumDispatcher) + } + } + + def invalidate(key: CacheKey): Unit = appCache.invalidate(key) + } /** @@ -360,14 +254,12 @@ private[history] class ApplicationCache( * @param probeTime Times in milliseconds when the probe was last executed. */ private[history] final class CacheEntry( - val ui: SparkUI, - val completed: Boolean, - val updateProbe: () => Boolean, - var probeTime: Long) { + val loadedUI: LoadedAppUI, + val completed: Boolean) { /** string value is for test assertions */ override def toString: String = { - s"UI $ui, completed=$completed, probeTime=$probeTime" + s"UI ${loadedUI.ui}, completed=$completed" } } @@ -396,23 +288,17 @@ private[history] class CacheMetrics(prefix: String) extends Source { val evictionCount = new Counter() val loadCount = new Counter() val loadTimer = new Timer() - val updateProbeCount = new Counter() - val updateProbeTimer = new Timer() - val updateTriggeredCount = new Counter() /** all the counters: for registration and string conversion. */ private val counters = Seq( ("lookup.count", lookupCount), ("lookup.failure.count", lookupFailureCount), ("eviction.count", evictionCount), - ("load.count", loadCount), - ("update.probe.count", updateProbeCount), - ("update.triggered.count", updateTriggeredCount)) + ("load.count", loadCount)) /** all metrics, including timers */ private val allMetrics = counters ++ Seq( - ("load.timer", loadTimer), - ("update.probe.timer", updateProbeTimer)) + ("load.timer", loadTimer)) /** * Name of metric source @@ -498,23 +384,11 @@ private[history] trait ApplicationCacheOperations { * Implementation note: there's some abuse of a shared global entry here because * the configuration data passed to the servlet is just a string:string map. */ -private[history] class ApplicationCacheCheckFilter() extends Filter with Logging { - - import ApplicationCacheCheckFilterRelay._ - var appId: String = _ - var attemptId: Option[String] = _ - - /** - * Bind the app and attempt ID, throwing an exception if no application ID was provided. - * @param filterConfig configuration - */ - override def init(filterConfig: FilterConfig): Unit = { - - appId = Option(filterConfig.getInitParameter(APP_ID)) - .getOrElse(throw new ServletException(s"Missing Parameter $APP_ID")) - attemptId = Option(filterConfig.getInitParameter(ATTEMPT_ID)) - logDebug(s"initializing filter $this") - } +private[history] class ApplicationCacheCheckFilter( + key: CacheKey, + loadedUI: LoadedAppUI, + cache: ApplicationCache) + extends Filter with Logging { /** * Filter the request. 
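
The new `withSparkUI` entry point replaces the old `get` / `getSparkUI` calls on `ApplicationCache`: callers run their code while holding the cache entry's read lock, so the backing UI store cannot be invalidated or swapped out mid-request. A minimal usage sketch follows; the caller `describeAttempt` and its return value are illustrative only, while `withSparkUI` and the `NoSuchElementException` behavior come from this patch.

    // Illustrative caller, not part of the patch.
    def describeAttempt(
        cache: ApplicationCache,
        appId: String,
        attemptId: Option[String]): String = {
      try {
        cache.withSparkUI(appId, attemptId) { ui =>
          // The UI (and its backing state store) is only guaranteed valid while
          // this closure runs; do not let `ui` escape it.
          ui.appName
        }
      } catch {
        case _: NoSuchElementException => s"no such app: $appId"
      }
    }
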
@@ -543,123 +417,23 @@ private[history] class ApplicationCacheCheckFilter() extends Filter with Logging // if the request is for an attempt, check to see if it is in need of delete/refresh // and have the cache update the UI if so - if (operation=="HEAD" || operation=="GET" - && checkForUpdates(requestURI, appId, attemptId)) { - // send a redirect back to the same location. This will be routed - // to the *new* UI - logInfo(s"Application Attempt $appId/$attemptId updated; refreshing") - val queryStr = Option(httpRequest.getQueryString).map("?" + _).getOrElse("") - val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr) - httpResponse.sendRedirect(redirectUrl) - } else { - chain.doFilter(request, response) + loadedUI.lock.readLock().lock() + try { + if (loadedUI.valid) { + chain.doFilter(request, response) + } else { + cache.invalidate(key) + val queryStr = Option(httpRequest.getQueryString).map("?" + _).getOrElse("") + val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr) + httpResponse.sendRedirect(redirectUrl) + } + } finally { + loadedUI.lock.readLock().unlock() } } - override def destroy(): Unit = { - } - - override def toString: String = s"ApplicationCacheCheckFilter for $appId/$attemptId" -} + override def init(config: FilterConfig): Unit = { } -/** - * Global state for the [[ApplicationCacheCheckFilter]] instances, so that they can relay cache - * probes to the cache. - * - * This is an ugly workaround for the limitation of servlets and filters in the Java servlet - * API; they are still configured on the model of a list of classnames and configuration - * strings in a `web.xml` field, rather than a chain of instances wired up by hand or - * via an injection framework. There is no way to directly configure a servlet filter instance - * with a reference to the application cache which is must use: some global state is needed. - * - * Here, [[ApplicationCacheCheckFilter]] is that global state; it relays all requests - * to the singleton [[ApplicationCache]] - * - * The field `applicationCache` must be set for the filters to work - - * this is done during the construction of [[ApplicationCache]], which requires that there - * is only one cache serving requests through the WebUI. - * - * *Important* In test runs, if there is more than one [[ApplicationCache]], the relay logic - * will break: filters may not find instances. Tests must not do that. - * - */ -private[history] object ApplicationCacheCheckFilterRelay extends Logging { - // name of the app ID entry in the filter configuration. Mandatory. - val APP_ID = "appId" + override def destroy(): Unit = { } - // name of the attempt ID entry in the filter configuration. Optional. - val ATTEMPT_ID = "attemptId" - - // name of the filter to register - val FILTER_NAME = "org.apache.spark.deploy.history.ApplicationCacheCheckFilter" - - /** the application cache to relay requests to */ - @volatile - private var applicationCache: Option[ApplicationCache] = None - - /** - * Set the application cache. 
Logs a warning if it is overwriting an existing value - * @param cache new cache - */ - def setApplicationCache(cache: ApplicationCache): Unit = { - applicationCache.foreach( c => logWarning(s"Overwriting application cache $c")) - applicationCache = Some(cache) - } - - /** - * Reset the application cache - */ - def resetApplicationCache(): Unit = { - applicationCache = None - } - - /** - * Check to see if there has been an update - * @param requestURI URI the request came in on - * @param appId application ID - * @param attemptId attempt ID - * @return true if an update was loaded for the app/attempt - */ - def checkForUpdates(requestURI: String, appId: String, attemptId: Option[String]): Boolean = { - - logDebug(s"Checking $appId/$attemptId from $requestURI") - applicationCache match { - case Some(cache) => - try { - cache.checkForUpdates(appId, attemptId) - } catch { - case ex: Exception => - // something went wrong. Keep going with the existing UI - logWarning(s"When checking for $appId/$attemptId from $requestURI", ex) - false - } - - case None => - logWarning("No application cache instance defined") - false - } - } - - - /** - * Register a filter for the web UI which checks for updates to the given app/attempt - * @param ui Spark UI to attach filters to - * @param appId application ID - * @param attemptId attempt ID - */ - def registerFilter( - ui: SparkUI, - appId: String, - attemptId: Option[String] ): Unit = { - require(ui != null) - val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) - val holder = new FilterHolder() - holder.setClassName(FILTER_NAME) - holder.setInitParameter(APP_ID, appId) - attemptId.foreach( id => holder.setInitParameter(ATTEMPT_ID, id)) - require(ui.getHandlers != null, "null handlers") - ui.getHandlers.foreach { handler => - handler.addFilter(holder, "/*", enumDispatcher) - } - } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index e25522a28c577..966106a9f320e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.history +import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.zip.ZipOutputStream import scala.xml.Node @@ -46,31 +47,30 @@ private[spark] case class ApplicationHistoryInfo( } } -/** - * A probe which can be invoked to see if a loaded Web UI has been updated. - * The probe is expected to be relative purely to that of the UI returned - * in the same [[LoadedAppUI]] instance. That is, whenever a new UI is loaded, - * the probe returned with it is the one that must be used to check for it - * being out of date; previous probes must be discarded. - */ -private[history] abstract class HistoryUpdateProbe { - /** - * Return true if the history provider has a later version of the application - * attempt than the one against this probe was constructed. - * @return - */ - def isUpdated(): Boolean -} - /** * All the information returned from a call to `getAppUI()`: the new UI * and any required update state. 
* @param ui Spark UI * @param updateProbe probe to call to check on the update state of this application attempt */ -private[spark] case class LoadedAppUI( - ui: SparkUI, - updateProbe: () => Boolean) +private[spark] case class LoadedAppUI(ui: SparkUI) { + + val lock = new ReentrantReadWriteLock() + + @volatile private var _valid = true + + def valid: Boolean = _valid + + def invalidate(): Unit = { + lock.writeLock().lock() + try { + _valid = false + } finally { + lock.writeLock().unlock() + } + } + +} private[spark] abstract class ApplicationHistoryProvider { @@ -152,4 +152,10 @@ private[spark] abstract class ApplicationHistoryProvider { * @return html text to display when the application list is empty */ def getEmptyListingHtml(): Seq[Node] = Seq.empty + + /** + * Called when an application UI is unloaded from the history server. + */ + def onUIDetached(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e2273e530952c..01fc2291d6f13 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,8 +17,10 @@ package org.apache.spark.deploy.history -import java.io.{File, FileNotFoundException, IOException, OutputStream} -import java.util.{Date, UUID} +import java.io.{File, FileNotFoundException, IOException} +import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermissions +import java.util.{Date, ServiceLoader, UUID} import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -26,8 +28,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.xml.Node -import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude} -import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.fasterxml.jackson.annotation.JsonIgnore import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, Path} @@ -43,6 +44,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.kvstore._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.AppStateStore +import org.apache.spark.status.KVUtils import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI @@ -126,36 +129,27 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - private val storePath = conf.get(LOCAL_STORE_DIR) + private val storePath = new File(conf.get(LOCAL_STORE_DIR)) + require(storePath.isDirectory(), s"Configured store directory ($storePath) does not exist.") private val listing = { - val dbPath = new File(storePath, "listing.ldb") - - def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) - + val metadata = new KVStoreMetadata(CURRENT_VERSION, AppStateStore.CURRENT_VERSION, + logDir.toString()) try { - val db = openDB() - val meta = db.getMetadata(classOf[KVStoreMetadata]) - - if (meta == null) { - db.setMetadata(new KVStoreMetadata(CURRENT_VERSION, logDir.toString())) - db - } else if (meta.version != CURRENT_VERSION || !logDir.toString().equals(meta.logDir)) { - 
logInfo("Detected mismatched config in existing DB, deleting...") - db.close() - Utils.deleteRecursively(dbPath) - openDB() - } else { - db - } + open(new File(storePath, "listing.ldb"), metadata) } catch { - case _: UnsupportedStoreVersionException => - logInfo("Detected incompatible DB versions, deleting...") - Utils.deleteRecursively(dbPath) - openDB() + case e: Exception => + // If there's an error, remove the listing database and any existing UI database + // from the store directory, since it's extremely likely that they'll all contain + // incompatible information. + logWarning(s"Error while opening existing listing database, creating new one.", e) + storePath.listFiles().foreach(Utils.deleteRecursively) + open(new File(storePath, "listing.ldb"), metadata) } } + private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]() + /** * Return a runnable that performs the given operation on the event logs. * This operation is expected to be executed periodically. @@ -179,7 +173,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - // Conf option used for testing the initialization code. val initThread = initialize() private[history] def initialize(): Thread = { @@ -279,39 +272,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getLastUpdatedTime(): Long = lastScanTime.get() override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { + val app = try { + load(appId) + } catch { + case _: NoSuchElementException => + return None + } + + val attempt = app.attempts.find(_.info.attemptId == attemptId).orNull + if (attempt == null) { + return None + } + + val path = uiStorePath(appId, attemptId) + if (!path.exists()) { + throw new IllegalStateException( + s"Application entry for $appId / $attemptId found, but UI not available.") + } + + val conf = this.conf.clone() + val secManager = new SecurityManager(conf) + + secManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls so they are properly picked up + secManager.setAdminAcls(HISTORY_UI_ADMIN_ACLS + "," + attempt.adminAcls.getOrElse("")) + secManager.setViewAcls(attempt.info.sparkUser, attempt.viewAcls.getOrElse("")) + secManager.setAdminAclsGroups(HISTORY_UI_ADMIN_ACLS_GROUPS + "," + + attempt.adminAclsGroups.getOrElse("")) + secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse("")) + + val replayBus = new ReplayListenerBus() + + // Create the UI under a lock so that a valid disk store is used, in case the update thread + // is writing a new disk store for the application (see replaceStore()). + val loadedUI = synchronized { + val ui = SparkUI.create(None, AppStateStore.loadStore(conf, path), conf, replayBus, + secManager, + app.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + attempt.info.startTime.getTime()) + val loaded = LoadedAppUI(ui) + activeUIs((appId, attemptId)) = loaded + loaded + } + + // TODO: remove the following replay code. It's currently needed because not all of the + // UI uses the cached state store. Once that's done (i.e. after the SQL UI is ported + // over), this code can be removed. 
try { - val appInfo = load(appId) - appInfo.attempts - .find { attempt => attempt.info.attemptId == attemptId } - .map { attempt => - val replayBus = new ReplayListenerBus() - val ui = { - val conf = this.conf.clone() - val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, - HistoryServer.getAttemptURI(appId, attempt.info.attemptId), - attempt.info.startTime.getTime()) - // Do not call ui.bind() to avoid creating a new server for each application - } + val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], + Utils.getContextOrSparkClassLoader).asScala + listenerFactories.foreach { listenerFactory => + val listeners = listenerFactory.createListeners(conf, loadedUI.ui) + listeners.foreach(replayBus.addListener) + } - val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) - - val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) - // make sure to set admin acls before view acls so they are properly picked up - val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") - ui.getSecurityManager.setAdminAcls(adminAcls) - ui.getSecurityManager.setViewAcls(attempt.info.sparkUser, - appListener.viewAcls.getOrElse("")) - val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") - ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)) - } + val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) + replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) } catch { - case _: NoSuchElementException => None + case e: Exception => + onUIDetached(appId, attemptId, loadedUI.ui) + throw e } + + Some(loadedUI) } override def getEmptyListingHtml(): Seq[Node] = { @@ -334,12 +358,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) Map("Event log directory" -> logDir.toString) ++ safeMode } - override def stop(): Unit = { - listing.close() + override def stop(): Unit = synchronized { if (initThread != null && initThread.isAlive()) { initThread.interrupt() initThread.join() } + Seq(pool, replayExecutor).foreach { executor => + executor.shutdown() + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow() + } + } + activeUIs.foreach { case (_, loadedUI) => loadedUI.ui.store.close() } + activeUIs.clear() + listing.close() + } + + override def onUIDetached(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { + val uiOption = synchronized { + activeUIs.remove((appId, attemptId)) + } + uiOption.foreach { loadedUI => + loadedUI.lock.writeLock().lock() + try { + loadedUI.ui.store.close() + } finally { + loadedUI.lock.writeLock().unlock() + } + } } /** @@ -486,15 +532,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val listener = new AppListingListener(fileStatus, clock) bus.addListener(listener) + // Write the UI data to a temp location. 
+ val tempUiPath = createTempDir("uistore") + val store = AppStateStore.createStore(tempUiPath, conf, bus) + val appCompleted = isApplicationCompleted(fileStatus) val logInput = EventLoggingListener.openEventLog(logPath, fs) try { bus.replay(logInput, logPath.toString, !appCompleted) - listener.applicationInfo.foreach(addListing) + } catch { + case e: Exception => + store.close() + Utils.deleteRecursively(tempUiPath) + throw e } finally { logInput.close() } + // Move the UI store to its final location if the app ID is known, otherwise discard it. + listener.applicationInfo.foreach { app => + addListing(app) + replaceStore(app.info.id, app.attempts.head.info.attemptId, tempUiPath) + } + Utils.deleteRecursively(tempUiPath) listing.write(new LogInfo(logPath.toString(), fileStatus.getLen())) } @@ -550,16 +610,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** - * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns - * an `ApplicationEventListener` instance with event data captured from the replay. - * `ReplayEventsFilter` determines what events are replayed and can therefore limit the - * data captured in the returned `ApplicationEventListener` instance. + * Replays the events in the specified log file on the supplied `ReplayListenerBus`. + * `ReplayEventsFilter` determines what events are replayed. */ private def replay( eventLog: FileStatus, appCompleted: Boolean, bus: ReplayListenerBus, - eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = { + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") // Note that the eventLog may have *increased* in size since when we grabbed the filestatus, @@ -570,10 +628,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // after it's created, so we get a file size that is no bigger than what is actually read. val logInput = EventLoggingListener.openEventLog(logPath, fs) try { - val appListener = new ApplicationEventListener - bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) - appListener } finally { logInput.close() } @@ -616,30 +671,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | application count=$count}""".stripMargin } - /** - * Return true iff a newer version of the UI is available. The check is based on whether the - * fileSize for the currently loaded UI is smaller than the file size the last time - * the logs were loaded. - * - * This is a very cheap operation -- the work of loading the new attempt was already done - * by [[checkForLogs]]. 
- * @param appId application to probe - * @param attemptId attempt to probe - * @param prevFileSize the file size of the logs for the currently displayed UI - */ - private def updateProbe( - appId: String, - attemptId: Option[String], - prevFileSize: Long)(): Boolean = { - try { - val attempt = getAttempt(appId, attemptId) - val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) - recordedFileSize(logPath) > prevFileSize - } catch { - case _: NoSuchElementException => false - } - } - private def recordedFileSize(log: Path): Long = { try { listing.read(classOf[LogInfo], log.toString()).fileSize @@ -680,6 +711,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) listing.write(newAppInfo) } + private def createTempDir(name: String): File = { + val perms = PosixFilePermissions.fromString("rwx------") + Files.createTempDirectory(storePath.toPath(), name, + PosixFilePermissions.asFileAttribute(perms)).toFile() + } + + private def uiStorePath(appId: String, attemptId: Option[String]): File = { + val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb" + new File(storePath, fileName) + } + + private def replaceStore(appId: String, attemptId: Option[String], newStore: File): Unit = { + val uiStore = uiStorePath(appId, attemptId) + + synchronized { + // If there's an active UI for the application, invalidate it and close its store, so that + // we can replace it with the updated one. + activeUIs.remove((appId, attemptId)).foreach { loadedUI => + loadedUI.invalidate() + loadedUI.ui.store.close() + } + + if (uiStore.exists()) { + Utils.deleteRecursively(uiStore) + } + + if (!newStore.renameTo(uiStore)) { + throw new IOException(s"Failed to rename UI store from $newStore to $uiStore.") + } + } + } + /** For testing. Returns internal data about a single attempt. */ private[history] def getAttempt(appId: String, attemptId: Option[String]): KVStoreAttemptInfo = { load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( @@ -700,18 +763,23 @@ private[history] object FsHistoryProvider { private val CURRENT_VERSION = 1L } -case class KVStoreMetadata( +private[history] case class KVStoreMetadata( val version: Long, + val uiVersion: Long, val logDir: String) -case class LogInfo( +private[history] case class LogInfo( @KVIndexParam val logPath: String, val fileSize: Long) -class KVStoreAttemptInfo( +private[history] class KVStoreAttemptInfo( val info: v1.ApplicationAttemptInfo, val logPath: String, - val fileSize: Long) { + val fileSize: Long, + val adminAcls: Option[String], + val viewAcls: Option[String], + val adminAclsGroups: Option[String], + val viewAclsGroups: Option[String]) { def toAppAttemptInfo(): ApplicationAttemptInfo = { ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), @@ -721,7 +789,7 @@ class KVStoreAttemptInfo( } -class KVStoreApplicationInfo( +private[history] class KVStoreApplicationInfo( val info: v1.ApplicationInfo, val attempts: List[KVStoreAttemptInfo]) { @@ -750,10 +818,15 @@ class KVStoreApplicationInfo( } -private[spark] class AppListingListener(log: FileStatus, clock: Clock) - extends SparkListener { +private[history] class AppListingListener( + log: FileStatus, + clock: Clock) + extends SparkListener with KVUtils { - private var attempt = new KVStoreAttemptInfo( + // This listener doesn't use read/update, so no need for a KVStore instance. 
+ override protected val kvstore: KVStore = null + + private var attempt = newAttemptInfo(None, new v1.ApplicationAttemptInfo(None, new Date(-1), new Date(-1), new Date(log.getModificationTime()), -1, null, false), log.getPath().getName(), log.getLen()) @@ -778,7 +851,7 @@ private[spark] class AppListingListener(log: FileStatus, clock: Clock) event.sparkUser, attempt.info.completed) - attempt = new KVStoreAttemptInfo(newInfo, attempt.logPath, attempt.fileSize) + attempt = newAttemptInfo(attempt, info = newInfo) } override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { @@ -791,7 +864,21 @@ private[spark] class AppListingListener(log: FileStatus, clock: Clock) attempt.info.sparkUser, true) - attempt = new KVStoreAttemptInfo(newInfo, attempt.logPath, attempt.fileSize) + attempt = newAttemptInfo(attempt, info = newInfo) + } + + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { + val allProperties = event.environmentDetails("Spark Properties").toMap + val viewAcls = allProperties.get("spark.ui.view.acls") + val adminAcls = allProperties.get("spark.admin.acls") + val viewAclsGroups = allProperties.get("spark.ui.view.acls.groups") + val adminAclsGroups = allProperties.get("spark.admin.acls.groups") + + attempt = newAttemptInfo(attempt, + viewAcls = viewAcls, + adminAcls = adminAcls, + viewAclsGroups = viewAclsGroups, + adminAclsGroups = adminAclsGroups) } def applicationInfo: Option[KVStoreApplicationInfo] = { @@ -802,4 +889,23 @@ private[spark] class AppListingListener(log: FileStatus, clock: Clock) } } + private def newAttemptInfo( + old: Option[KVStoreAttemptInfo], + info: Option[v1.ApplicationAttemptInfo] = None, + logPath: Option[String] = None, + fileSize: Option[Long] = None, + adminAcls: Option[Option[String]] = None, + viewAcls: Option[Option[String]] = None, + adminAclsGroups: Option[Option[String]] = None, + viewAclsGroups: Option[Option[String]] = None): KVStoreAttemptInfo = { + new KVStoreAttemptInfo( + info.orElse(old.map(_.info)).orNull, + logPath.orElse(old.map(_.logPath)).orNull, + fileSize.orElse(old.map(_.fileSize)).getOrElse(0L), + adminAcls.getOrElse(old.flatMap(_.adminAcls)), + viewAcls.getOrElse(old.flatMap(_.viewAcls)), + adminAclsGroups.getOrElse(old.flatMap(_.adminAclsGroups)), + viewAclsGroups.getOrElse(old.flatMap(_.viewAclsGroups))) + } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 949b307820218..4900a8130a921 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -106,8 +106,8 @@ class HistoryServer( } } - def getSparkUI(appKey: String): Option[SparkUI] = { - appCache.getSparkUI(appKey) + override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + appCache.withSparkUI(appId, attemptId)(fn) } initialize() @@ -140,7 +140,6 @@ class HistoryServer( override def stop() { super.stop() provider.stop() - appCache.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). 
*/ @@ -158,6 +157,7 @@ class HistoryServer( override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs") ui.getHandlers.foreach(detachHandler) + provider.onUIDetached(appId, attemptId, ui) } /** @@ -224,15 +224,13 @@ class HistoryServer( */ private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = { try { - appCache.get(appId, attemptId) + appCache.withSparkUI(appId, attemptId) { _ => + // Do nothing, just force the UI to load. + } true } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - false - - case cause: Exception => throw cause - } + case NonFatal(e: NoSuchElementException) => + false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala deleted file mode 100644 index 28c45d800ed06..0000000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -/** - * A simple listener for application events. - * - * This listener expects to hear events from a single application only. If events - * from multiple applications are seen, the behavior is unspecified. 
- */ -private[spark] class ApplicationEventListener extends SparkListener { - var appName: Option[String] = None - var appId: Option[String] = None - var appAttemptId: Option[String] = None - var sparkUser: Option[String] = None - var startTime: Option[Long] = None - var endTime: Option[Long] = None - var viewAcls: Option[String] = None - var adminAcls: Option[String] = None - var viewAclsGroups: Option[String] = None - var adminAclsGroups: Option[String] = None - - override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { - appName = Some(applicationStart.appName) - appId = applicationStart.appId - appAttemptId = applicationStart.appAttemptId - startTime = Some(applicationStart.time) - sparkUser = Some(applicationStart.sparkUser) - } - - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - endTime = Some(applicationEnd.time) - } - - override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { - synchronized { - val environmentDetails = environmentUpdate.environmentDetails - val allProperties = environmentDetails("Spark Properties").toMap - viewAcls = allProperties.get("spark.ui.view.acls") - adminAcls = allProperties.get("spark.admin.acls") - viewAclsGroups = allProperties.get("spark.ui.view.acls.groups") - adminAclsGroups = allProperties.get("spark.admin.acls.groups") - } - } -} diff --git a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala index 61af0ff1aba2b..3350469cb2048 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala @@ -38,9 +38,6 @@ import org.apache.spark.ui.SparkUI * * TODO (M4): * - Need to add state / information needed by the UI that is currently not in the API. - * - Need mechanism to insert this into SparkContext and be called when SparkContext is - * closed (new plugin interface?) - * - Need to close store in live UI after SparkContext closes. * - Cache active jobs / stages / other interesting things in memory to make it faster * to update them * - Flush data to the store in a separate thread (to avoid stalling the listener bus). 
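
For context, the data flow this listener participates in: a KVStore-backed state store is opened (in a temporary directory for the live UI, or at a fixed path by the history provider), an `AppStateListener` bound to that store is attached to the listener bus, and every handled event becomes a kvstore write that `AppStateStore` later reads back for the UI and REST API. A rough sketch, assuming `conf` and `listenerBus` are the driver's existing `SparkConf` and listener bus:

    import org.apache.spark.JobExecutionStatus
    import org.apache.spark.status.AppStateStore

    // Live UI path: backed by a temporary directory that is removed on close().
    val store = AppStateStore.createTempStore(conf, listenerBus)

    // Events now flow listenerBus -> AppStateListener -> kvstore writes, and the
    // UI / REST layer reads them back through AppStateStore, e.g.:
    val allJobs = store.jobsList(new java.util.ArrayList[JobExecutionStatus]())

    // Closing the store also deletes the temporary directory backing it.
    store.close()
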
@@ -305,7 +302,7 @@ private class AppStateListener(override protected val kvstore: KVStore) extends status = event.taskInfo.status, taskLocality = event.taskInfo.taskLocality.toString(), speculative = event.taskInfo.speculative) - kvstore.write(new TaskDataWrapper(task)) + kvstore.write(new TaskDataWrapper(task, event.stageId, event.stageAttemptId)) } updateStageData(event.stageId, event.stageAttemptId) { stage => @@ -369,7 +366,7 @@ private class AppStateListener(override protected val kvstore: KVStore) extends status = event.taskInfo.status, errorMessage = errorMessage, taskMetrics = Option(event.taskMetrics).map(newTaskMetrics)) - new TaskDataWrapper(newInfo) + new TaskDataWrapper(newInfo, event.stageId, event.stageAttemptId) } val (completedDelta, failedDelta) = event.reason match { @@ -676,7 +673,7 @@ private class AppStateListener(override protected val kvstore: KVStore) extends private def updateTaskData(id: Long)(fn: TaskDataWrapper => TaskDataWrapper): Unit = { update[TaskDataWrapper](id) { old => - val task = old.getOrElse(new TaskDataWrapper(newTaskData(None, taskId = id))) + val task = old.getOrElse(new TaskDataWrapper(newTaskData(None, taskId = id), -1, -1)) fn(task) } } @@ -1083,7 +1080,6 @@ private class AppStateListener(override protected val kvstore: KVStore) extends private[spark] object AppStateListener { - val CURRENT_VERSION = 1L val DEFAULT_DATE = new Date(-1) val UNKNOWN = "" diff --git a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala new file mode 100644 index 0000000000000..49cd73b18fe50 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File +import java.util.{Arrays, List => JList} + +import scala.collection.JavaConverters._ + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.kvstore.KVStore +import org.apache.spark.scheduler.SparkListenerBus +import org.apache.spark.status.api.v1 +import org.apache.spark.util.{Distribution, Utils} + +/** + * A wrapper around a KVStore that provides methods for accessing the API data stored within. 
+ */ +private[spark] class AppStateStore private (store: KVStore, tempStorePath: Option[File]) { + + def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = { + val it = store.view(classOf[JobDataWrapper]).asScala.map(_.info) + if (!statuses.isEmpty()) { + it.filter { job => statuses.contains(job.status) }.toSeq + } else { + it.toSeq + } + } + + def job(jobId: Int): v1.JobData = { + store.read(classOf[JobDataWrapper], jobId).info + } + + def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = { + store.view(classOf[ExecutorSummaryWrapper]).index("active").reverse().first(true).last(true) + .asScala.map(_.info).toSeq + } + + def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = { + val it = store.view(classOf[StageDataWrapper]).asScala.map(_.info) + if (!statuses.isEmpty) { + it.filter { s => statuses.contains(s.status) }.toSeq + } else { + it.toSeq + } + } + + def stageData(stageId: Int): Seq[v1.StageData] = { + store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId) + .asScala.map(_.info).toSeq + } + + def stageAttempt(stageId: Int, stageAttemptId: Int): v1.StageData = { + store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).info + } + + def taskSummary( + stageId: Int, + stageAttemptId: Int, + quantiles: Array[Double]): v1.TaskMetricDistributions = { + + val stage = Array(stageId, stageAttemptId) + + val rawMetrics = store.view(classOf[TaskDataWrapper]) + .index("stage") + .first(stage) + .last(stage) + .asScala + .flatMap(_.info.taskMetrics) + .toList + .view + + def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] = + Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles) + + // We need to do a lot of similar munging to nested metrics here. For each one, + // we want (a) extract the values for nested metrics (b) make a distribution for each metric + // (c) shove the distribution into the right field in our return type and (d) only return + // a result if the option is defined for any of the tasks. MetricHelper is a little util + // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just + // implement one "build" method, which just builds the quantiles for each field. 
+ + val inputMetrics = + new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics + + def build: v1.InputMetricDistributions = new v1.InputMetricDistributions( + bytesRead = submetricQuantiles(_.bytesRead), + recordsRead = submetricQuantiles(_.recordsRead) + ) + }.build + + val outputMetrics = + new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics + + def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions( + bytesWritten = submetricQuantiles(_.bytesWritten), + recordsWritten = submetricQuantiles(_.recordsWritten) + ) + }.build + + val shuffleReadMetrics = + new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics = + raw.shuffleReadMetrics + + def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions( + readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead }, + readRecords = submetricQuantiles(_.recordsRead), + remoteBytesRead = submetricQuantiles(_.remoteBytesRead), + remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched), + localBlocksFetched = submetricQuantiles(_.localBlocksFetched), + totalBlocksFetched = submetricQuantiles { s => + s.localBlocksFetched + s.remoteBlocksFetched + }, + fetchWaitTime = submetricQuantiles(_.fetchWaitTime) + ) + }.build + + val shuffleWriteMetrics = + new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics = + raw.shuffleWriteMetrics + + def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions( + writeBytes = submetricQuantiles(_.bytesWritten), + writeRecords = submetricQuantiles(_.recordsWritten), + writeTime = submetricQuantiles(_.writeTime) + ) + }.build + + new v1.TaskMetricDistributions( + quantiles = quantiles, + executorDeserializeTime = metricQuantiles(_.executorDeserializeTime), + executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime), + executorRunTime = metricQuantiles(_.executorRunTime), + executorCpuTime = metricQuantiles(_.executorCpuTime), + resultSize = metricQuantiles(_.resultSize), + jvmGcTime = metricQuantiles(_.jvmGcTime), + resultSerializationTime = metricQuantiles(_.resultSerializationTime), + memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled), + diskBytesSpilled = metricQuantiles(_.diskBytesSpilled), + inputMetrics = inputMetrics, + outputMetrics = outputMetrics, + shuffleReadMetrics = shuffleReadMetrics, + shuffleWriteMetrics = shuffleWriteMetrics + ) + } + + def taskList( + stageId: Int, + stageAttemptId: Int, + offset: Int, + length: Int, + sortBy: v1.TaskSorting): Seq[v1.TaskData] = { + val stageKey = Array(stageId, stageAttemptId) + val base = store.view(classOf[TaskDataWrapper]) + val indexed = sortBy match { + case v1.TaskSorting.ID => + base.index("stage").first(stageKey).last(stageKey) + case v1.TaskSorting.INCREASING_RUNTIME => + base.index("runtime").first(stageKey ++ Array(-1L)).last(stageKey ++ Array(Long.MaxValue)) + case v1.TaskSorting.DECREASING_RUNTIME => + base.index("runtime").first(stageKey ++ Array(Long.MaxValue)).last(stageKey ++ Array(-1L)) + .reverse() + } + indexed.skip(offset).max(length).asScala.map(_.info).toSeq + } + + def rddList(): 
Seq[v1.RDDStorageInfo] = { + store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).toSeq + } + + def rdd(rddId: Int): v1.RDDStorageInfo = { + store.read(classOf[RDDStorageInfoWrapper], rddId).info + } + + def close(): Unit = { + store.close() + tempStorePath.foreach(Utils.deleteRecursively) + } + +} + +private[spark] object AppStateStore { + + val CURRENT_VERSION = 1L + + /** Loads a UI store from the given path, creating an empty store if it doesn't exist. */ + def loadStore(conf: SparkConf, path: File): AppStateStore = { + new AppStateStore(loadStore(path), None) + } + + /** + * Crate a state store in a temporary path. A listener will be attached to the given bus to + * populate the store, and the directory will be deleted when the store is closed. + */ + def createTempStore(conf: SparkConf, bus: SparkListenerBus): AppStateStore = { + val temp = Utils.createTempDir(namePrefix = "appstate") + initializeStore(loadStore(temp), Some(temp), bus) + } + + /** + * Create a store in the given path, attaching a listener to the given bus to populate the + * store. The path will not be deleted after the store is closed. + */ + def createStore(path: File, conf: SparkConf, bus: SparkListenerBus): AppStateStore = { + initializeStore(loadStore(path), None, bus) + } + + private def initializeStore( + store: KVStore, + tempPath: Option[File], + bus: SparkListenerBus): AppStateStore = { + val stateStore = new AppStateStore(store, tempPath) + val listener = new AppStateListener(store) + bus.addListener(listener) + stateStore + } + + private def loadStore(path: File): KVStore = { + val metadata = new AppStatusStoreMetadata(CURRENT_VERSION) + KVUtils.open(path, metadata) + } + +} + +/** + * Helper for getting distributions from nested metric types. + */ +private abstract class MetricHelper[I, O]( + rawMetrics: Seq[v1.TaskMetrics], + quantiles: Array[Double]) { + + def getSubmetrics(raw: v1.TaskMetrics): I + + def build: O + + val data: Seq[I] = rawMetrics.map(getSubmetrics) + + /** applies the given function to all input metrics, and returns the quantiles */ + def submetricQuantiles(f: I => Double): IndexedSeq[Double] = { + Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles) + } +} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index f17b637754826..9d3833086172f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -248,7 +248,13 @@ private[spark] object ApiRootResource { * interface needed for them all to expose application info as json. */ private[spark] trait UIRoot { - def getSparkUI(appKey: String): Option[SparkUI] + /** + * Runs some code with the current SparkUI instance for the app / attempt. + * + * @throws NoSuchElementException If the app / attempt pair does not exist. + */ + def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T + def getApplicationInfoList: Iterator[ApplicationInfo] def getApplicationInfo(appId: String): Option[ApplicationInfo] @@ -293,15 +299,18 @@ private[v1] trait ApiRequestContext { * to it. 
If there is no such app, throw an appropriate exception */ def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = { - val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) - uiRoot.getSparkUI(appKey) match { - case Some(ui) => + try { + uiRoot.withSparkUI(appId, attemptId) { ui => val user = httpRequest.getRemoteUser() if (!ui.securityManager.checkUIViewPermissions(user)) { throw new ForbiddenException(raw"""user "$user" is not authorized""") } f(ui) - case None => throw new NotFoundException("no such app: " + appId) + } + } catch { + case _: NoSuchElementException => + val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) + throw new NotFoundException(s"no such app: $appKey") } } } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index d4b980f5dbb07..b82e57fa453b8 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -17,13 +17,15 @@ package org.apache.spark.status +import java.lang.{Integer => JInteger, Long => JLong} + import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.spark.kvstore.KVIndex import org.apache.spark.status.api.v1._ import org.apache.spark.status.KVUtils._ -private[spark] class AppStatusStoreMetadata( +private[spark] case class AppStatusStoreMetadata( val version: Long) private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) { @@ -38,6 +40,9 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { @JsonIgnore @KVIndex def id: String = info.id + @JsonIgnore @KVIndex("active") + def active: Boolean = info.isActive + @JsonIgnore @KVIndex("host") def host: String = info.hostPort.split(":")(0) @@ -65,13 +70,33 @@ private[spark] class StageDataWrapper( @JsonIgnore @KVIndex def id: Array[Int] = Array(info.stageId, info.attemptId) + @JsonIgnore @KVIndex("stageId") + def stageId: Int = info.stageId + } -private[spark] class TaskDataWrapper(val info: TaskData) { +/** + * The task information is always indexed with the stage ID, since that is how the UI and API + * consume it. That means every indexed value has the stage ID and attempt ID included, aside + * from the actual data being indexed. 
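+ *
+ * For illustration only (hypothetical values; `taskData` stands for some TaskData instance),
+ * a task from stage 3, attempt 0 would be wrapped and indexed roughly as:
+ *
+ * {{{
+ *   val wrapper = new TaskDataWrapper(taskData, stageId = 3, stageAttemptId = 0)
+ *   wrapper.stage    // Array(3, 0)
+ *   wrapper.runtime  // Array(3, 0, <executor run time, or -1 when no metrics are present>)
+ * }}}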
+ */ +private[spark] class TaskDataWrapper( + val info: TaskData, + val stageId: Int, + val stageAttemptId: Int) { @JsonIgnore @KVIndex def id: Long = info.taskId + @JsonIgnore @KVIndex("stage") + def stage: Array[Int] = Array(stageId, stageAttemptId) + + @JsonIgnore @KVIndex("runtime") + def runtime: Array[AnyRef] = { + val _runtime = info.taskMetrics.map(_.executorRunTime).getOrElse(-1L) + Array(stageId: JInteger, stageAttemptId: JInteger, _runtime: JLong) + } + } private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index bf4cf79e9faa3..8d6ac55c05b32 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ +import org.apache.spark.status.AppStateStore import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo, UIRoot} import org.apache.spark.storage.StorageStatusListener @@ -39,6 +40,7 @@ import org.apache.spark.util.Utils * Top level user interface for a Spark application. */ private[spark] class SparkUI private ( + val store: AppStateStore, val sc: Option[SparkContext], val conf: SparkConf, securityManager: SecurityManager, @@ -99,8 +101,12 @@ private[spark] class SparkUI private ( logInfo(s"Stopped Spark web UI at $webUrl") } - def getSparkUI(appId: String): Option[SparkUI] = { - if (appId == this.appId) Some(this) else None + override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + if (appId == this.appId) { + fn(this) + } else { + throw new NoSuchElementException() + } } def getApplicationInfoList: Iterator[ApplicationInfo] = { @@ -152,60 +158,24 @@ private[spark] object SparkUI { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) } - def createLiveUI( - sc: SparkContext, - conf: SparkConf, - listenerBus: SparkListenerBus, - jobProgressListener: JobProgressListener, - securityManager: SecurityManager, - appName: String, - startTime: Long): SparkUI = { - create(Some(sc), conf, listenerBus, securityManager, appName, - jobProgressListener = Some(jobProgressListener), startTime = startTime) - } - - def createHistoryUI( - conf: SparkConf, - listenerBus: SparkListenerBus, - securityManager: SecurityManager, - appName: String, - basePath: String, - startTime: Long): SparkUI = { - val sparkUI = create( - None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime) - - val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], - Utils.getContextOrSparkClassLoader).asScala - listenerFactories.foreach { listenerFactory => - val listeners = listenerFactory.createListeners(conf, sparkUI) - listeners.foreach(listenerBus.addListener) - } - sparkUI - } - /** - * Create a new Spark UI. - * - * @param sc optional SparkContext; this can be None when reconstituting a UI from event logs. - * @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the - * web UI will create and register its own JobProgressListener. + * Create a new UI backed by an AppStateStore. 
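+   *
+   * Illustrative only (hypothetical `store`, `conf`, `bus` and `securityManager` values), e.g.
+   * for a UI created without a live SparkContext:
+   *
+   * {{{
+   *   val ui = SparkUI.create(None, store, conf, bus, securityManager, "app name", "",
+   *     startTime = System.currentTimeMillis())
+   * }}}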
*/ - private def create( + def create( sc: Option[SparkContext], + store: AppStateStore, conf: SparkConf, listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, - basePath: String = "", - jobProgressListener: Option[JobProgressListener] = None, + basePath: String, startTime: Long): SparkUI = { - val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { + val jobProgressListener = sc.map(_.jobProgressListener).getOrElse { val listener = new JobProgressListener(conf) listenerBus.addListener(listener) listener } - val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener, conf) @@ -218,8 +188,9 @@ private[spark] object SparkUI { listenerBus.addListener(storageListener) listenerBus.addListener(operationGraphListener) - new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, _jobProgressListener, storageListener, operationGraphListener, + new SparkUI(store, sc, conf, securityManager, environmentListener, storageStatusListener, + executorsListener, jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime) } + } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala index 7998e3702c122..35f69d57ba609 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -18,15 +18,11 @@ package org.apache.spark.deploy.history import java.util.{Date, NoSuchElementException} -import javax.servlet.Filter import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.mutable -import scala.collection.mutable.ListBuffer import com.codahale.metrics.Counter -import com.google.common.cache.LoadingCache -import com.google.common.util.concurrent.UncheckedExecutionException import org.eclipse.jetty.servlet.ServletContextHandler import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -39,23 +35,10 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, ApplicationInfo} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{Clock, ManualClock, Utils} +import org.apache.spark.util.ManualClock class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar with Matchers { - /** - * subclass with access to the cache internals - * @param retainedApplications number of retained applications - */ - class TestApplicationCache( - operations: ApplicationCacheOperations = new StubCacheOperations(), - retainedApplications: Int, - clock: Clock = new ManualClock(0)) - extends ApplicationCache(operations, retainedApplications, clock) { - - def cache(): LoadingCache[CacheKey, CacheEntry] = appCache - } - /** * Stub cache operations. 
* The state is kept in a map of [[CacheKey]] to [[CacheEntry]], @@ -77,8 +60,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { logDebug(s"getAppUI($appId, $attemptId)") getAppUICount += 1 - instances.get(CacheKey(appId, attemptId)).map( e => - LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime))) + instances.get(CacheKey(appId, attemptId)).map { e => e.loadedUI } } override def attachSparkUI( @@ -96,10 +78,9 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attemptId: Option[String], completed: Boolean, started: Long, - ended: Long, - timestamp: Long): SparkUI = { - val ui = putAppUI(appId, attemptId, completed, started, ended, timestamp) - attachSparkUI(appId, attemptId, ui, completed) + ended: Long): LoadedAppUI = { + val ui = putAppUI(appId, attemptId, completed, started, ended) + attachSparkUI(appId, attemptId, ui.ui, completed) ui } @@ -108,23 +89,12 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attemptId: Option[String], completed: Boolean, started: Long, - ended: Long, - timestamp: Long): SparkUI = { - val ui = newUI(appId, attemptId, completed, started, ended) - putInstance(appId, attemptId, ui, completed, timestamp) + ended: Long): LoadedAppUI = { + val ui = LoadedAppUI(newUI(appId, attemptId, completed, started, ended)) + instances(CacheKey(appId, attemptId)) = new CacheEntry(ui, completed) ui } - def putInstance( - appId: String, - attemptId: Option[String], - ui: SparkUI, - completed: Boolean, - timestamp: Long): Unit = { - instances += (CacheKey(appId, attemptId) -> - new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp)) - } - /** * Detach a reconstructed UI * @@ -146,23 +116,6 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attached.get(CacheKey(appId, attemptId)) } - /** - * The update probe. 
- * @param appId application to probe - * @param attemptId attempt to probe - * @param updateTime timestamp of this UI load - */ - private[history] def updateProbe( - appId: String, - attemptId: Option[String], - updateTime: Long)(): Boolean = { - updateProbeCount += 1 - logDebug(s"isUpdated($appId, $attemptId, ${updateTime})") - val entry = instances.get(CacheKey(appId, attemptId)).get - val updated = entry.probeTime > updateTime - logDebug(s"entry = $entry; updated = $updated") - updated - } } /** @@ -210,15 +163,13 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val now = clock.getTimeMillis() // add the entry - operations.putAppUI(app1, None, true, now, now, now) + operations.putAppUI(app1, None, true, now, now) // make sure its local operations.getAppUI(app1, None).get operations.getAppUICount = 0 // now expect it to be found - val cacheEntry = cache.lookupCacheEntry(app1, None) - assert(1 === cacheEntry.probeTime) - assert(cacheEntry.completed) + cache.withSparkUI(app1, None) { _ => } // assert about queries made of the operations assert(1 === operations.getAppUICount, "getAppUICount") assert(1 === operations.attachCount, "attachCount") @@ -236,8 +187,8 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar assert(0 === operations.detachCount, "attachCount") // evict the entry - operations.putAndAttach("2", None, true, time2, time2, time2) - operations.putAndAttach("3", None, true, time2, time2, time2) + operations.putAndAttach("2", None, true, time2, time2) + operations.putAndAttach("3", None, true, time2, time2) cache.get("2") cache.get("3") @@ -248,7 +199,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val appId = "app1" val attemptId = Some("_01") val time3 = clock.getTimeMillis() - operations.putAppUI(appId, attemptId, false, time3, 0, time3) + operations.putAppUI(appId, attemptId, false, time3, 0) // expect an error here assertNotFound(appId, None) } @@ -256,10 +207,11 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Test that if an attempt ID is set, it must be used in lookups") { val operations = new StubCacheOperations() val clock = new ManualClock(1) - implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock) + implicit val cache = new ApplicationCache(operations, retainedApplications = 10, + clock = clock) val appId = "app1" val attemptId = Some("_01") - operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0, 0) + operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0) assertNotFound(appId, None) } @@ -271,50 +223,29 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Incomplete apps refreshed") { val operations = new StubCacheOperations() val clock = new ManualClock(50) - val window = 500 - implicit val cache = new ApplicationCache(operations, retainedApplications = 5, clock = clock) + implicit val cache = new ApplicationCache(operations, 5, clock) val metrics = cache.metrics // add the incomplete app // add the entry val started = clock.getTimeMillis() val appId = "app1" val attemptId = Some("001") - operations.putAppUI(appId, attemptId, false, started, 0, started) - val firstEntry = cache.lookupCacheEntry(appId, attemptId) - assert(started === firstEntry.probeTime, s"timestamp in $firstEntry") - assert(!firstEntry.completed, s"entry is complete: $firstEntry") - assertMetric("lookupCount", metrics.lookupCount, 1) + val 
initialUI = operations.putAndAttach(appId, attemptId, false, started, 0) + val firstUI = cache.withSparkUI(appId, attemptId) { ui => ui } + assertMetric("lookupCount", metrics.lookupCount, 1) assert(0 === operations.updateProbeCount, "expected no update probe on that first get") - val checkTime = window * 2 - clock.setTime(checkTime) - val entry3 = cache.lookupCacheEntry(appId, attemptId) - assert(firstEntry !== entry3, s"updated entry test from $cache") + // Invalidate the first entry to trigger a re-load. + initialUI.invalidate() + + // Update the UI in the stub so that a new one is provided to the cache. + operations.putAppUI(appId, attemptId, true, started, started + 10) + + val updatedUI = cache.withSparkUI(appId, attemptId) { ui => ui } + assert(firstUI !== updatedUI, s"expected updated UI") assertMetric("lookupCount", metrics.lookupCount, 2) - assertMetric("updateProbeCount", metrics.updateProbeCount, 1) - assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0) - assert(1 === operations.updateProbeCount, s"refresh count in $cache") - assert(0 === operations.detachCount, s"detach count") - assert(entry3.probeTime === checkTime) - - val updateTime = window * 3 - // update the cached value - val updatedApp = operations.putAppUI(appId, attemptId, true, started, updateTime, updateTime) - val endTime = window * 10 - clock.setTime(endTime) - logDebug(s"Before operation = $cache") - val entry5 = cache.lookupCacheEntry(appId, attemptId) - assertMetric("lookupCount", metrics.lookupCount, 3) - assertMetric("updateProbeCount", metrics.updateProbeCount, 2) - // the update was triggered - assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1) - assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match entry {$entry5} in $cache") - - // at which point, the refreshes stop - clock.setTime(window * 20) - assertCacheEntryEquals(appId, attemptId, entry5) - assertMetric("updateProbeCount", metrics.updateProbeCount, 2) + assert(1 === operations.detachCount, s"detach count") } /** @@ -337,27 +268,6 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar } } - /** - * Look up the cache entry and assert that it matches in the expected value. - * This assertion works if the two CacheEntries are different -it looks at the fields. - * UI are compared on object equality; the timestamp and completed flags directly. - * @param appId application ID - * @param attemptId attempt ID - * @param expected expected value - * @param cache app cache - */ - def assertCacheEntryEquals( - appId: String, - attemptId: Option[String], - expected: CacheEntry) - (implicit cache: ApplicationCache): Unit = { - val actual = cache.lookupCacheEntry(appId, attemptId) - val errorText = s"Expected get($appId, $attemptId) -> $expected, but got $actual from $cache" - assert(expected.ui === actual.ui, errorText + " SparkUI reference") - assert(expected.completed === actual.completed, errorText + " -completed flag") - assert(expected.probeTime === actual.probeTime, errorText + " -timestamp") - } - /** * Assert that a key wasn't found in cache or loaded. 
* @@ -370,14 +280,9 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar appId: String, attemptId: Option[String]) (implicit cache: ApplicationCache): Unit = { - val ex = intercept[UncheckedExecutionException] { + val ex = intercept[NoSuchElementException] { cache.get(appId, attemptId) } - var cause = ex.getCause - assert(cause !== null) - if (!cause.isInstanceOf[NoSuchElementException]) { - throw cause - } } test("Large Scale Application Eviction") { @@ -385,12 +290,12 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val clock = new ManualClock(0) val size = 5 // only two entries are retained, so we expect evictions to occur on lookups - implicit val cache: ApplicationCache = new TestApplicationCache(operations, - retainedApplications = size, clock = clock) + implicit val cache = new ApplicationCache(operations, retainedApplications = size, + clock = clock) val attempt1 = Some("01") - val ids = new ListBuffer[String]() + val ids = new mutable.ListBuffer[String]() // build a list of applications val count = 100 for (i <- 1 to count ) { @@ -398,7 +303,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar ids += appId clock.advance(10) val t = clock.getTimeMillis() - operations.putAppUI(appId, attempt1, true, t, t, t) + operations.putAppUI(appId, attempt1, true, t, t) } // now go through them in sequence reading them, expect evictions ids.foreach { id => @@ -413,20 +318,19 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Attempts are Evicted") { val operations = new StubCacheOperations() - implicit val cache: ApplicationCache = new TestApplicationCache(operations, - retainedApplications = 4) + implicit val cache = new ApplicationCache(operations, 4, new ManualClock()) val metrics = cache.metrics val appId = "app1" val attempt1 = Some("01") val attempt2 = Some("02") val attempt3 = Some("03") - operations.putAppUI(appId, attempt1, true, 100, 110, 110) - operations.putAppUI(appId, attempt2, true, 200, 210, 210) - operations.putAppUI(appId, attempt3, true, 300, 310, 310) + operations.putAppUI(appId, attempt1, true, 100, 110) + operations.putAppUI(appId, attempt2, true, 200, 210) + operations.putAppUI(appId, attempt3, true, 300, 310) val attempt4 = Some("04") - operations.putAppUI(appId, attempt4, true, 400, 410, 410) + operations.putAppUI(appId, attempt4, true, 400, 410) val attempt5 = Some("05") - operations.putAppUI(appId, attempt5, true, 500, 510, 510) + operations.putAppUI(appId, attempt5, true, 500, 510) def expectLoadAndEvictionCounts(expectedLoad: Int, expectedEvictionCount: Int): Unit = { assertMetric("loadCount", metrics.loadCount, expectedLoad) @@ -457,20 +361,14 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar } - test("Instantiate Filter") { - // this is a regression test on the filter being constructable - val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) - val instance = clazz.newInstance() - instance shouldBe a [Filter] - } - test("redirect includes query params") { - val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) - val filter = clazz.newInstance().asInstanceOf[ApplicationCacheCheckFilter] - filter.appId = "local-123" + val operations = new StubCacheOperations() + val ui = operations.putAndAttach("foo", None, true, 0, 10) val cache = mock[ApplicationCache] - when(cache.checkForUpdates(any(), any())).thenReturn(true) - 
ApplicationCacheCheckFilterRelay.setApplicationCache(cache) + when(cache.operations).thenReturn(operations) + val filter = new ApplicationCacheCheckFilter(new CacheKey("foo", None), ui, cache) + ui.invalidate() + val request = mock[HttpServletRequest] when(request.getMethod()).thenReturn("GET") when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 483bd648ddb87..d9fb13d8a499e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -598,6 +598,41 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("invalidate cached UI") { + val storeDir = Utils.createTempDir() + val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) + val provider = newProvider(conf) + val appId = "new1" + + // Write an incomplete app log. + val appLog = newLogFile(appId, None, inProgress = true) + writeFile(appLog, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None) + ) + provider.checkForLogs() + + // Load the app UI. + val oldUI = provider.getAppUI(appId, None) + assert(oldUI.isDefined) + intercept[NoSuchElementException] { + oldUI.get.ui.store.job(0) + } + + // Add more info to the app log, and trigger the provider to update things. + writeFile(appLog, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None), + SparkListenerJobStart(0, 1L, Nil, null), + SparkListenerApplicationEnd(5L) + ) + provider.checkForLogs() + + // Load the UI again and make sure we can get the new info added to the logs. + val freshUI = provider.getAppUI(appId, None) + assert(freshUI.isDefined) + assert(freshUI != oldUI) + freshUI.get.ui.store.job(0) + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. 
Example: diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 4277b82faa277..d2a318a680baa 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -72,6 +72,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers private var port: Int = -1 def init(extraConf: (String, String)*): Unit = { + Utils.deleteRecursively(storeDir) + assert(storeDir.mkdir()) val conf = new SparkConf() .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") @@ -292,21 +294,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val uiRoot = "/testwebproxybase" System.setProperty("spark.ui.proxyBase", uiRoot) - server.stop() - - val conf = new SparkConf() - .set("spark.history.fs.logDirectory", logDir) - .set("spark.history.fs.update.interval", "0") - .set("spark.testing", "true") - .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) - - provider = new FsHistoryProvider(conf) - provider.checkForLogs() - val securityManager = HistoryServer.createSecurityManager(conf) - - server = new HistoryServer(conf, provider, securityManager, 18080) - server.initialize() - server.bind() + stop() + init() val port = server.boundPort @@ -375,8 +364,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } test("incomplete apps get refreshed") { - server.stop() - implicit val webDriver: WebDriver = new HtmlUnitDriver implicit val formats = org.json4s.DefaultFormats @@ -386,6 +373,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers // a new conf is used with the background thread set and running at its fastest // allowed refresh rate (1Hz) + stop() val myConf = new SparkConf() .set("spark.history.fs.logDirectory", logDir.getAbsolutePath) .set("spark.eventLog.dir", logDir.getAbsolutePath) @@ -418,7 +406,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } - server = new HistoryServer(myConf, provider, securityManager, 18080) + server = new HistoryServer(myConf, provider, securityManager, 0) server.initialize() server.bind() val port = server.boundPort @@ -464,7 +452,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers rootAppPage should not be empty def getAppUI: SparkUI = { - provider.getAppUI(appId, None).get.ui + server.withSparkUI(appId, None) { ui => ui } } // selenium isn't that useful on failures...add our own reporting @@ -519,7 +507,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers getNumJobs("") should be (1) getNumJobs("/jobs") should be (1) getNumJobsRestful() should be (1) - assert(metrics.lookupCount.getCount > 1, s"lookup count too low in $metrics") + assert(metrics.lookupCount.getCount > 0, s"lookup count too low in $metrics") // dump state before the next bit of test, which is where update // checking really gets stressed diff --git a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala index 8da3451f62b01..85054a3093f09 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.status 
import java.io.File +import java.lang.{Integer => JInteger, Long => JLong} import java.util.{Arrays, Date, Properties} import scala.collection.JavaConverters._ @@ -172,6 +173,14 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter { s1Tasks.foreach { task => check[TaskDataWrapper](task.taskId) { wrapper => assert(wrapper.info.taskId === task.taskId) + assert(wrapper.stageId === stages.head.stageId) + assert(wrapper.stageAttemptId === stages.head.attemptId) + assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptId))) + + val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger, + -1L: JLong) + assert(Arrays.equals(wrapper.runtime, runtime)) + assert(wrapper.info.index === task.index) assert(wrapper.info.attempt === task.attemptNumber) assert(wrapper.info.launchTime === new Date(task.launchTime)) @@ -253,6 +262,7 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter { val s1Metrics = TaskMetrics.empty s1Metrics.setExecutorCpuTime(2L) + s1Metrics.setExecutorRunTime(4L) time += 1 pending.foreach { task => @@ -274,9 +284,12 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter { } pending.foreach { task => - check[TaskDataWrapper](task.taskId) { task => - assert(task.info.errorMessage === None) - assert(task.info.taskMetrics.get.executorCpuTime === 2L) + check[TaskDataWrapper](task.taskId) { wrapper => + assert(wrapper.info.errorMessage === None) + assert(wrapper.info.taskMetrics.get.executorCpuTime === 2L) + val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger, + 4L: JLong) + assert(Arrays.equals(wrapper.runtime, runtime)) } } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index dc4d5019b5cd6..feba90ac2c950 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -38,6 +38,7 @@ object MimaExcludes { lazy val v23excludes = v22excludes ++ Seq( // SPARK-18085: Better History Server scalability for many / large applications ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"), // [SPARK-20495][SQL] Add StorageLevel to cacheTable API ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable") )
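
Illustrative sketch, not part of the patch: how the read-side API added to AppStateStore above
might be driven once a store has been populated. The `store` value, the stage coordinates, and
the page size below are hypothetical.

    // Hypothetical usage of the AppStateStore read API introduced in this patch.
    val slowest = store.taskList(stageId = 1, stageAttemptId = 0, offset = 0, length = 20,
      sortBy = v1.TaskSorting.DECREASING_RUNTIME)
    val cachedRdds = store.rddList()
    // close() also deletes the backing directory for stores created via createTempStore().
    store.close()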