From b3e02d3d3103847502a1f28c91a3a8979348933f Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 23 Nov 2016 13:59:35 -0800 Subject: [PATCH] SHS-NG M4.0: Initial UI hook up. This change adds some building blocks for hooking up the new data store to the UI. This is achieved by returning a new SparkUI implementation when using the new KVStoreProvider; this new UI does not currently contain any data for the old UI / API endpoints; that will be implemented in M4. The interaction between the UI and the underlying store was isolated in a new AppStateStore class. The M4 code will call into this class to retrieve data to populate the UI and API. Some new indexed fields had to be added to the stored types so that the code could efficiently process the API requests. On the history server side, some changes were made in how the UI is used. Because there's state kept on disk, the code needs to be more careful about closing those resources when the UIs are unloaded; and because of that some locking needs to exist to make sure it's OK to move files around. The app cache was also simplified a bit; it just checks a flag in the UI instance to check whether it should be used, and tries to re-load it when the FS listing code invalidates a loaded UI. --- .../scala/org/apache/spark/SparkContext.scala | 13 +- .../deploy/history/ApplicationCache.scala | 384 ++++-------------- .../history/ApplicationHistoryProvider.scala | 44 +- .../deploy/history/FsHistoryProvider.scala | 310 +++++++++----- .../spark/deploy/history/HistoryServer.scala | 18 +- .../scheduler/ApplicationEventListener.scala | 60 --- .../spark/status/AppStateListener.scala | 10 +- .../apache/spark/status/AppStateStore.scala | 263 ++++++++++++ .../spark/status/api/v1/ApiRootResource.scala | 19 +- .../org/apache/spark/status/storeTypes.scala | 29 +- .../scala/org/apache/spark/ui/SparkUI.scala | 61 +-- .../history/ApplicationCacheSuite.scala | 194 +++------ .../history/FsHistoryProviderSuite.scala | 35 ++ .../deploy/history/HistoryServerSuite.scala | 28 +- .../spark/status/AppStateListenerSuite.scala | 19 +- project/MimaExcludes.scala | 1 + 16 files changed, 760 insertions(+), 728 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala create mode 100644 core/src/main/scala/org/apache/spark/status/AppStateStore.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7dbceb9c5c1a3..bb6688e552ccd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend +import org.apache.spark.status.AppStateStore import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -215,6 +216,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ + private var _stateStore: AppStateStore = _ /* ------------------------------------------------------------------------------------- * | Accessors and public fields. 
These provide access to the internal state of the | @@ -428,6 +430,10 @@ class SparkContext(config: SparkConf) extends Logging { _jobProgressListener = new JobProgressListener(_conf) listenerBus.addListener(jobProgressListener) + // Initialize the app state store and listener before SparkEnv is created so that it gets + // all events. + _stateStore = AppStateStore.createTempStore(conf, listenerBus) + // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) @@ -449,8 +455,8 @@ class SparkContext(config: SparkConf) extends Logging { _ui = if (conf.getBoolean("spark.ui.enabled", true)) { - Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener, - _env.securityManager, appName, startTime = startTime)) + Some(SparkUI.create(Some(this), _stateStore, _conf, listenerBus, _env.securityManager, + appName, "", startTime)) } else { // For tests, do not enable the UI None @@ -1939,6 +1945,9 @@ class SparkContext(config: SparkConf) extends Logging { } SparkEnv.set(null) } + if (_stateStore != null) { + _stateStore.close() + } // Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this // `SparkContext` is stopped. localProperties.remove() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala index a370526c46f3d..7ef923170d33a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala @@ -26,6 +26,7 @@ import scala.util.control.NonFatal import com.codahale.metrics.{Counter, MetricRegistry, Timer} import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification} +import com.google.common.util.concurrent.UncheckedExecutionException import org.eclipse.jetty.servlet.FilterHolder import org.apache.spark.internal.Logging @@ -40,11 +41,6 @@ import org.apache.spark.util.Clock * Incompleted applications have their update time checked on every * retrieval; if the cached entry is out of date, it is refreshed. * - * @note there must be only one instance of [[ApplicationCache]] in a - * JVM at a time. This is because a static field in [[ApplicationCacheCheckFilterRelay]] - * keeps a reference to the cache so that HTTP requests on the attempt-specific web UIs - * can probe the current cache to see if the attempts have changed. - * * Creating multiple instances will break this routing. 
* @param operations implementation of record access operations * @param retainedApplications number of retained applications @@ -80,7 +76,7 @@ private[history] class ApplicationCache( metrics.evictionCount.inc() val key = rm.getKey logDebug(s"Evicting entry ${key}") - operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui) + operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui) } } @@ -89,7 +85,7 @@ private[history] class ApplicationCache( * * Tagged as `protected` so as to allow subclasses in tests to access it directly */ - protected val appCache: LoadingCache[CacheKey, CacheEntry] = { + private val appCache: LoadingCache[CacheKey, CacheEntry] = { CacheBuilder.newBuilder() .maximumSize(retainedApplications) .removalListener(removalListener) @@ -101,130 +97,38 @@ private[history] class ApplicationCache( */ val metrics = new CacheMetrics("history.cache") - init() - - /** - * Perform any startup operations. - * - * This includes declaring this instance as the cache to use in the - * [[ApplicationCacheCheckFilterRelay]]. - */ - private def init(): Unit = { - ApplicationCacheCheckFilterRelay.setApplicationCache(this) - } - - /** - * Stop the cache. - * This will reset the relay in [[ApplicationCacheCheckFilterRelay]]. - */ - def stop(): Unit = { - ApplicationCacheCheckFilterRelay.resetApplicationCache() - } - - /** - * Get an entry. - * - * Cache fetch/refresh will have taken place by the time this method returns. - * @param appAndAttempt application to look up in the format needed by the history server web UI, - * `appId/attemptId` or `appId`. - * @return the entry - */ - def get(appAndAttempt: String): SparkUI = { - val parts = splitAppAndAttemptKey(appAndAttempt) - get(parts._1, parts._2) - } - - /** - * Get the Spark UI, converting a lookup failure from an exception to `None`. - * @param appAndAttempt application to look up in the format needed by the history server web UI, - * `appId/attemptId` or `appId`. - * @return the entry - */ - def getSparkUI(appAndAttempt: String): Option[SparkUI] = { + def get(appId: String, attemptId: Option[String] = None): CacheEntry = { try { - val ui = get(appAndAttempt) - Some(ui) + appCache.get(new CacheKey(appId, attemptId)) } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - None - case cause: Exception => throw cause - } + case e: UncheckedExecutionException => + throw Option(e.getCause()).getOrElse(e) } } - /** - * Get the associated spark UI. - * - * Cache fetch/refresh will have taken place by the time this method returns. - * @param appId application ID - * @param attemptId optional attempt ID - * @return the entry - */ - def get(appId: String, attemptId: Option[String]): SparkUI = { - lookupAndUpdate(appId, attemptId)._1.ui - } - - /** - * Look up the entry; update it if needed. 
- * @param appId application ID - * @param attemptId optional attempt ID - * @return the underlying cache entry -which can have its timestamp changed, and a flag to - * indicate that the entry has changed - */ - private def lookupAndUpdate(appId: String, attemptId: Option[String]): (CacheEntry, Boolean) = { - metrics.lookupCount.inc() - val cacheKey = CacheKey(appId, attemptId) - var entry = appCache.getIfPresent(cacheKey) - var updated = false - if (entry == null) { - // no entry, so fetch without any post-fetch probes for out-of-dateness - // this will trigger a callback to loadApplicationEntry() - entry = appCache.get(cacheKey) - } else if (!entry.completed) { - val now = clock.getTimeMillis() - log.debug(s"Probing at time $now for updated application $cacheKey -> $entry") - metrics.updateProbeCount.inc() - updated = time(metrics.updateProbeTimer) { - entry.updateProbe() - } - if (updated) { - logDebug(s"refreshing $cacheKey") - metrics.updateTriggeredCount.inc() - appCache.refresh(cacheKey) - // and repeat the lookup - entry = appCache.get(cacheKey) - } else { - // update the probe timestamp to the current time - entry.probeTime = now + def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + var entry = get(appId, attemptId) + + // If the entry exists, we need to make sure we run the closure with a valid entry. So + // we need to re-try until we can lock a valid entry for read. + entry.loadedUI.lock.readLock().lock() + while (!entry.loadedUI.valid) { + entry.loadedUI.lock.readLock().unlock() + appCache.invalidate(new CacheKey(appId, attemptId)) + entry = get(appId, attemptId) + if (entry == null) { + metrics.lookupFailureCount.inc() + throw new NoSuchElementException() } + metrics.loadCount.inc() + entry.loadedUI.lock.readLock().lock() } - (entry, updated) - } - - /** - * This method is visible for testing. - * - * It looks up the cached entry *and returns a clone of it*. - * This ensures that the cached entries never leak - * @param appId application ID - * @param attemptId optional attempt ID - * @return a new entry with shared SparkUI, but copies of the other fields. - */ - def lookupCacheEntry(appId: String, attemptId: Option[String]): CacheEntry = { - val entry = lookupAndUpdate(appId, attemptId)._1 - new CacheEntry(entry.ui, entry.completed, entry.updateProbe, entry.probeTime) - } - /** - * Probe for an application being updated. 
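As a point of reference for the new locking scheme above, a minimal, hypothetical caller (not part of the patch; it only uses names introduced in this file): withSparkUI runs the closure while the entry's LoadedAppUI read lock is held, retries with a fresh entry if the provider invalidated the UI in the meantime, and surfaces unknown apps as NoSuchElementException, which HistoryServer.loadAppUi handles later in the patch.

    // Hypothetical history-server-side caller of the reworked cache.
    def countAttempts(cache: ApplicationCache, appId: String, attemptId: Option[String]): Int = {
      cache.withSparkUI(appId, attemptId) { ui =>
        // The read lock keeps the provider from swapping the UI's disk store away
        // while this closure runs.
        ui.getApplicationInfoList.size
      }
    }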
- * @param appId application ID - * @param attemptId attempt ID - * @return true if an update has been triggered - */ - def checkForUpdates(appId: String, attemptId: Option[String]): Boolean = { - val (entry, updated) = lookupAndUpdate(appId, attemptId) - updated + try { + fn(entry.loadedUI.ui) + } finally { + entry.loadedUI.lock.readLock().unlock() + } } /** @@ -272,25 +176,21 @@ private[history] class ApplicationCache( * @throws NoSuchElementException if there is no matching element */ @throws[NoSuchElementException] - def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { - + private def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { logDebug(s"Loading application Entry $appId/$attemptId") metrics.loadCount.inc() time(metrics.loadTimer) { + metrics.lookupCount.inc() operations.getAppUI(appId, attemptId) match { - case Some(LoadedAppUI(ui, updateState)) => - val completed = ui.getApplicationInfoList.exists(_.attempts.last.completed) - if (completed) { - // completed spark UIs are attached directly - operations.attachSparkUI(appId, attemptId, ui, completed) - } else { + case Some(loadedUI) => + val completed = loadedUI.ui.getApplicationInfoList.exists(_.attempts.last.completed) + if (!completed) { // incomplete UIs have the cache-check filter put in front of them. - ApplicationCacheCheckFilterRelay.registerFilter(ui, appId, attemptId) - operations.attachSparkUI(appId, attemptId, ui, completed) + registerFilter(new CacheKey(appId, attemptId), loadedUI, this) } + operations.attachSparkUI(appId, attemptId, loadedUI.ui, completed) // build the cache entry - val now = clock.getTimeMillis() - val entry = new CacheEntry(ui, completed, updateState, now) + val entry = new CacheEntry(loadedUI, completed) logDebug(s"Loaded application $appId/$attemptId -> $entry") entry case None => @@ -303,32 +203,6 @@ private[history] class ApplicationCache( } } - /** - * Split up an `applicationId/attemptId` or `applicationId` key into the separate pieces. - * - * @param appAndAttempt combined key - * @return a tuple of the application ID and, if present, the attemptID - */ - def splitAppAndAttemptKey(appAndAttempt: String): (String, Option[String]) = { - val parts = appAndAttempt.split("/") - require(parts.length == 1 || parts.length == 2, s"Invalid app key $appAndAttempt") - val appId = parts(0) - val attemptId = if (parts.length > 1) Some(parts(1)) else None - (appId, attemptId) - } - - /** - * Merge an appId and optional attempt Id into a key of the form `applicationId/attemptId`. - * - * If there is an `attemptId`; `applicationId` if not. - * @param appId application ID - * @param attemptId optional attempt ID - * @return a unified string - */ - def mergeAppAndAttemptToKey(appId: String, attemptId: Option[String]): String = { - appId + attemptId.map { id => s"/$id" }.getOrElse("") - } - /** * String operator dumps the cache entries and metrics. 
* @return a string value, primarily for testing and diagnostics @@ -347,6 +221,26 @@ private[history] class ApplicationCache( sb.append("----\n") sb.toString() } + + /** + * Register a filter for the web UI which checks for updates to the given app/attempt + * @param ui Spark UI to attach filters to + * @param appId application ID + * @param attemptId attempt ID + */ + def registerFilter(key: CacheKey, loadedUI: LoadedAppUI, cache: ApplicationCache): Unit = { + require(loadedUI != null) + val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) + val filter = new ApplicationCacheCheckFilter(key, loadedUI, cache) + val holder = new FilterHolder(filter) + require(loadedUI.ui.getHandlers != null, "null handlers") + loadedUI.ui.getHandlers.foreach { handler => + handler.addFilter(holder, "/*", enumDispatcher) + } + } + + def invalidate(key: CacheKey): Unit = appCache.invalidate(key) + } /** @@ -360,14 +254,12 @@ private[history] class ApplicationCache( * @param probeTime Times in milliseconds when the probe was last executed. */ private[history] final class CacheEntry( - val ui: SparkUI, - val completed: Boolean, - val updateProbe: () => Boolean, - var probeTime: Long) { + val loadedUI: LoadedAppUI, + val completed: Boolean) { /** string value is for test assertions */ override def toString: String = { - s"UI $ui, completed=$completed, probeTime=$probeTime" + s"UI ${loadedUI.ui}, completed=$completed" } } @@ -396,23 +288,17 @@ private[history] class CacheMetrics(prefix: String) extends Source { val evictionCount = new Counter() val loadCount = new Counter() val loadTimer = new Timer() - val updateProbeCount = new Counter() - val updateProbeTimer = new Timer() - val updateTriggeredCount = new Counter() /** all the counters: for registration and string conversion. */ private val counters = Seq( ("lookup.count", lookupCount), ("lookup.failure.count", lookupFailureCount), ("eviction.count", evictionCount), - ("load.count", loadCount), - ("update.probe.count", updateProbeCount), - ("update.triggered.count", updateTriggeredCount)) + ("load.count", loadCount)) /** all metrics, including timers */ private val allMetrics = counters ++ Seq( - ("load.timer", loadTimer), - ("update.probe.timer", updateProbeTimer)) + ("load.timer", loadTimer)) /** * Name of metric source @@ -498,23 +384,11 @@ private[history] trait ApplicationCacheOperations { * Implementation note: there's some abuse of a shared global entry here because * the configuration data passed to the servlet is just a string:string map. */ -private[history] class ApplicationCacheCheckFilter() extends Filter with Logging { - - import ApplicationCacheCheckFilterRelay._ - var appId: String = _ - var attemptId: Option[String] = _ - - /** - * Bind the app and attempt ID, throwing an exception if no application ID was provided. - * @param filterConfig configuration - */ - override def init(filterConfig: FilterConfig): Unit = { - - appId = Option(filterConfig.getInitParameter(APP_ID)) - .getOrElse(throw new ServletException(s"Missing Parameter $APP_ID")) - attemptId = Option(filterConfig.getInitParameter(ATTEMPT_ID)) - logDebug(s"initializing filter $this") - } +private[history] class ApplicationCacheCheckFilter( + key: CacheKey, + loadedUI: LoadedAppUI, + cache: ApplicationCache) + extends Filter with Logging { /** * Filter the request. 
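Before the doFilter changes below, a hedged sketch of the provider-side trigger the filter reacts to. The method name markAttemptStale is illustrative only; in this patch the real call site is FsHistoryProvider.replaceStore.

    // Illustrative only: FsHistoryProvider.replaceStore() performs the equivalent of
    // this when it detects updated event logs for an attempt.
    def markAttemptStale(loadedUI: LoadedAppUI): Unit = {
      loadedUI.invalidate()  // takes the write lock and clears the valid flag
      // From this point the filter below answers requests for the attempt with a
      // redirect, and ApplicationCache.withSparkUI reloads it on the next access.
    }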
@@ -543,123 +417,23 @@ private[history] class ApplicationCacheCheckFilter() extends Filter with Logging // if the request is for an attempt, check to see if it is in need of delete/refresh // and have the cache update the UI if so - if (operation=="HEAD" || operation=="GET" - && checkForUpdates(requestURI, appId, attemptId)) { - // send a redirect back to the same location. This will be routed - // to the *new* UI - logInfo(s"Application Attempt $appId/$attemptId updated; refreshing") - val queryStr = Option(httpRequest.getQueryString).map("?" + _).getOrElse("") - val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr) - httpResponse.sendRedirect(redirectUrl) - } else { - chain.doFilter(request, response) + loadedUI.lock.readLock().lock() + try { + if (loadedUI.valid) { + chain.doFilter(request, response) + } else { + cache.invalidate(key) + val queryStr = Option(httpRequest.getQueryString).map("?" + _).getOrElse("") + val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr) + httpResponse.sendRedirect(redirectUrl) + } + } finally { + loadedUI.lock.readLock().unlock() } } - override def destroy(): Unit = { - } - - override def toString: String = s"ApplicationCacheCheckFilter for $appId/$attemptId" -} + override def init(config: FilterConfig): Unit = { } -/** - * Global state for the [[ApplicationCacheCheckFilter]] instances, so that they can relay cache - * probes to the cache. - * - * This is an ugly workaround for the limitation of servlets and filters in the Java servlet - * API; they are still configured on the model of a list of classnames and configuration - * strings in a `web.xml` field, rather than a chain of instances wired up by hand or - * via an injection framework. There is no way to directly configure a servlet filter instance - * with a reference to the application cache which is must use: some global state is needed. - * - * Here, [[ApplicationCacheCheckFilter]] is that global state; it relays all requests - * to the singleton [[ApplicationCache]] - * - * The field `applicationCache` must be set for the filters to work - - * this is done during the construction of [[ApplicationCache]], which requires that there - * is only one cache serving requests through the WebUI. - * - * *Important* In test runs, if there is more than one [[ApplicationCache]], the relay logic - * will break: filters may not find instances. Tests must not do that. - * - */ -private[history] object ApplicationCacheCheckFilterRelay extends Logging { - // name of the app ID entry in the filter configuration. Mandatory. - val APP_ID = "appId" + override def destroy(): Unit = { } - // name of the attempt ID entry in the filter configuration. Optional. - val ATTEMPT_ID = "attemptId" - - // name of the filter to register - val FILTER_NAME = "org.apache.spark.deploy.history.ApplicationCacheCheckFilter" - - /** the application cache to relay requests to */ - @volatile - private var applicationCache: Option[ApplicationCache] = None - - /** - * Set the application cache. 
Logs a warning if it is overwriting an existing value - * @param cache new cache - */ - def setApplicationCache(cache: ApplicationCache): Unit = { - applicationCache.foreach( c => logWarning(s"Overwriting application cache $c")) - applicationCache = Some(cache) - } - - /** - * Reset the application cache - */ - def resetApplicationCache(): Unit = { - applicationCache = None - } - - /** - * Check to see if there has been an update - * @param requestURI URI the request came in on - * @param appId application ID - * @param attemptId attempt ID - * @return true if an update was loaded for the app/attempt - */ - def checkForUpdates(requestURI: String, appId: String, attemptId: Option[String]): Boolean = { - - logDebug(s"Checking $appId/$attemptId from $requestURI") - applicationCache match { - case Some(cache) => - try { - cache.checkForUpdates(appId, attemptId) - } catch { - case ex: Exception => - // something went wrong. Keep going with the existing UI - logWarning(s"When checking for $appId/$attemptId from $requestURI", ex) - false - } - - case None => - logWarning("No application cache instance defined") - false - } - } - - - /** - * Register a filter for the web UI which checks for updates to the given app/attempt - * @param ui Spark UI to attach filters to - * @param appId application ID - * @param attemptId attempt ID - */ - def registerFilter( - ui: SparkUI, - appId: String, - attemptId: Option[String] ): Unit = { - require(ui != null) - val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) - val holder = new FilterHolder() - holder.setClassName(FILTER_NAME) - holder.setInitParameter(APP_ID, appId) - attemptId.foreach( id => holder.setInitParameter(ATTEMPT_ID, id)) - require(ui.getHandlers != null, "null handlers") - ui.getHandlers.foreach { handler => - handler.addFilter(holder, "/*", enumDispatcher) - } - } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index e25522a28c577..966106a9f320e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.history +import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.zip.ZipOutputStream import scala.xml.Node @@ -46,31 +47,30 @@ private[spark] case class ApplicationHistoryInfo( } } -/** - * A probe which can be invoked to see if a loaded Web UI has been updated. - * The probe is expected to be relative purely to that of the UI returned - * in the same [[LoadedAppUI]] instance. That is, whenever a new UI is loaded, - * the probe returned with it is the one that must be used to check for it - * being out of date; previous probes must be discarded. - */ -private[history] abstract class HistoryUpdateProbe { - /** - * Return true if the history provider has a later version of the application - * attempt than the one against this probe was constructed. - * @return - */ - def isUpdated(): Boolean -} - /** * All the information returned from a call to `getAppUI()`: the new UI * and any required update state. 
* @param ui Spark UI * @param updateProbe probe to call to check on the update state of this application attempt */ -private[spark] case class LoadedAppUI( - ui: SparkUI, - updateProbe: () => Boolean) +private[spark] case class LoadedAppUI(ui: SparkUI) { + + val lock = new ReentrantReadWriteLock() + + @volatile private var _valid = true + + def valid: Boolean = _valid + + def invalidate(): Unit = { + lock.writeLock().lock() + try { + _valid = false + } finally { + lock.writeLock().unlock() + } + } + +} private[spark] abstract class ApplicationHistoryProvider { @@ -152,4 +152,10 @@ private[spark] abstract class ApplicationHistoryProvider { * @return html text to display when the application list is empty */ def getEmptyListingHtml(): Seq[Node] = Seq.empty + + /** + * Called when an application UI is unloaded from the history server. + */ + def onUIDetached(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e2273e530952c..01fc2291d6f13 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,8 +17,10 @@ package org.apache.spark.deploy.history -import java.io.{File, FileNotFoundException, IOException, OutputStream} -import java.util.{Date, UUID} +import java.io.{File, FileNotFoundException, IOException} +import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermissions +import java.util.{Date, ServiceLoader, UUID} import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -26,8 +28,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.xml.Node -import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude} -import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.fasterxml.jackson.annotation.JsonIgnore import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, Path} @@ -43,6 +44,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.kvstore._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.AppStateStore +import org.apache.spark.status.KVUtils import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI @@ -126,36 +129,27 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - private val storePath = conf.get(LOCAL_STORE_DIR) + private val storePath = new File(conf.get(LOCAL_STORE_DIR)) + require(storePath.isDirectory(), s"Configured store directory ($storePath) does not exist.") private val listing = { - val dbPath = new File(storePath, "listing.ldb") - - def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) - + val metadata = new KVStoreMetadata(CURRENT_VERSION, AppStateStore.CURRENT_VERSION, + logDir.toString()) try { - val db = openDB() - val meta = db.getMetadata(classOf[KVStoreMetadata]) - - if (meta == null) { - db.setMetadata(new KVStoreMetadata(CURRENT_VERSION, logDir.toString())) - db - } else if (meta.version != CURRENT_VERSION || !logDir.toString().equals(meta.logDir)) { - 
logInfo("Detected mismatched config in existing DB, deleting...") - db.close() - Utils.deleteRecursively(dbPath) - openDB() - } else { - db - } + open(new File(storePath, "listing.ldb"), metadata) } catch { - case _: UnsupportedStoreVersionException => - logInfo("Detected incompatible DB versions, deleting...") - Utils.deleteRecursively(dbPath) - openDB() + case e: Exception => + // If there's an error, remove the listing database and any existing UI database + // from the store directory, since it's extremely likely that they'll all contain + // incompatible information. + logWarning(s"Error while opening existing listing database, creating new one.", e) + storePath.listFiles().foreach(Utils.deleteRecursively) + open(new File(storePath, "listing.ldb"), metadata) } } + private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]() + /** * Return a runnable that performs the given operation on the event logs. * This operation is expected to be executed periodically. @@ -179,7 +173,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - // Conf option used for testing the initialization code. val initThread = initialize() private[history] def initialize(): Thread = { @@ -279,39 +272,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getLastUpdatedTime(): Long = lastScanTime.get() override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { + val app = try { + load(appId) + } catch { + case _: NoSuchElementException => + return None + } + + val attempt = app.attempts.find(_.info.attemptId == attemptId).orNull + if (attempt == null) { + return None + } + + val path = uiStorePath(appId, attemptId) + if (!path.exists()) { + throw new IllegalStateException( + s"Application entry for $appId / $attemptId found, but UI not available.") + } + + val conf = this.conf.clone() + val secManager = new SecurityManager(conf) + + secManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls so they are properly picked up + secManager.setAdminAcls(HISTORY_UI_ADMIN_ACLS + "," + attempt.adminAcls.getOrElse("")) + secManager.setViewAcls(attempt.info.sparkUser, attempt.viewAcls.getOrElse("")) + secManager.setAdminAclsGroups(HISTORY_UI_ADMIN_ACLS_GROUPS + "," + + attempt.adminAclsGroups.getOrElse("")) + secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse("")) + + val replayBus = new ReplayListenerBus() + + // Create the UI under a lock so that a valid disk store is used, in case the update thread + // is writing a new disk store for the application (see replaceStore()). + val loadedUI = synchronized { + val ui = SparkUI.create(None, AppStateStore.loadStore(conf, path), conf, replayBus, + secManager, + app.info.name, + HistoryServer.getAttemptURI(appId, attempt.info.attemptId), + attempt.info.startTime.getTime()) + val loaded = LoadedAppUI(ui) + activeUIs((appId, attemptId)) = loaded + loaded + } + + // TODO: remove the following replay code. It's currently needed because not all of the + // UI uses the cached state store. Once that's done (i.e. after the SQL UI is ported + // over), this code can be removed. 
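    // (The ServiceLoader block below appears to mirror what the removed
    // SparkUI.createHistoryUI used to do: it re-registers SparkHistoryListenerFactory
    // listeners, for example the SQL tab, on the replay bus, since those listeners do
    // not yet read from loadedUI.ui.store.)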
try { - val appInfo = load(appId) - appInfo.attempts - .find { attempt => attempt.info.attemptId == attemptId } - .map { attempt => - val replayBus = new ReplayListenerBus() - val ui = { - val conf = this.conf.clone() - val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name, - HistoryServer.getAttemptURI(appId, attempt.info.attemptId), - attempt.info.startTime.getTime()) - // Do not call ui.bind() to avoid creating a new server for each application - } + val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], + Utils.getContextOrSparkClassLoader).asScala + listenerFactories.foreach { listenerFactory => + val listeners = listenerFactory.createListeners(conf, loadedUI.ui) + listeners.foreach(replayBus.addListener) + } - val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) - - val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) - // make sure to set admin acls before view acls so they are properly picked up - val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") - ui.getSecurityManager.setAdminAcls(adminAcls) - ui.getSecurityManager.setViewAcls(attempt.info.sparkUser, - appListener.viewAcls.getOrElse("")) - val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") - ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)) - } + val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) + replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) } catch { - case _: NoSuchElementException => None + case e: Exception => + onUIDetached(appId, attemptId, loadedUI.ui) + throw e } + + Some(loadedUI) } override def getEmptyListingHtml(): Seq[Node] = { @@ -334,12 +358,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) Map("Event log directory" -> logDir.toString) ++ safeMode } - override def stop(): Unit = { - listing.close() + override def stop(): Unit = synchronized { if (initThread != null && initThread.isAlive()) { initThread.interrupt() initThread.join() } + Seq(pool, replayExecutor).foreach { executor => + executor.shutdown() + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow() + } + } + activeUIs.foreach { case (_, loadedUI) => loadedUI.ui.store.close() } + activeUIs.clear() + listing.close() + } + + override def onUIDetached(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { + val uiOption = synchronized { + activeUIs.remove((appId, attemptId)) + } + uiOption.foreach { loadedUI => + loadedUI.lock.writeLock().lock() + try { + loadedUI.ui.store.close() + } finally { + loadedUI.lock.writeLock().unlock() + } + } } /** @@ -486,15 +532,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val listener = new AppListingListener(fileStatus, clock) bus.addListener(listener) + // Write the UI data to a temp location. 
+ val tempUiPath = createTempDir("uistore") + val store = AppStateStore.createStore(tempUiPath, conf, bus) + val appCompleted = isApplicationCompleted(fileStatus) val logInput = EventLoggingListener.openEventLog(logPath, fs) try { bus.replay(logInput, logPath.toString, !appCompleted) - listener.applicationInfo.foreach(addListing) + } catch { + case e: Exception => + store.close() + Utils.deleteRecursively(tempUiPath) + throw e } finally { logInput.close() } + // Move the UI store to its final location if the app ID is known, otherwise discard it. + listener.applicationInfo.foreach { app => + addListing(app) + replaceStore(app.info.id, app.attempts.head.info.attemptId, tempUiPath) + } + Utils.deleteRecursively(tempUiPath) listing.write(new LogInfo(logPath.toString(), fileStatus.getLen())) } @@ -550,16 +610,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** - * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns - * an `ApplicationEventListener` instance with event data captured from the replay. - * `ReplayEventsFilter` determines what events are replayed and can therefore limit the - * data captured in the returned `ApplicationEventListener` instance. + * Replays the events in the specified log file on the supplied `ReplayListenerBus`. + * `ReplayEventsFilter` determines what events are replayed. */ private def replay( eventLog: FileStatus, appCompleted: Boolean, bus: ReplayListenerBus, - eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = { + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") // Note that the eventLog may have *increased* in size since when we grabbed the filestatus, @@ -570,10 +628,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // after it's created, so we get a file size that is no bigger than what is actually read. val logInput = EventLoggingListener.openEventLog(logPath, fs) try { - val appListener = new ApplicationEventListener - bus.addListener(appListener) bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) - appListener } finally { logInput.close() } @@ -616,30 +671,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) | application count=$count}""".stripMargin } - /** - * Return true iff a newer version of the UI is available. The check is based on whether the - * fileSize for the currently loaded UI is smaller than the file size the last time - * the logs were loaded. - * - * This is a very cheap operation -- the work of loading the new attempt was already done - * by [[checkForLogs]]. 
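For review convenience, a summary of what replaces the deleted probe, using only method names added in this file and in ApplicationCache: checkForLogs() schedules mergeApplicationListing(), which replays the grown event log into a temporary LevelDB through AppStateStore.createStore(); replaceStore() then invalidates any active LoadedAppUI, closes its store, and renames the temporary directory into place; the cache-check filter sees the cleared valid flag and redirects the request, and ApplicationCache.withSparkUI reloads the attempt through getAppUI(), which opens the renamed store. The resulting layout under the configured store directory looks like this:

    listing.ldb                     application listing
    <appId>.ldb                     UI store for a single-attempt application
    <appId>_<attemptId>.ldb         UI store for a specific attempt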
- * @param appId application to probe - * @param attemptId attempt to probe - * @param prevFileSize the file size of the logs for the currently displayed UI - */ - private def updateProbe( - appId: String, - attemptId: Option[String], - prevFileSize: Long)(): Boolean = { - try { - val attempt = getAttempt(appId, attemptId) - val logPath = fs.makeQualified(new Path(logDir, attempt.logPath)) - recordedFileSize(logPath) > prevFileSize - } catch { - case _: NoSuchElementException => false - } - } - private def recordedFileSize(log: Path): Long = { try { listing.read(classOf[LogInfo], log.toString()).fileSize @@ -680,6 +711,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) listing.write(newAppInfo) } + private def createTempDir(name: String): File = { + val perms = PosixFilePermissions.fromString("rwx------") + Files.createTempDirectory(storePath.toPath(), name, + PosixFilePermissions.asFileAttribute(perms)).toFile() + } + + private def uiStorePath(appId: String, attemptId: Option[String]): File = { + val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb" + new File(storePath, fileName) + } + + private def replaceStore(appId: String, attemptId: Option[String], newStore: File): Unit = { + val uiStore = uiStorePath(appId, attemptId) + + synchronized { + // If there's an active UI for the application, invalidate it and close its store, so that + // we can replace it with the updated one. + activeUIs.remove((appId, attemptId)).foreach { loadedUI => + loadedUI.invalidate() + loadedUI.ui.store.close() + } + + if (uiStore.exists()) { + Utils.deleteRecursively(uiStore) + } + + if (!newStore.renameTo(uiStore)) { + throw new IOException(s"Failed to rename UI store from $newStore to $uiStore.") + } + } + } + /** For testing. Returns internal data about a single attempt. */ private[history] def getAttempt(appId: String, attemptId: Option[String]): KVStoreAttemptInfo = { load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse( @@ -700,18 +763,23 @@ private[history] object FsHistoryProvider { private val CURRENT_VERSION = 1L } -case class KVStoreMetadata( +private[history] case class KVStoreMetadata( val version: Long, + val uiVersion: Long, val logDir: String) -case class LogInfo( +private[history] case class LogInfo( @KVIndexParam val logPath: String, val fileSize: Long) -class KVStoreAttemptInfo( +private[history] class KVStoreAttemptInfo( val info: v1.ApplicationAttemptInfo, val logPath: String, - val fileSize: Long) { + val fileSize: Long, + val adminAcls: Option[String], + val viewAcls: Option[String], + val adminAclsGroups: Option[String], + val viewAclsGroups: Option[String]) { def toAppAttemptInfo(): ApplicationAttemptInfo = { ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), @@ -721,7 +789,7 @@ class KVStoreAttemptInfo( } -class KVStoreApplicationInfo( +private[history] class KVStoreApplicationInfo( val info: v1.ApplicationInfo, val attempts: List[KVStoreAttemptInfo]) { @@ -750,10 +818,15 @@ class KVStoreApplicationInfo( } -private[spark] class AppListingListener(log: FileStatus, clock: Clock) - extends SparkListener { +private[history] class AppListingListener( + log: FileStatus, + clock: Clock) + extends SparkListener with KVUtils { - private var attempt = new KVStoreAttemptInfo( + // This listener doesn't use read/update, so no need for a KVStore instance. 
+ override protected val kvstore: KVStore = null + + private var attempt = newAttemptInfo(None, new v1.ApplicationAttemptInfo(None, new Date(-1), new Date(-1), new Date(log.getModificationTime()), -1, null, false), log.getPath().getName(), log.getLen()) @@ -778,7 +851,7 @@ private[spark] class AppListingListener(log: FileStatus, clock: Clock) event.sparkUser, attempt.info.completed) - attempt = new KVStoreAttemptInfo(newInfo, attempt.logPath, attempt.fileSize) + attempt = newAttemptInfo(attempt, info = newInfo) } override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { @@ -791,7 +864,21 @@ private[spark] class AppListingListener(log: FileStatus, clock: Clock) attempt.info.sparkUser, true) - attempt = new KVStoreAttemptInfo(newInfo, attempt.logPath, attempt.fileSize) + attempt = newAttemptInfo(attempt, info = newInfo) + } + + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { + val allProperties = event.environmentDetails("Spark Properties").toMap + val viewAcls = allProperties.get("spark.ui.view.acls") + val adminAcls = allProperties.get("spark.admin.acls") + val viewAclsGroups = allProperties.get("spark.ui.view.acls.groups") + val adminAclsGroups = allProperties.get("spark.admin.acls.groups") + + attempt = newAttemptInfo(attempt, + viewAcls = viewAcls, + adminAcls = adminAcls, + viewAclsGroups = viewAclsGroups, + adminAclsGroups = adminAclsGroups) } def applicationInfo: Option[KVStoreApplicationInfo] = { @@ -802,4 +889,23 @@ private[spark] class AppListingListener(log: FileStatus, clock: Clock) } } + private def newAttemptInfo( + old: Option[KVStoreAttemptInfo], + info: Option[v1.ApplicationAttemptInfo] = None, + logPath: Option[String] = None, + fileSize: Option[Long] = None, + adminAcls: Option[Option[String]] = None, + viewAcls: Option[Option[String]] = None, + adminAclsGroups: Option[Option[String]] = None, + viewAclsGroups: Option[Option[String]] = None): KVStoreAttemptInfo = { + new KVStoreAttemptInfo( + info.orElse(old.map(_.info)).orNull, + logPath.orElse(old.map(_.logPath)).orNull, + fileSize.orElse(old.map(_.fileSize)).getOrElse(0L), + adminAcls.getOrElse(old.flatMap(_.adminAcls)), + viewAcls.getOrElse(old.flatMap(_.viewAcls)), + adminAclsGroups.getOrElse(old.flatMap(_.adminAclsGroups)), + viewAclsGroups.getOrElse(old.flatMap(_.viewAclsGroups))) + } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 949b307820218..4900a8130a921 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -106,8 +106,8 @@ class HistoryServer( } } - def getSparkUI(appKey: String): Option[SparkUI] = { - appCache.getSparkUI(appKey) + override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + appCache.withSparkUI(appId, attemptId)(fn) } initialize() @@ -140,7 +140,6 @@ class HistoryServer( override def stop() { super.stop() provider.stop() - appCache.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). 
*/ @@ -158,6 +157,7 @@ class HistoryServer( override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs") ui.getHandlers.foreach(detachHandler) + provider.onUIDetached(appId, attemptId, ui) } /** @@ -224,15 +224,13 @@ class HistoryServer( */ private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = { try { - appCache.get(appId, attemptId) + appCache.withSparkUI(appId, attemptId) { _ => + // Do nothing, just force the UI to load. + } true } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - false - - case cause: Exception => throw cause - } + case NonFatal(e: NoSuchElementException) => + false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala deleted file mode 100644 index 28c45d800ed06..0000000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -/** - * A simple listener for application events. - * - * This listener expects to hear events from a single application only. If events - * from multiple applications are seen, the behavior is unspecified. 
- */ -private[spark] class ApplicationEventListener extends SparkListener { - var appName: Option[String] = None - var appId: Option[String] = None - var appAttemptId: Option[String] = None - var sparkUser: Option[String] = None - var startTime: Option[Long] = None - var endTime: Option[Long] = None - var viewAcls: Option[String] = None - var adminAcls: Option[String] = None - var viewAclsGroups: Option[String] = None - var adminAclsGroups: Option[String] = None - - override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { - appName = Some(applicationStart.appName) - appId = applicationStart.appId - appAttemptId = applicationStart.appAttemptId - startTime = Some(applicationStart.time) - sparkUser = Some(applicationStart.sparkUser) - } - - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - endTime = Some(applicationEnd.time) - } - - override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { - synchronized { - val environmentDetails = environmentUpdate.environmentDetails - val allProperties = environmentDetails("Spark Properties").toMap - viewAcls = allProperties.get("spark.ui.view.acls") - adminAcls = allProperties.get("spark.admin.acls") - viewAclsGroups = allProperties.get("spark.ui.view.acls.groups") - adminAclsGroups = allProperties.get("spark.admin.acls.groups") - } - } -} diff --git a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala index 61af0ff1aba2b..3350469cb2048 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala @@ -38,9 +38,6 @@ import org.apache.spark.ui.SparkUI * * TODO (M4): * - Need to add state / information needed by the UI that is currently not in the API. - * - Need mechanism to insert this into SparkContext and be called when SparkContext is - * closed (new plugin interface?) - * - Need to close store in live UI after SparkContext closes. * - Cache active jobs / stages / other interesting things in memory to make it faster * to update them * - Flush data to the store in a separate thread (to avoid stalling the listener bus). 
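For context on what replaces the deleted ApplicationEventListener, a rough sketch of the live wiring taken from the SparkContext and AppStateStore hunks in this patch; names such as sc, conf, listenerBus and securityManager are assumed to be in scope. The ACL capture that ApplicationEventListener performed now lives in AppListingListener.onEnvironmentUpdate on the history side.

    // Live application path: the store owns its AppStateListener and temp directory.
    val stateStore = AppStateStore.createTempStore(conf, listenerBus)  // attaches AppStateListener
    val ui = SparkUI.create(Some(sc), stateStore, conf, listenerBus, securityManager,
      appName, "", startTime)
    // ... application runs, the listener keeps the KVStore current ...
    stateStore.close()  // also deletes the temporary store directory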
@@ -305,7 +302,7 @@ private class AppStateListener(override protected val kvstore: KVStore) extends status = event.taskInfo.status, taskLocality = event.taskInfo.taskLocality.toString(), speculative = event.taskInfo.speculative) - kvstore.write(new TaskDataWrapper(task)) + kvstore.write(new TaskDataWrapper(task, event.stageId, event.stageAttemptId)) } updateStageData(event.stageId, event.stageAttemptId) { stage => @@ -369,7 +366,7 @@ private class AppStateListener(override protected val kvstore: KVStore) extends status = event.taskInfo.status, errorMessage = errorMessage, taskMetrics = Option(event.taskMetrics).map(newTaskMetrics)) - new TaskDataWrapper(newInfo) + new TaskDataWrapper(newInfo, event.stageId, event.stageAttemptId) } val (completedDelta, failedDelta) = event.reason match { @@ -676,7 +673,7 @@ private class AppStateListener(override protected val kvstore: KVStore) extends private def updateTaskData(id: Long)(fn: TaskDataWrapper => TaskDataWrapper): Unit = { update[TaskDataWrapper](id) { old => - val task = old.getOrElse(new TaskDataWrapper(newTaskData(None, taskId = id))) + val task = old.getOrElse(new TaskDataWrapper(newTaskData(None, taskId = id), -1, -1)) fn(task) } } @@ -1083,7 +1080,6 @@ private class AppStateListener(override protected val kvstore: KVStore) extends private[spark] object AppStateListener { - val CURRENT_VERSION = 1L val DEFAULT_DATE = new Date(-1) val UNKNOWN = "" diff --git a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala new file mode 100644 index 0000000000000..49cd73b18fe50 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File +import java.util.{Arrays, List => JList} + +import scala.collection.JavaConverters._ + +import org.apache.spark.{JobExecutionStatus, SparkConf} +import org.apache.spark.kvstore.KVStore +import org.apache.spark.scheduler.SparkListenerBus +import org.apache.spark.status.api.v1 +import org.apache.spark.util.{Distribution, Utils} + +/** + * A wrapper around a KVStore that provides methods for accessing the API data stored within. 
+ */ +private[spark] class AppStateStore private (store: KVStore, tempStorePath: Option[File]) { + + def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = { + val it = store.view(classOf[JobDataWrapper]).asScala.map(_.info) + if (!statuses.isEmpty()) { + it.filter { job => statuses.contains(job.status) }.toSeq + } else { + it.toSeq + } + } + + def job(jobId: Int): v1.JobData = { + store.read(classOf[JobDataWrapper], jobId).info + } + + def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = { + store.view(classOf[ExecutorSummaryWrapper]).index("active").reverse().first(true).last(true) + .asScala.map(_.info).toSeq + } + + def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = { + val it = store.view(classOf[StageDataWrapper]).asScala.map(_.info) + if (!statuses.isEmpty) { + it.filter { s => statuses.contains(s.status) }.toSeq + } else { + it.toSeq + } + } + + def stageData(stageId: Int): Seq[v1.StageData] = { + store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId) + .asScala.map(_.info).toSeq + } + + def stageAttempt(stageId: Int, stageAttemptId: Int): v1.StageData = { + store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).info + } + + def taskSummary( + stageId: Int, + stageAttemptId: Int, + quantiles: Array[Double]): v1.TaskMetricDistributions = { + + val stage = Array(stageId, stageAttemptId) + + val rawMetrics = store.view(classOf[TaskDataWrapper]) + .index("stage") + .first(stage) + .last(stage) + .asScala + .flatMap(_.info.taskMetrics) + .toList + .view + + def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] = + Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles) + + // We need to do a lot of similar munging to nested metrics here. For each one, + // we want (a) extract the values for nested metrics (b) make a distribution for each metric + // (c) shove the distribution into the right field in our return type and (d) only return + // a result if the option is defined for any of the tasks. MetricHelper is a little util + // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just + // implement one "build" method, which just builds the quantiles for each field. 
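    // For reference, a hypothetical caller of this method; the quantile set shown is
    // only an example, nothing in this patch fixes it:
    //   val dist = store.taskSummary(stageId = 1, stageAttemptId = 0,
    //     quantiles = Array(0.05, 0.25, 0.5, 0.75, 0.95))
    //   dist.executorRunTime  // one value per requested quantile, likewise for the others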
+ + val inputMetrics = + new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics + + def build: v1.InputMetricDistributions = new v1.InputMetricDistributions( + bytesRead = submetricQuantiles(_.bytesRead), + recordsRead = submetricQuantiles(_.recordsRead) + ) + }.build + + val outputMetrics = + new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics + + def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions( + bytesWritten = submetricQuantiles(_.bytesWritten), + recordsWritten = submetricQuantiles(_.recordsWritten) + ) + }.build + + val shuffleReadMetrics = + new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics = + raw.shuffleReadMetrics + + def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions( + readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead }, + readRecords = submetricQuantiles(_.recordsRead), + remoteBytesRead = submetricQuantiles(_.remoteBytesRead), + remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched), + localBlocksFetched = submetricQuantiles(_.localBlocksFetched), + totalBlocksFetched = submetricQuantiles { s => + s.localBlocksFetched + s.remoteBlocksFetched + }, + fetchWaitTime = submetricQuantiles(_.fetchWaitTime) + ) + }.build + + val shuffleWriteMetrics = + new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics = + raw.shuffleWriteMetrics + + def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions( + writeBytes = submetricQuantiles(_.bytesWritten), + writeRecords = submetricQuantiles(_.recordsWritten), + writeTime = submetricQuantiles(_.writeTime) + ) + }.build + + new v1.TaskMetricDistributions( + quantiles = quantiles, + executorDeserializeTime = metricQuantiles(_.executorDeserializeTime), + executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime), + executorRunTime = metricQuantiles(_.executorRunTime), + executorCpuTime = metricQuantiles(_.executorCpuTime), + resultSize = metricQuantiles(_.resultSize), + jvmGcTime = metricQuantiles(_.jvmGcTime), + resultSerializationTime = metricQuantiles(_.resultSerializationTime), + memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled), + diskBytesSpilled = metricQuantiles(_.diskBytesSpilled), + inputMetrics = inputMetrics, + outputMetrics = outputMetrics, + shuffleReadMetrics = shuffleReadMetrics, + shuffleWriteMetrics = shuffleWriteMetrics + ) + } + + def taskList( + stageId: Int, + stageAttemptId: Int, + offset: Int, + length: Int, + sortBy: v1.TaskSorting): Seq[v1.TaskData] = { + val stageKey = Array(stageId, stageAttemptId) + val base = store.view(classOf[TaskDataWrapper]) + val indexed = sortBy match { + case v1.TaskSorting.ID => + base.index("stage").first(stageKey).last(stageKey) + case v1.TaskSorting.INCREASING_RUNTIME => + base.index("runtime").first(stageKey ++ Array(-1L)).last(stageKey ++ Array(Long.MaxValue)) + case v1.TaskSorting.DECREASING_RUNTIME => + base.index("runtime").first(stageKey ++ Array(Long.MaxValue)).last(stageKey ++ Array(-1L)) + .reverse() + } + indexed.skip(offset).max(length).asScala.map(_.info).toSeq + } + + def rddList(): 
Seq[v1.RDDStorageInfo] = { + store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).toSeq + } + + def rdd(rddId: Int): v1.RDDStorageInfo = { + store.read(classOf[RDDStorageInfoWrapper], rddId).info + } + + def close(): Unit = { + store.close() + tempStorePath.foreach(Utils.deleteRecursively) + } + +} + +private[spark] object AppStateStore { + + val CURRENT_VERSION = 1L + + /** Loads a UI store from the given path, creating an empty store if it doesn't exist. */ + def loadStore(conf: SparkConf, path: File): AppStateStore = { + new AppStateStore(loadStore(path), None) + } + + /** + * Crate a state store in a temporary path. A listener will be attached to the given bus to + * populate the store, and the directory will be deleted when the store is closed. + */ + def createTempStore(conf: SparkConf, bus: SparkListenerBus): AppStateStore = { + val temp = Utils.createTempDir(namePrefix = "appstate") + initializeStore(loadStore(temp), Some(temp), bus) + } + + /** + * Create a store in the given path, attaching a listener to the given bus to populate the + * store. The path will not be deleted after the store is closed. + */ + def createStore(path: File, conf: SparkConf, bus: SparkListenerBus): AppStateStore = { + initializeStore(loadStore(path), None, bus) + } + + private def initializeStore( + store: KVStore, + tempPath: Option[File], + bus: SparkListenerBus): AppStateStore = { + val stateStore = new AppStateStore(store, tempPath) + val listener = new AppStateListener(store) + bus.addListener(listener) + stateStore + } + + private def loadStore(path: File): KVStore = { + val metadata = new AppStatusStoreMetadata(CURRENT_VERSION) + KVUtils.open(path, metadata) + } + +} + +/** + * Helper for getting distributions from nested metric types. + */ +private abstract class MetricHelper[I, O]( + rawMetrics: Seq[v1.TaskMetrics], + quantiles: Array[Double]) { + + def getSubmetrics(raw: v1.TaskMetrics): I + + def build: O + + val data: Seq[I] = rawMetrics.map(getSubmetrics) + + /** applies the given function to all input metrics, and returns the quantiles */ + def submetricQuantiles(f: I => Double): IndexedSeq[Double] = { + Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles) + } +} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index f17b637754826..9d3833086172f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -248,7 +248,13 @@ private[spark] object ApiRootResource { * interface needed for them all to expose application info as json. */ private[spark] trait UIRoot { - def getSparkUI(appKey: String): Option[SparkUI] + /** + * Runs some code with the current SparkUI instance for the app / attempt. + * + * @throws NoSuchElementException If the app / attempt pair does not exist. + */ + def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T + def getApplicationInfoList: Iterator[ApplicationInfo] def getApplicationInfo(appId: String): Option[ApplicationInfo] @@ -293,15 +299,18 @@ private[v1] trait ApiRequestContext { * to it. 
If there is no such app, throw an appropriate exception */ def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = { - val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) - uiRoot.getSparkUI(appKey) match { - case Some(ui) => + try { + uiRoot.withSparkUI(appId, attemptId) { ui => val user = httpRequest.getRemoteUser() if (!ui.securityManager.checkUIViewPermissions(user)) { throw new ForbiddenException(raw"""user "$user" is not authorized""") } f(ui) - case None => throw new NotFoundException("no such app: " + appId) + } + } catch { + case _: NoSuchElementException => + val appKey = attemptId.map(appId + "/" + _).getOrElse(appId) + throw new NotFoundException(s"no such app: $appKey") } } } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index d4b980f5dbb07..b82e57fa453b8 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -17,13 +17,15 @@ package org.apache.spark.status +import java.lang.{Integer => JInteger, Long => JLong} + import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.spark.kvstore.KVIndex import org.apache.spark.status.api.v1._ import org.apache.spark.status.KVUtils._ -private[spark] class AppStatusStoreMetadata( +private[spark] case class AppStatusStoreMetadata( val version: Long) private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) { @@ -38,6 +40,9 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { @JsonIgnore @KVIndex def id: String = info.id + @JsonIgnore @KVIndex("active") + def active: Boolean = info.isActive + @JsonIgnore @KVIndex("host") def host: String = info.hostPort.split(":")(0) @@ -65,13 +70,33 @@ private[spark] class StageDataWrapper( @JsonIgnore @KVIndex def id: Array[Int] = Array(info.stageId, info.attemptId) + @JsonIgnore @KVIndex("stageId") + def stageId: Int = info.stageId + } -private[spark] class TaskDataWrapper(val info: TaskData) { +/** + * The task information is always indexed with the stage ID, since that is how the UI and API + * consume it. That means every indexed value has the stage ID and attempt ID included, aside + * from the actual data being indexed. 
+ */ +private[spark] class TaskDataWrapper( + val info: TaskData, + val stageId: Int, + val stageAttemptId: Int) { @JsonIgnore @KVIndex def id: Long = info.taskId + @JsonIgnore @KVIndex("stage") + def stage: Array[Int] = Array(stageId, stageAttemptId) + + @JsonIgnore @KVIndex("runtime") + def runtime: Array[AnyRef] = { + val _runtime = info.taskMetrics.map(_.executorRunTime).getOrElse(-1L) + Array(stageId: JInteger, stageAttemptId: JInteger, _runtime: JLong) + } + } private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index bf4cf79e9faa3..8d6ac55c05b32 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ +import org.apache.spark.status.AppStateStore import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo, UIRoot} import org.apache.spark.storage.StorageStatusListener @@ -39,6 +40,7 @@ import org.apache.spark.util.Utils * Top level user interface for a Spark application. */ private[spark] class SparkUI private ( + val store: AppStateStore, val sc: Option[SparkContext], val conf: SparkConf, securityManager: SecurityManager, @@ -99,8 +101,12 @@ private[spark] class SparkUI private ( logInfo(s"Stopped Spark web UI at $webUrl") } - def getSparkUI(appId: String): Option[SparkUI] = { - if (appId == this.appId) Some(this) else None + override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T = { + if (appId == this.appId) { + fn(this) + } else { + throw new NoSuchElementException() + } } def getApplicationInfoList: Iterator[ApplicationInfo] = { @@ -152,60 +158,24 @@ private[spark] object SparkUI { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) } - def createLiveUI( - sc: SparkContext, - conf: SparkConf, - listenerBus: SparkListenerBus, - jobProgressListener: JobProgressListener, - securityManager: SecurityManager, - appName: String, - startTime: Long): SparkUI = { - create(Some(sc), conf, listenerBus, securityManager, appName, - jobProgressListener = Some(jobProgressListener), startTime = startTime) - } - - def createHistoryUI( - conf: SparkConf, - listenerBus: SparkListenerBus, - securityManager: SecurityManager, - appName: String, - basePath: String, - startTime: Long): SparkUI = { - val sparkUI = create( - None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime) - - val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], - Utils.getContextOrSparkClassLoader).asScala - listenerFactories.foreach { listenerFactory => - val listeners = listenerFactory.createListeners(conf, sparkUI) - listeners.foreach(listenerBus.addListener) - } - sparkUI - } - /** - * Create a new Spark UI. - * - * @param sc optional SparkContext; this can be None when reconstituting a UI from event logs. - * @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the - * web UI will create and register its own JobProgressListener. + * Create a new UI backed by an AppStateStore. 
*/ - private def create( + def create( sc: Option[SparkContext], + store: AppStateStore, conf: SparkConf, listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, - basePath: String = "", - jobProgressListener: Option[JobProgressListener] = None, + basePath: String, startTime: Long): SparkUI = { - val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { + val jobProgressListener = sc.map(_.jobProgressListener).getOrElse { val listener = new JobProgressListener(conf) listenerBus.addListener(listener) listener } - val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener, conf) @@ -218,8 +188,9 @@ private[spark] object SparkUI { listenerBus.addListener(storageListener) listenerBus.addListener(operationGraphListener) - new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, _jobProgressListener, storageListener, operationGraphListener, + new SparkUI(store, sc, conf, securityManager, environmentListener, storageStatusListener, + executorsListener, jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime) } + } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala index 7998e3702c122..35f69d57ba609 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -18,15 +18,11 @@ package org.apache.spark.deploy.history import java.util.{Date, NoSuchElementException} -import javax.servlet.Filter import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.mutable -import scala.collection.mutable.ListBuffer import com.codahale.metrics.Counter -import com.google.common.cache.LoadingCache -import com.google.common.util.concurrent.UncheckedExecutionException import org.eclipse.jetty.servlet.ServletContextHandler import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -39,23 +35,10 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, ApplicationInfo} import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{Clock, ManualClock, Utils} +import org.apache.spark.util.ManualClock class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar with Matchers { - /** - * subclass with access to the cache internals - * @param retainedApplications number of retained applications - */ - class TestApplicationCache( - operations: ApplicationCacheOperations = new StubCacheOperations(), - retainedApplications: Int, - clock: Clock = new ManualClock(0)) - extends ApplicationCache(operations, retainedApplications, clock) { - - def cache(): LoadingCache[CacheKey, CacheEntry] = appCache - } - /** * Stub cache operations. 
* The state is kept in a map of [[CacheKey]] to [[CacheEntry]], @@ -77,8 +60,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { logDebug(s"getAppUI($appId, $attemptId)") getAppUICount += 1 - instances.get(CacheKey(appId, attemptId)).map( e => - LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime))) + instances.get(CacheKey(appId, attemptId)).map { e => e.loadedUI } } override def attachSparkUI( @@ -96,10 +78,9 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attemptId: Option[String], completed: Boolean, started: Long, - ended: Long, - timestamp: Long): SparkUI = { - val ui = putAppUI(appId, attemptId, completed, started, ended, timestamp) - attachSparkUI(appId, attemptId, ui, completed) + ended: Long): LoadedAppUI = { + val ui = putAppUI(appId, attemptId, completed, started, ended) + attachSparkUI(appId, attemptId, ui.ui, completed) ui } @@ -108,23 +89,12 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attemptId: Option[String], completed: Boolean, started: Long, - ended: Long, - timestamp: Long): SparkUI = { - val ui = newUI(appId, attemptId, completed, started, ended) - putInstance(appId, attemptId, ui, completed, timestamp) + ended: Long): LoadedAppUI = { + val ui = LoadedAppUI(newUI(appId, attemptId, completed, started, ended)) + instances(CacheKey(appId, attemptId)) = new CacheEntry(ui, completed) ui } - def putInstance( - appId: String, - attemptId: Option[String], - ui: SparkUI, - completed: Boolean, - timestamp: Long): Unit = { - instances += (CacheKey(appId, attemptId) -> - new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp)) - } - /** * Detach a reconstructed UI * @@ -146,23 +116,6 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar attached.get(CacheKey(appId, attemptId)) } - /** - * The update probe. 
- * @param appId application to probe - * @param attemptId attempt to probe - * @param updateTime timestamp of this UI load - */ - private[history] def updateProbe( - appId: String, - attemptId: Option[String], - updateTime: Long)(): Boolean = { - updateProbeCount += 1 - logDebug(s"isUpdated($appId, $attemptId, ${updateTime})") - val entry = instances.get(CacheKey(appId, attemptId)).get - val updated = entry.probeTime > updateTime - logDebug(s"entry = $entry; updated = $updated") - updated - } } /** @@ -210,15 +163,13 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val now = clock.getTimeMillis() // add the entry - operations.putAppUI(app1, None, true, now, now, now) + operations.putAppUI(app1, None, true, now, now) // make sure its local operations.getAppUI(app1, None).get operations.getAppUICount = 0 // now expect it to be found - val cacheEntry = cache.lookupCacheEntry(app1, None) - assert(1 === cacheEntry.probeTime) - assert(cacheEntry.completed) + cache.withSparkUI(app1, None) { _ => } // assert about queries made of the operations assert(1 === operations.getAppUICount, "getAppUICount") assert(1 === operations.attachCount, "attachCount") @@ -236,8 +187,8 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar assert(0 === operations.detachCount, "attachCount") // evict the entry - operations.putAndAttach("2", None, true, time2, time2, time2) - operations.putAndAttach("3", None, true, time2, time2, time2) + operations.putAndAttach("2", None, true, time2, time2) + operations.putAndAttach("3", None, true, time2, time2) cache.get("2") cache.get("3") @@ -248,7 +199,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val appId = "app1" val attemptId = Some("_01") val time3 = clock.getTimeMillis() - operations.putAppUI(appId, attemptId, false, time3, 0, time3) + operations.putAppUI(appId, attemptId, false, time3, 0) // expect an error here assertNotFound(appId, None) } @@ -256,10 +207,11 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Test that if an attempt ID is set, it must be used in lookups") { val operations = new StubCacheOperations() val clock = new ManualClock(1) - implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock) + implicit val cache = new ApplicationCache(operations, retainedApplications = 10, + clock = clock) val appId = "app1" val attemptId = Some("_01") - operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0, 0) + operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0) assertNotFound(appId, None) } @@ -271,50 +223,29 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Incomplete apps refreshed") { val operations = new StubCacheOperations() val clock = new ManualClock(50) - val window = 500 - implicit val cache = new ApplicationCache(operations, retainedApplications = 5, clock = clock) + implicit val cache = new ApplicationCache(operations, 5, clock) val metrics = cache.metrics // add the incomplete app // add the entry val started = clock.getTimeMillis() val appId = "app1" val attemptId = Some("001") - operations.putAppUI(appId, attemptId, false, started, 0, started) - val firstEntry = cache.lookupCacheEntry(appId, attemptId) - assert(started === firstEntry.probeTime, s"timestamp in $firstEntry") - assert(!firstEntry.completed, s"entry is complete: $firstEntry") - assertMetric("lookupCount", metrics.lookupCount, 1) + val 
initialUI = operations.putAndAttach(appId, attemptId, false, started, 0) + val firstUI = cache.withSparkUI(appId, attemptId) { ui => ui } + assertMetric("lookupCount", metrics.lookupCount, 1) assert(0 === operations.updateProbeCount, "expected no update probe on that first get") - val checkTime = window * 2 - clock.setTime(checkTime) - val entry3 = cache.lookupCacheEntry(appId, attemptId) - assert(firstEntry !== entry3, s"updated entry test from $cache") + // Invalidate the first entry to trigger a re-load. + initialUI.invalidate() + + // Update the UI in the stub so that a new one is provided to the cache. + operations.putAppUI(appId, attemptId, true, started, started + 10) + + val updatedUI = cache.withSparkUI(appId, attemptId) { ui => ui } + assert(firstUI !== updatedUI, s"expected updated UI") assertMetric("lookupCount", metrics.lookupCount, 2) - assertMetric("updateProbeCount", metrics.updateProbeCount, 1) - assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0) - assert(1 === operations.updateProbeCount, s"refresh count in $cache") - assert(0 === operations.detachCount, s"detach count") - assert(entry3.probeTime === checkTime) - - val updateTime = window * 3 - // update the cached value - val updatedApp = operations.putAppUI(appId, attemptId, true, started, updateTime, updateTime) - val endTime = window * 10 - clock.setTime(endTime) - logDebug(s"Before operation = $cache") - val entry5 = cache.lookupCacheEntry(appId, attemptId) - assertMetric("lookupCount", metrics.lookupCount, 3) - assertMetric("updateProbeCount", metrics.updateProbeCount, 2) - // the update was triggered - assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1) - assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match entry {$entry5} in $cache") - - // at which point, the refreshes stop - clock.setTime(window * 20) - assertCacheEntryEquals(appId, attemptId, entry5) - assertMetric("updateProbeCount", metrics.updateProbeCount, 2) + assert(1 === operations.detachCount, s"detach count") } /** @@ -337,27 +268,6 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar } } - /** - * Look up the cache entry and assert that it matches in the expected value. - * This assertion works if the two CacheEntries are different -it looks at the fields. - * UI are compared on object equality; the timestamp and completed flags directly. - * @param appId application ID - * @param attemptId attempt ID - * @param expected expected value - * @param cache app cache - */ - def assertCacheEntryEquals( - appId: String, - attemptId: Option[String], - expected: CacheEntry) - (implicit cache: ApplicationCache): Unit = { - val actual = cache.lookupCacheEntry(appId, attemptId) - val errorText = s"Expected get($appId, $attemptId) -> $expected, but got $actual from $cache" - assert(expected.ui === actual.ui, errorText + " SparkUI reference") - assert(expected.completed === actual.completed, errorText + " -completed flag") - assert(expected.probeTime === actual.probeTime, errorText + " -timestamp") - } - /** * Assert that a key wasn't found in cache or loaded. 
* @@ -370,14 +280,9 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar appId: String, attemptId: Option[String]) (implicit cache: ApplicationCache): Unit = { - val ex = intercept[UncheckedExecutionException] { + val ex = intercept[NoSuchElementException] { cache.get(appId, attemptId) } - var cause = ex.getCause - assert(cause !== null) - if (!cause.isInstanceOf[NoSuchElementException]) { - throw cause - } } test("Large Scale Application Eviction") { @@ -385,12 +290,12 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar val clock = new ManualClock(0) val size = 5 // only two entries are retained, so we expect evictions to occur on lookups - implicit val cache: ApplicationCache = new TestApplicationCache(operations, - retainedApplications = size, clock = clock) + implicit val cache = new ApplicationCache(operations, retainedApplications = size, + clock = clock) val attempt1 = Some("01") - val ids = new ListBuffer[String]() + val ids = new mutable.ListBuffer[String]() // build a list of applications val count = 100 for (i <- 1 to count ) { @@ -398,7 +303,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar ids += appId clock.advance(10) val t = clock.getTimeMillis() - operations.putAppUI(appId, attempt1, true, t, t, t) + operations.putAppUI(appId, attempt1, true, t, t) } // now go through them in sequence reading them, expect evictions ids.foreach { id => @@ -413,20 +318,19 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar test("Attempts are Evicted") { val operations = new StubCacheOperations() - implicit val cache: ApplicationCache = new TestApplicationCache(operations, - retainedApplications = 4) + implicit val cache = new ApplicationCache(operations, 4, new ManualClock()) val metrics = cache.metrics val appId = "app1" val attempt1 = Some("01") val attempt2 = Some("02") val attempt3 = Some("03") - operations.putAppUI(appId, attempt1, true, 100, 110, 110) - operations.putAppUI(appId, attempt2, true, 200, 210, 210) - operations.putAppUI(appId, attempt3, true, 300, 310, 310) + operations.putAppUI(appId, attempt1, true, 100, 110) + operations.putAppUI(appId, attempt2, true, 200, 210) + operations.putAppUI(appId, attempt3, true, 300, 310) val attempt4 = Some("04") - operations.putAppUI(appId, attempt4, true, 400, 410, 410) + operations.putAppUI(appId, attempt4, true, 400, 410) val attempt5 = Some("05") - operations.putAppUI(appId, attempt5, true, 500, 510, 510) + operations.putAppUI(appId, attempt5, true, 500, 510) def expectLoadAndEvictionCounts(expectedLoad: Int, expectedEvictionCount: Int): Unit = { assertMetric("loadCount", metrics.loadCount, expectedLoad) @@ -457,20 +361,14 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar } - test("Instantiate Filter") { - // this is a regression test on the filter being constructable - val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) - val instance = clazz.newInstance() - instance shouldBe a [Filter] - } - test("redirect includes query params") { - val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) - val filter = clazz.newInstance().asInstanceOf[ApplicationCacheCheckFilter] - filter.appId = "local-123" + val operations = new StubCacheOperations() + val ui = operations.putAndAttach("foo", None, true, 0, 10) val cache = mock[ApplicationCache] - when(cache.checkForUpdates(any(), any())).thenReturn(true) - 
ApplicationCacheCheckFilterRelay.setApplicationCache(cache) + when(cache.operations).thenReturn(operations) + val filter = new ApplicationCacheCheckFilter(new CacheKey("foo", None), ui, cache) + ui.invalidate() + val request = mock[HttpServletRequest] when(request.getMethod()).thenReturn("GET") when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 483bd648ddb87..d9fb13d8a499e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -598,6 +598,41 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("invalidate cached UI") { + val storeDir = Utils.createTempDir() + val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) + val provider = newProvider(conf) + val appId = "new1" + + // Write an incomplete app log. + val appLog = newLogFile(appId, None, inProgress = true) + writeFile(appLog, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None) + ) + provider.checkForLogs() + + // Load the app UI. + val oldUI = provider.getAppUI(appId, None) + assert(oldUI.isDefined) + intercept[NoSuchElementException] { + oldUI.get.ui.store.job(0) + } + + // Add more info to the app log, and trigger the provider to update things. + writeFile(appLog, true, None, + SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None), + SparkListenerJobStart(0, 1L, Nil, null), + SparkListenerApplicationEnd(5L) + ) + provider.checkForLogs() + + // Load the UI again and make sure we can get the new info added to the logs. + val freshUI = provider.getAppUI(appId, None) + assert(freshUI.isDefined) + assert(freshUI != oldUI) + freshUI.get.ui.store.job(0) + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. 
Example: diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 4277b82faa277..d2a318a680baa 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -72,6 +72,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers private var port: Int = -1 def init(extraConf: (String, String)*): Unit = { + Utils.deleteRecursively(storeDir) + assert(storeDir.mkdir()) val conf = new SparkConf() .set("spark.history.fs.logDirectory", logDir) .set("spark.history.fs.update.interval", "0") @@ -292,21 +294,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val uiRoot = "/testwebproxybase" System.setProperty("spark.ui.proxyBase", uiRoot) - server.stop() - - val conf = new SparkConf() - .set("spark.history.fs.logDirectory", logDir) - .set("spark.history.fs.update.interval", "0") - .set("spark.testing", "true") - .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) - - provider = new FsHistoryProvider(conf) - provider.checkForLogs() - val securityManager = HistoryServer.createSecurityManager(conf) - - server = new HistoryServer(conf, provider, securityManager, 18080) - server.initialize() - server.bind() + stop() + init() val port = server.boundPort @@ -375,8 +364,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } test("incomplete apps get refreshed") { - server.stop() - implicit val webDriver: WebDriver = new HtmlUnitDriver implicit val formats = org.json4s.DefaultFormats @@ -386,6 +373,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers // a new conf is used with the background thread set and running at its fastest // allowed refresh rate (1Hz) + stop() val myConf = new SparkConf() .set("spark.history.fs.logDirectory", logDir.getAbsolutePath) .set("spark.eventLog.dir", logDir.getAbsolutePath) @@ -418,7 +406,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers } } - server = new HistoryServer(myConf, provider, securityManager, 18080) + server = new HistoryServer(myConf, provider, securityManager, 0) server.initialize() server.bind() val port = server.boundPort @@ -464,7 +452,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers rootAppPage should not be empty def getAppUI: SparkUI = { - provider.getAppUI(appId, None).get.ui + server.withSparkUI(appId, None) { ui => ui } } // selenium isn't that useful on failures...add our own reporting @@ -519,7 +507,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers getNumJobs("") should be (1) getNumJobs("/jobs") should be (1) getNumJobsRestful() should be (1) - assert(metrics.lookupCount.getCount > 1, s"lookup count too low in $metrics") + assert(metrics.lookupCount.getCount > 0, s"lookup count too low in $metrics") // dump state before the next bit of test, which is where update // checking really gets stressed diff --git a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala index 8da3451f62b01..85054a3093f09 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.status 
import java.io.File +import java.lang.{Integer => JInteger, Long => JLong} import java.util.{Arrays, Date, Properties} import scala.collection.JavaConverters._ @@ -172,6 +173,14 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter { s1Tasks.foreach { task => check[TaskDataWrapper](task.taskId) { wrapper => assert(wrapper.info.taskId === task.taskId) + assert(wrapper.stageId === stages.head.stageId) + assert(wrapper.stageAttemptId === stages.head.attemptId) + assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptId))) + + val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger, + -1L: JLong) + assert(Arrays.equals(wrapper.runtime, runtime)) + assert(wrapper.info.index === task.index) assert(wrapper.info.attempt === task.attemptNumber) assert(wrapper.info.launchTime === new Date(task.launchTime)) @@ -253,6 +262,7 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter { val s1Metrics = TaskMetrics.empty s1Metrics.setExecutorCpuTime(2L) + s1Metrics.setExecutorRunTime(4L) time += 1 pending.foreach { task => @@ -274,9 +284,12 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter { } pending.foreach { task => - check[TaskDataWrapper](task.taskId) { task => - assert(task.info.errorMessage === None) - assert(task.info.taskMetrics.get.executorCpuTime === 2L) + check[TaskDataWrapper](task.taskId) { wrapper => + assert(wrapper.info.errorMessage === None) + assert(wrapper.info.taskMetrics.get.executorCpuTime === 2L) + val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger, + 4L: JLong) + assert(Arrays.equals(wrapper.runtime, runtime)) } } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index dc4d5019b5cd6..feba90ac2c950 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -38,6 +38,7 @@ object MimaExcludes { lazy val v23excludes = v22excludes ++ Seq( // SPARK-18085: Better History Server scalability for many / large applications ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"), // [SPARK-20495][SQL] Add StorageLevel to cacheTable API ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable") )
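
The AppStateListenerSuite assertions above pin down the layout of the new composite index keys on TaskDataWrapper: the "stage" index is (stageId, stageAttemptId) and the "runtime" index is (stageId, stageAttemptId, executorRunTime), with -1 standing in for tasks that have not reported metrics yet. Below is a minimal, self-contained sketch of that key layout and of the first()/last() bounds that taskList() uses for runtime-sorted scans; the object and value names are illustrative only and are not part of the patch.

import java.lang.{Integer => JInteger, Long => JLong}

// Illustration only: mirrors the Array[AnyRef] keys built by TaskDataWrapper.runtime and the
// scan bounds used by AppStateStore.taskList for INCREASING_RUNTIME / DECREASING_RUNTIME.
object RuntimeIndexKeySketch {

  // (stageId, stageAttemptId, executorRunTime); -1 is the sentinel for "no metrics yet".
  def runtimeKey(stageId: Int, stageAttemptId: Int, runTime: Long): Array[AnyRef] =
    Array[AnyRef](stageId: JInteger, stageAttemptId: JInteger, runTime: JLong)

  def main(args: Array[String]): Unit = {
    val lower = runtimeKey(1, 0, -1L)            // first() bound for an increasing-runtime scan
    val upper = runtimeKey(1, 0, Long.MaxValue)  // last() bound
    println(lower.mkString("[", ", ", "]"))
    println(upper.mkString("[", ", ", "]"))
  }
}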
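
The new AppStateStore factory methods and the indexed taskList() call can be exercised roughly as in the caller-side sketch below. The package and object names are hypothetical (a package under org.apache.spark is used purely so the private[spark] types resolve), and the SparkConf, listener bus, and store directory are assumed to be supplied by the surrounding code.

package org.apache.spark.status.sketch  // hypothetical package, only so private[spark] types resolve

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListenerBus
import org.apache.spark.status.AppStateStore
import org.apache.spark.status.api.v1.TaskSorting

object AppStateStoreUsageSketch {

  // Builds a persistent store fed by an existing listener bus, pages through the 20 slowest
  // tasks of stage 0 / attempt 0 via the "runtime" index, and closes the store afterwards.
  def slowestTaskIds(conf: SparkConf, bus: SparkListenerBus, dir: File): Seq[Long] = {
    val store = AppStateStore.createStore(dir, conf, bus)
    try {
      store.taskList(
        stageId = 0,
        stageAttemptId = 0,
        offset = 0,
        length = 20,
        sortBy = TaskSorting.DECREASING_RUNTIME).map(_.taskId)
    } finally {
      store.close()
    }
  }
}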
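
The MetricHelper pattern above (one subclass per nested metric type, each producing a quantile distribution per field) can be illustrated without any Spark dependencies. The sketch below uses stand-in case classes and a naive nearest-rank quantile in place of org.apache.spark.util.Distribution; it only shows the shape of the pattern, not the real v1 API types.

// Stand-alone illustration of the MetricHelper pattern: extract one nested metric struct per
// task, then summarize each of its fields at the requested quantiles.
object MetricHelperSketch {

  // Stand-ins for the real v1 API types.
  case class FakeInputMetrics(bytesRead: Long, recordsRead: Long)
  case class FakeTaskMetrics(inputMetrics: FakeInputMetrics)
  case class InputDistributions(bytesRead: IndexedSeq[Double], recordsRead: IndexedSeq[Double])

  abstract class MetricHelper[I, O](rawMetrics: Seq[FakeTaskMetrics], quantiles: Array[Double]) {
    def getSubmetrics(raw: FakeTaskMetrics): I
    def build: O

    val data: Seq[I] = rawMetrics.map(getSubmetrics)

    // Naive nearest-rank quantiles over one field of the extracted submetrics (assumes data is
    // non-empty); the real code delegates to org.apache.spark.util.Distribution instead.
    def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
      val sorted = data.map(f).sorted.toIndexedSeq
      quantiles.map { q =>
        val idx = math.min(sorted.length - 1, math.max(0, (q * (sorted.length - 1)).round.toInt))
        sorted(idx)
      }.toIndexedSeq
    }
  }

  def main(args: Array[String]): Unit = {
    val raw = Seq(10L, 20L, 30L, 40L).map(b => FakeTaskMetrics(FakeInputMetrics(b, b / 10)))
    val dist = new MetricHelper[FakeInputMetrics, InputDistributions](raw, Array(0.05, 0.5, 0.95)) {
      def getSubmetrics(raw: FakeTaskMetrics): FakeInputMetrics = raw.inputMetrics
      def build: InputDistributions = InputDistributions(
        bytesRead = submetricQuantiles(_.bytesRead),
        recordsRead = submetricQuantiles(_.recordsRead))
    }.build
    println(dist)
  }
}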
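
On the history server side, the ApplicationCacheSuite changes above replace the timestamp-probe machinery with a simpler contract: a LoadedAppUI is handed out through withSparkUI, and once the provider invalidates it the next lookup detaches the stale UI and reloads it. The caller-side sketch below assumes an already-constructed ApplicationCache; the package and object are hypothetical, chosen only so the private[history] classes resolve.

package org.apache.spark.deploy.history  // hypothetical location, only for private[history] access

import org.apache.spark.ui.SparkUI

object CacheLookupSketch {

  // Runs a block against the currently cached SparkUI for an app / attempt. If the underlying
  // LoadedAppUI was invalidated (e.g. by the FS listing code), the cache reloads it before
  // running the block; lookups of unknown app / attempt pairs surface as NoSuchElementException
  // (see assertNotFound above), which ApiRequestContext.withSparkUI translates into a 404.
  def currentUI(cache: ApplicationCache, appId: String, attemptId: Option[String]): SparkUI = {
    cache.withSparkUI(appId, attemptId) { ui => ui }
  }
}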