From d8f3598d42755bfbd94c9fe30e3f1508c3c2c55b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 29 Mar 2017 11:18:23 -0700 Subject: [PATCH] SHS-NG M4.7: Remove JobProgressListener. The only remaining use of this class was the SparkStatusTracker, which was modified to use the new state store. The test code to wait for executors was moved to TestUtils and now uses the SparkStatusTracker API. As part of this change I also modified the streaming UI to read the needed data from the store, which was missed in the previous patch that made JobProgressListener redundant. --- .../scala/org/apache/spark/SparkContext.scala | 11 +- .../org/apache/spark/SparkStatusTracker.scala | 79 ++- .../scala/org/apache/spark/TestUtils.scala | 26 +- .../spark/status/api/v1/StagesResource.scala | 1 - .../apache/spark/ui/jobs/AllJobsPage.scala | 1 - .../spark/ui/jobs/JobProgressListener.scala | 599 ------------------ .../org/apache/spark/ui/jobs/StagePage.scala | 1 - .../org/apache/spark/ui/jobs/StageTable.scala | 1 - .../org/apache/spark/ui/jobs/UIData.scala | 295 --------- .../org/apache/spark/DistributedSuite.scala | 2 +- .../spark/ExternalShuffleServiceSuite.scala | 2 +- .../org/apache/spark/StatusTrackerSuite.scala | 6 +- .../spark/broadcast/BroadcastSuite.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 4 +- .../SparkListenerWithClusterSuite.scala | 4 +- .../ui/jobs/JobProgressListenerSuite.scala | 442 ------------- .../apache/spark/streaming/ui/BatchPage.scala | 75 ++- 17 files changed, 113 insertions(+), 1438 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala delete mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala delete mode 100644 core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 527b45ea102d1..b8fff8b14c898 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -58,7 +58,6 @@ import org.apache.spark.status.AppStateStore import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} -import org.apache.spark.ui.jobs.JobProgressListener import org.apache.spark.util._ /** @@ -197,7 +196,6 @@ class SparkContext(config: SparkConf) extends Logging { private var _eventLogDir: Option[URI] = None private var _eventLogCodec: Option[String] = None private var _env: SparkEnv = _ - private var _jobProgressListener: JobProgressListener = _ private var _statusTracker: SparkStatusTracker = _ private var _progressBar: Option[ConsoleProgressBar] = None private var _ui: Option[SparkUI] = None @@ -270,8 +268,6 @@ class SparkContext(config: SparkConf) extends Logging { val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]() map.asScala } - private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener - def statusTracker: SparkStatusTracker = _statusTracker private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar @@ -425,11 +421,6 @@ class SparkContext(config: SparkConf) extends Logging { if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true") - // "_jobProgressListener" should be set up before creating SparkEnv because when creating - // "SparkEnv", some messages will be posted to "listenerBus" and we 
should not miss them. - _jobProgressListener = new JobProgressListener(_conf) - listenerBus.addListener(jobProgressListener) - // Initialize the app state store and listener before SparkEnv is created so that it gets // all events. _stateStore = AppStateStore.createTempStore(conf, listenerBus) @@ -444,7 +435,7 @@ class SparkContext(config: SparkConf) extends Logging { _conf.set("spark.repl.class.uri", replUri) } - _statusTracker = new SparkStatusTracker(this) + _statusTracker = new SparkStatusTracker(this, _stateStore) _progressBar = if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) { diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index 22a553e68439a..72d96bedd50f5 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -17,7 +17,11 @@ package org.apache.spark +import java.util.Arrays + import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.status.AppStateStore +import org.apache.spark.status.api.v1.StageStatus /** * Low-level status reporting APIs for monitoring job and stage progress. @@ -33,9 +37,7 @@ import org.apache.spark.scheduler.TaskSchedulerImpl * * NOTE: this class's constructor should be considered private and may be subject to change. */ -class SparkStatusTracker private[spark] (sc: SparkContext) { - - private val jobProgressListener = sc.jobProgressListener +class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStateStore) { /** * Return a list of all known jobs in a particular job group. If `jobGroup` is `null`, then @@ -46,9 +48,8 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { * its result. */ def getJobIdsForGroup(jobGroup: String): Array[Int] = { - jobProgressListener.synchronized { - jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray - } + val expected = Option(jobGroup) + store.jobsList(null).filter(_.jobGroup == expected).map(_.jobId).toArray } /** @@ -57,9 +58,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { * This method does not guarantee the order of the elements in its result. */ def getActiveStageIds(): Array[Int] = { - jobProgressListener.synchronized { - jobProgressListener.activeStages.values.map(_.stageId).toArray - } + store.stageList(Arrays.asList(StageStatus.ACTIVE)).map(_.stageId).toArray } /** @@ -68,19 +67,18 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { * This method does not guarantee the order of the elements in its result. */ def getActiveJobIds(): Array[Int] = { - jobProgressListener.synchronized { - jobProgressListener.activeJobs.values.map(_.jobId).toArray - } + store.jobsList(Arrays.asList(JobExecutionStatus.RUNNING)).map(_.jobId).toArray } /** * Returns job information, or `None` if the job info could not be found or was garbage collected. */ def getJobInfo(jobId: Int): Option[SparkJobInfo] = { - jobProgressListener.synchronized { - jobProgressListener.jobIdToData.get(jobId).map { data => - new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status) - } + try { + val job = store.job(jobId) + Some(new SparkJobInfoImpl(jobId, job.stageIds.toArray, job.status)) + } catch { + case _: NoSuchElementException => None } } @@ -89,21 +87,19 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { * garbage collected. 
*/ def getStageInfo(stageId: Int): Option[SparkStageInfo] = { - jobProgressListener.synchronized { - for ( - info <- jobProgressListener.stageIdToInfo.get(stageId); - data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId)) - ) yield { - new SparkStageInfoImpl( - stageId, - info.attemptId, - info.submissionTime.getOrElse(0), - info.name, - info.numTasks, - data.numActiveTasks, - data.numCompleteTasks, - data.numFailedTasks) - } + try { + val info = store.lastStageAttempt(stageId) + Some(new SparkStageInfoImpl( + stageId, + info.attemptId, + info.submissionTime.map(_.getTime()).getOrElse(0L), + info.name, + info.numTasks, + info.numActiveTasks, + info.numCompleteTasks, + info.numFailedTasks)) + } catch { + case _: NoSuchElementException => None } } @@ -111,17 +107,16 @@ class SparkStatusTracker private[spark] (sc: SparkContext) { * Returns information of all known executors, including host, port, cacheSize, numRunningTasks. */ def getExecutorInfos: Array[SparkExecutorInfo] = { - val executorIdToRunningTasks: Map[String, Int] = - sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors - - sc.getExecutorStorageStatus.map { status => - val bmId = status.blockManagerId + store.executorList(true).map { exec => + val (host, port) = exec.hostPort.split(":", 2) match { + case Array(h, p) => (h, p.toInt) + case Array(h) => (h, -1) + } new SparkExecutorInfoImpl( - bmId.host, - bmId.port, - status.cacheSize, - executorIdToRunningTasks.getOrElse(bmId.executorId, 0) - ) - } + host, + port, + exec.maxMemory, + exec.activeTasks) + }.toArray } } diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 3f912dc191515..531ea5cdd82e8 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets import java.security.SecureRandom import java.security.cert.X509Certificate import java.util.Arrays -import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream} import javax.net.ssl._ import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} @@ -232,6 +232,30 @@ private[spark] object TestUtils { } } + /** + * Wait until at least `numExecutors` executors are up, or throw `TimeoutException` if the waiting + * time elapsed before `numExecutors` executors up. Exposed for testing. + * + * @param numExecutors the number of executors to wait at least + * @param timeout time to wait in milliseconds + */ + private[spark] def waitUntilExecutorsUp( + sc: SparkContext, + numExecutors: Int, + timeout: Long): Unit = { + val finishTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) + while (System.nanoTime() < finishTime) { + if (sc.statusTracker.getExecutorInfos.length > numExecutors) { + return + } + // Sleep rather than using wait/notify, because this is used only for testing and wait/notify + // add overhead in the general case. 
+ Thread.sleep(10) + } + throw new TimeoutException( + s"Can't find $numExecutors executors before $timeout milliseconds elapsed") + } + } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala index 56b6782ff4cd1..64968c321f0a0 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala @@ -25,7 +25,6 @@ import org.apache.spark.scheduler.StageInfo import org.apache.spark.status.api.v1.StageStatus._ import org.apache.spark.status.api.v1.TaskSorting._ import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.UIData.StageUIData @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class StagesResource extends BaseAppResource { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index dae9680a94322..d2175f3a2308e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -33,7 +33,6 @@ import org.apache.spark.scheduler._ import org.apache.spark.status.AppStateStore import org.apache.spark.status.api.v1 import org.apache.spark.ui._ -import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData} import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished jobs */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala deleted file mode 100644 index 8870187f2219c..0000000000000 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ /dev/null @@ -1,599 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ui.jobs - -import java.util.concurrent.TimeoutException - -import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap, ListBuffer} - -import org.apache.spark._ -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.TaskMetrics -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import org.apache.spark.storage.BlockManagerId -import org.apache.spark.ui.SparkUI -import org.apache.spark.ui.jobs.UIData._ - -/** - * :: DeveloperApi :: - * Tracks task-level information to be displayed in the UI. 
- * - * All access to the data structures in this class must be synchronized on the - * class, since the UI thread and the EventBus loop may otherwise be reading and - * updating the internal data structures concurrently. - */ -@DeveloperApi -@deprecated("This class will be removed in a future release.", "2.2.0") -class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { - - // Define a handful of type aliases so that data structures' types can serve as documentation. - // These type aliases are public because they're used in the types of public fields: - - type JobId = Int - type JobGroupId = String - type StageId = Int - type StageAttemptId = Int - type PoolName = String - type ExecutorId = String - - // Application: - @volatile var startTime = -1L - @volatile var endTime = -1L - - // Jobs: - val activeJobs = new HashMap[JobId, JobUIData] - val completedJobs = ListBuffer[JobUIData]() - val failedJobs = ListBuffer[JobUIData]() - val jobIdToData = new HashMap[JobId, JobUIData] - val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]] - - // Stages: - val pendingStages = new HashMap[StageId, StageInfo] - val activeStages = new HashMap[StageId, StageInfo] - val completedStages = ListBuffer[StageInfo]() - val skippedStages = ListBuffer[StageInfo]() - val failedStages = ListBuffer[StageInfo]() - val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData] - val stageIdToInfo = new HashMap[StageId, StageInfo] - val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]] - val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]() - // Total of completed and failed stages that have ever been run. These may be greater than - // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than - // JobProgressListener's retention limits. - var numCompletedStages = 0 - var numFailedStages = 0 - var numCompletedJobs = 0 - var numFailedJobs = 0 - - // Misc: - val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]() - - def blockManagerIds: Seq[BlockManagerId] = executorIdToBlockManagerId.values.toSeq - - var schedulingMode: Option[SchedulingMode] = None - - // To limit the total memory usage of JobProgressListener, we only track information for a fixed - // number of non-active jobs and stages (there is no limit for active jobs and stages): - - val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) - val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS) - val retainedTasks = conf.get(UI_RETAINED_TASKS) - - // We can test for memory leaks by ensuring that collections that track non-active jobs and - // stages do not grow without bound and that collections for active jobs/stages eventually become - // empty once Spark is idle. Let's partition our collections into ones that should be empty - // once Spark is idle and ones that should have a hard- or soft-limited sizes. - // These methods are used by unit tests, but they're defined here so that people don't forget to - // update the tests when adding new collections. 
Some collections have multiple levels of - // nesting, etc, so this lets us customize our notion of "size" for each structure: - - // These collections should all be empty once Spark is idle (no active stages / jobs): - private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = { - Map( - "activeStages" -> activeStages.size, - "activeJobs" -> activeJobs.size, - "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum, - "stageIdToActiveJobIds" -> stageIdToActiveJobIds.values.map(_.size).sum - ) - } - - // These collections should stop growing once we have run at least `spark.ui.retainedStages` - // stages and `spark.ui.retainedJobs` jobs: - private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = { - Map( - "completedJobs" -> completedJobs.size, - "failedJobs" -> failedJobs.size, - "completedStages" -> completedStages.size, - "skippedStages" -> skippedStages.size, - "failedStages" -> failedStages.size - ) - } - - // These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to - // some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings: - private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = { - Map( - "jobIdToData" -> jobIdToData.size, - "stageIdToData" -> stageIdToData.size, - "stageIdToStageInfo" -> stageIdToInfo.size, - "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum, - // Since jobGroupToJobIds is map of sets, check that we don't leak keys with empty values: - "jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size - ) - } - - /** If stages is too large, remove and garbage collect old stages */ - private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { - if (stages.size > retainedStages) { - val toRemove = calculateNumberToRemove(stages.size, retainedStages) - stages.take(toRemove).foreach { s => - stageIdToData.remove((s.stageId, s.attemptId)) - stageIdToInfo.remove(s.stageId) - } - stages.trimStart(toRemove) - } - } - - /** If jobs is too large, remove and garbage collect old jobs */ - private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized { - if (jobs.size > retainedJobs) { - val toRemove = calculateNumberToRemove(jobs.size, retainedJobs) - jobs.take(toRemove).foreach { job => - // Remove the job's UI data, if it exists - jobIdToData.remove(job.jobId).foreach { removedJob => - // A null jobGroupId is used for jobs that are run without a job group - val jobGroupId = removedJob.jobGroup.orNull - // Remove the job group -> job mapping entry, if it exists - jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup => - jobsInGroup.remove(job.jobId) - // If this was the last job in this job group, remove the map entry for the job group - if (jobsInGroup.isEmpty) { - jobGroupToJobIds.remove(jobGroupId) - } - } - } - } - jobs.trimStart(toRemove) - } - } - - override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized { - val jobGroup = for ( - props <- Option(jobStart.properties); - group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) - ) yield group - val jobData: JobUIData = - new JobUIData( - jobId = jobStart.jobId, - submissionTime = Option(jobStart.time).filter(_ >= 0), - stageIds = jobStart.stageIds, - jobGroup = jobGroup, - status = JobExecutionStatus.RUNNING) - // A null jobGroupId is used for jobs that are run without a job group - jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId) - jobStart.stageInfos.foreach(x => 
pendingStages(x.stageId) = x) - // Compute (a potential underestimate of) the number of tasks that will be run by this job. - // This may be an underestimate because the job start event references all of the result - // stages' transitive stage dependencies, but some of these stages might be skipped if their - // output is available from earlier runs. - // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. - jobData.numTasks = { - val allStages = jobStart.stageInfos - val missingStages = allStages.filter(_.completionTime.isEmpty) - missingStages.map(_.numTasks).sum - } - jobIdToData(jobStart.jobId) = jobData - activeJobs(jobStart.jobId) = jobData - for (stageId <- jobStart.stageIds) { - stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId) - } - // If there's no information for a stage, store the StageInfo received from the scheduler - // so that we can display stage descriptions for pending stages: - for (stageInfo <- jobStart.stageInfos) { - stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo) - stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData) - } - } - - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized { - val jobData = activeJobs.remove(jobEnd.jobId).getOrElse { - logWarning(s"Job completed for unknown job ${jobEnd.jobId}") - new JobUIData(jobId = jobEnd.jobId) - } - jobData.completionTime = Option(jobEnd.time).filter(_ >= 0) - - jobData.stageIds.foreach(pendingStages.remove) - jobEnd.jobResult match { - case JobSucceeded => - completedJobs += jobData - trimJobsIfNecessary(completedJobs) - jobData.status = JobExecutionStatus.SUCCEEDED - numCompletedJobs += 1 - case JobFailed(_) => - failedJobs += jobData - trimJobsIfNecessary(failedJobs) - jobData.status = JobExecutionStatus.FAILED - numFailedJobs += 1 - } - for (stageId <- jobData.stageIds) { - stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage => - jobsUsingStage.remove(jobEnd.jobId) - if (jobsUsingStage.isEmpty) { - stageIdToActiveJobIds.remove(stageId) - } - stageIdToInfo.get(stageId).foreach { stageInfo => - if (stageInfo.submissionTime.isEmpty) { - // if this stage is pending, it won't complete, so mark it as "skipped": - skippedStages += stageInfo - trimStagesIfNecessary(skippedStages) - jobData.numSkippedStages += 1 - jobData.numSkippedTasks += stageInfo.numTasks - } - } - } - } - } - - override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized { - val stage = stageCompleted.stageInfo - stageIdToInfo(stage.stageId) = stage - val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), { - logWarning("Stage completed for unknown stage " + stage.stageId) - new StageUIData - }) - - for ((id, info) <- stageCompleted.stageInfo.accumulables) { - stageData.accumulables(id) = info - } - - poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap => - hashMap.remove(stage.stageId) - } - activeStages.remove(stage.stageId) - if (stage.failureReason.isEmpty) { - completedStages += stage - numCompletedStages += 1 - trimStagesIfNecessary(completedStages) - } else { - failedStages += stage - numFailedStages += 1 - trimStagesIfNecessary(failedStages) - } - - for ( - activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId); - jobId <- activeJobsDependentOnStage; - jobData <- jobIdToData.get(jobId) - ) { - jobData.numActiveStages -= 1 - if (stage.failureReason.isEmpty) { - if (stage.submissionTime.isDefined) { - 
jobData.completedStageIndices.add(stage.stageId) - } - } else { - jobData.numFailedStages += 1 - } - } - } - - /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */ - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized { - val stage = stageSubmitted.stageInfo - activeStages(stage.stageId) = stage - pendingStages.remove(stage.stageId) - val poolName = Option(stageSubmitted.properties).map { - p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME) - }.getOrElse(SparkUI.DEFAULT_POOL_NAME) - - stageIdToInfo(stage.stageId) = stage - val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData) - stageData.schedulingPool = poolName - - stageData.description = Option(stageSubmitted.properties).flatMap { - p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) - } - - val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]) - stages(stage.stageId) = stage - - for ( - activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId); - jobId <- activeJobsDependentOnStage; - jobData <- jobIdToData.get(jobId) - ) { - jobData.numActiveStages += 1 - - // If a stage retries again, it should be removed from completedStageIndices set - jobData.completedStageIndices.remove(stage.stageId) - } - } - - override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { - val taskInfo = taskStart.taskInfo - if (taskInfo != null) { - val metrics = TaskMetrics.empty - val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), { - logWarning("Task start for unknown stage " + taskStart.stageId) - new StageUIData - }) - stageData.numActiveTasks += 1 - stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics))) - } - for ( - activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId); - jobId <- activeJobsDependentOnStage; - jobData <- jobIdToData.get(jobId) - ) { - jobData.numActiveTasks += 1 - } - } - - override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { - // Do nothing: because we don't do a deep copy of the TaskInfo, the TaskInfo in - // stageToTaskInfos already has the updated status. - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { - val info = taskEnd.taskInfo - // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task - // completion event is for. Let's just drop it here. This means we might have some speculation - // tasks on the web ui that's never marked as complete. 
- if (info != null && taskEnd.stageAttemptId != -1) { - val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), { - logWarning("Task end for unknown stage " + taskEnd.stageId) - new StageUIData - }) - - for (accumulableInfo <- info.accumulables) { - stageData.accumulables(accumulableInfo.id) = accumulableInfo - } - - val execSummaryMap = stageData.executorSummary - val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary) - - taskEnd.reason match { - case Success => - execSummary.succeededTasks += 1 - case kill: TaskKilled => - execSummary.reasonToNumKilled = execSummary.reasonToNumKilled.updated( - kill.reason, execSummary.reasonToNumKilled.getOrElse(kill.reason, 0) + 1) - case _ => - execSummary.failedTasks += 1 - } - execSummary.taskTime += info.duration - stageData.numActiveTasks -= 1 - - val errorMessage: Option[String] = - taskEnd.reason match { - case org.apache.spark.Success => - stageData.completedIndices.add(info.index) - stageData.numCompleteTasks += 1 - None - case kill: TaskKilled => - stageData.reasonToNumKilled = stageData.reasonToNumKilled.updated( - kill.reason, stageData.reasonToNumKilled.getOrElse(kill.reason, 0) + 1) - Some(kill.toErrorString) - case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates - stageData.numFailedTasks += 1 - Some(e.toErrorString) - case e: TaskFailedReason => // All other failure cases - stageData.numFailedTasks += 1 - Some(e.toErrorString) - } - - val taskMetrics = Option(taskEnd.taskMetrics) - taskMetrics.foreach { m => - val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics) - updateAggregateMetrics(stageData, info.executorId, m, oldMetrics) - } - - val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None)) - taskData.updateTaskInfo(info) - taskData.updateTaskMetrics(taskMetrics) - taskData.errorMessage = errorMessage - - // If Tasks is too large, remove and garbage collect old tasks - if (stageData.taskData.size > retainedTasks) { - stageData.taskData = stageData.taskData.drop( - calculateNumberToRemove(stageData.taskData.size, retainedTasks)) - } - - for ( - activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId); - jobId <- activeJobsDependentOnStage; - jobData <- jobIdToData.get(jobId) - ) { - jobData.numActiveTasks -= 1 - taskEnd.reason match { - case Success => - jobData.numCompletedTasks += 1 - case kill: TaskKilled => - jobData.reasonToNumKilled = jobData.reasonToNumKilled.updated( - kill.reason, jobData.reasonToNumKilled.getOrElse(kill.reason, 0) + 1) - case _ => - jobData.numFailedTasks += 1 - } - } - } - } - - /** - * Remove at least (maxRetained / 10) items to reduce friction. - */ - private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = { - math.max(retainedSize / 10, dataSize - retainedSize) - } - - /** - * Upon receiving new metrics for a task, updates the per-stage and per-executor-per-stage - * aggregate metrics by calculating deltas between the currently recorded metrics and the new - * metrics. 
- */ - def updateAggregateMetrics( - stageData: StageUIData, - execId: String, - taskMetrics: TaskMetrics, - oldMetrics: Option[TaskMetricsUIData]) { - val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary) - - val shuffleWriteDelta = - taskMetrics.shuffleWriteMetrics.bytesWritten - - oldMetrics.map(_.shuffleWriteMetrics.bytesWritten).getOrElse(0L) - stageData.shuffleWriteBytes += shuffleWriteDelta - execSummary.shuffleWrite += shuffleWriteDelta - - val shuffleWriteRecordsDelta = - taskMetrics.shuffleWriteMetrics.recordsWritten - - oldMetrics.map(_.shuffleWriteMetrics.recordsWritten).getOrElse(0L) - stageData.shuffleWriteRecords += shuffleWriteRecordsDelta - execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta - - val shuffleReadDelta = - taskMetrics.shuffleReadMetrics.totalBytesRead - - oldMetrics.map(_.shuffleReadMetrics.totalBytesRead).getOrElse(0L) - stageData.shuffleReadTotalBytes += shuffleReadDelta - execSummary.shuffleRead += shuffleReadDelta - - val shuffleReadRecordsDelta = - taskMetrics.shuffleReadMetrics.recordsRead - - oldMetrics.map(_.shuffleReadMetrics.recordsRead).getOrElse(0L) - stageData.shuffleReadRecords += shuffleReadRecordsDelta - execSummary.shuffleReadRecords += shuffleReadRecordsDelta - - val inputBytesDelta = - taskMetrics.inputMetrics.bytesRead - - oldMetrics.map(_.inputMetrics.bytesRead).getOrElse(0L) - stageData.inputBytes += inputBytesDelta - execSummary.inputBytes += inputBytesDelta - - val inputRecordsDelta = - taskMetrics.inputMetrics.recordsRead - - oldMetrics.map(_.inputMetrics.recordsRead).getOrElse(0L) - stageData.inputRecords += inputRecordsDelta - execSummary.inputRecords += inputRecordsDelta - - val outputBytesDelta = - taskMetrics.outputMetrics.bytesWritten - - oldMetrics.map(_.outputMetrics.bytesWritten).getOrElse(0L) - stageData.outputBytes += outputBytesDelta - execSummary.outputBytes += outputBytesDelta - - val outputRecordsDelta = - taskMetrics.outputMetrics.recordsWritten - - oldMetrics.map(_.outputMetrics.recordsWritten).getOrElse(0L) - stageData.outputRecords += outputRecordsDelta - execSummary.outputRecords += outputRecordsDelta - - val diskSpillDelta = - taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L) - stageData.diskBytesSpilled += diskSpillDelta - execSummary.diskBytesSpilled += diskSpillDelta - - val memorySpillDelta = - taskMetrics.memoryBytesSpilled - oldMetrics.map(_.memoryBytesSpilled).getOrElse(0L) - stageData.memoryBytesSpilled += memorySpillDelta - execSummary.memoryBytesSpilled += memorySpillDelta - - val timeDelta = - taskMetrics.executorRunTime - oldMetrics.map(_.executorRunTime).getOrElse(0L) - stageData.executorRunTime += timeDelta - - val cpuTimeDelta = - taskMetrics.executorCpuTime - oldMetrics.map(_.executorCpuTime).getOrElse(0L) - stageData.executorCpuTime += cpuTimeDelta - } - - override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { - for ((taskId, sid, sAttempt, accumUpdates) <- executorMetricsUpdate.accumUpdates) { - val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), { - logWarning("Metrics update for task in unknown stage " + sid) - new StageUIData - }) - val taskData = stageData.taskData.get(taskId) - val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates) - taskData.foreach { t => - if (!t.taskInfo.finished) { - updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics) - // Overwrite task metrics - t.updateTaskMetrics(Some(metrics)) - } - } - } - } - - 
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { - synchronized { - schedulingMode = environmentUpdate - .environmentDetails("Spark Properties").toMap - .get("spark.scheduler.mode") - .map(SchedulingMode.withName) - } - } - - override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) { - synchronized { - val blockManagerId = blockManagerAdded.blockManagerId - val executorId = blockManagerId.executorId - executorIdToBlockManagerId(executorId) = blockManagerId - } - } - - override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { - synchronized { - val executorId = blockManagerRemoved.blockManagerId.executorId - executorIdToBlockManagerId.remove(executorId) - } - } - - override def onApplicationStart(appStarted: SparkListenerApplicationStart) { - startTime = appStarted.time - } - - override def onApplicationEnd(appEnded: SparkListenerApplicationEnd) { - endTime = appEnded.time - } - - /** - * For testing only. Wait until at least `numExecutors` executors are up, or throw - * `TimeoutException` if the waiting time elapsed before `numExecutors` executors up. - * Exposed for testing. - * - * @param numExecutors the number of executors to wait at least - * @param timeout time to wait in milliseconds - */ - private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = { - val finishTime = System.currentTimeMillis() + timeout - while (System.currentTimeMillis() < finishTime) { - val numBlockManagers = synchronized { - blockManagerIds.size - } - if (numBlockManagers >= numExecutors + 1) { - // Need to count the block manager in driver - return - } - // Sleep rather than using wait/notify, because this is used only for testing and wait/notify - // add overhead in the general case. 
- Thread.sleep(10) - } - throw new TimeoutException( - s"Can't find $numExecutors executors before $timeout milliseconds elapsed") - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 9ea618e7e3086..87974ab7c81ea 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -33,7 +33,6 @@ import org.apache.spark.status.AppStateStore import org.apache.spark.status.api.v1._ import org.apache.spark.ui._ import org.apache.spark.ui.jobs.ApiHelper._ -import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Distribution, Utils} /** Page showing statistics and task list for a given stage */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 1bbe15755969a..6b84bf980b193 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -31,7 +31,6 @@ import org.apache.spark.status.AppStateStore import org.apache.spark.status.api.v1 import org.apache.spark.storage.RDDInfo import org.apache.spark.ui._ -import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils private[ui] class StageTableBase( diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala deleted file mode 100644 index ac1a74ad8029d..0000000000000 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ /dev/null @@ -1,295 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.ui.jobs - -import scala.collection.mutable -import scala.collection.mutable.{HashMap, LinkedHashMap} - -import org.apache.spark.JobExecutionStatus -import org.apache.spark.executor._ -import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} -import org.apache.spark.util.AccumulatorContext -import org.apache.spark.util.collection.OpenHashSet - -private[spark] object UIData { - - class ExecutorSummary { - var taskTime : Long = 0 - var failedTasks : Int = 0 - var succeededTasks : Int = 0 - var reasonToNumKilled : Map[String, Int] = Map.empty - var inputBytes : Long = 0 - var inputRecords : Long = 0 - var outputBytes : Long = 0 - var outputRecords : Long = 0 - var shuffleRead : Long = 0 - var shuffleReadRecords : Long = 0 - var shuffleWrite : Long = 0 - var shuffleWriteRecords : Long = 0 - var memoryBytesSpilled : Long = 0 - var diskBytesSpilled : Long = 0 - var isBlacklisted : Int = 0 - } - - class JobUIData( - var jobId: Int = -1, - var submissionTime: Option[Long] = None, - var completionTime: Option[Long] = None, - var stageIds: Seq[Int] = Seq.empty, - var jobGroup: Option[String] = None, - var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN, - /* Tasks */ - // `numTasks` is a potential underestimate of the true number of tasks that this job will run. - // This may be an underestimate because the job start event references all of the result - // stages' transitive stage dependencies, but some of these stages might be skipped if their - // output is available from earlier runs. - // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. - var numTasks: Int = 0, - var numActiveTasks: Int = 0, - var numCompletedTasks: Int = 0, - var numSkippedTasks: Int = 0, - var numFailedTasks: Int = 0, - var reasonToNumKilled: Map[String, Int] = Map.empty, - /* Stages */ - var numActiveStages: Int = 0, - // This needs to be a set instead of a simple count to prevent double-counting of rerun stages: - var completedStageIndices: mutable.HashSet[Int] = new mutable.HashSet[Int](), - var numSkippedStages: Int = 0, - var numFailedStages: Int = 0 - ) - - class StageUIData { - var numActiveTasks: Int = _ - var numCompleteTasks: Int = _ - var completedIndices = new OpenHashSet[Int]() - var numFailedTasks: Int = _ - var reasonToNumKilled: Map[String, Int] = Map.empty - - var executorRunTime: Long = _ - var executorCpuTime: Long = _ - - var inputBytes: Long = _ - var inputRecords: Long = _ - var outputBytes: Long = _ - var outputRecords: Long = _ - var shuffleReadTotalBytes: Long = _ - var shuffleReadRecords : Long = _ - var shuffleWriteBytes: Long = _ - var shuffleWriteRecords: Long = _ - var memoryBytesSpilled: Long = _ - var diskBytesSpilled: Long = _ - var isBlacklisted: Int = _ - - var schedulingPool: String = "" - var description: Option[String] = None - - var accumulables = new HashMap[Long, AccumulableInfo] - var taskData = new LinkedHashMap[Long, TaskUIData] - var executorSummary = new HashMap[String, ExecutorSummary] - - def hasInput: Boolean = inputBytes > 0 - def hasOutput: Boolean = outputBytes > 0 - def hasShuffleRead: Boolean = shuffleReadTotalBytes > 0 - def hasShuffleWrite: Boolean = shuffleWriteBytes > 0 - def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 && diskBytesSpilled > 0 - } - - /** - * These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation. 
- */ - class TaskUIData private( - private var _taskInfo: TaskInfo, - private var _metrics: Option[TaskMetricsUIData]) { - - var errorMessage: Option[String] = None - - def taskInfo: TaskInfo = _taskInfo - - def metrics: Option[TaskMetricsUIData] = _metrics - - def updateTaskInfo(taskInfo: TaskInfo): Unit = { - _taskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo) - } - - def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = { - _metrics = TaskUIData.toTaskMetricsUIData(metrics) - } - - def taskDuration: Option[Long] = { - if (taskInfo.status == "RUNNING") { - Some(_taskInfo.timeRunning(System.currentTimeMillis)) - } else { - _metrics.map(_.executorRunTime) - } - } - } - - object TaskUIData { - def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = { - new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics)) - } - - private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = { - metrics.map { m => - TaskMetricsUIData( - executorDeserializeTime = m.executorDeserializeTime, - executorDeserializeCpuTime = m.executorDeserializeCpuTime, - executorRunTime = m.executorRunTime, - executorCpuTime = m.executorCpuTime, - resultSize = m.resultSize, - jvmGCTime = m.jvmGCTime, - resultSerializationTime = m.resultSerializationTime, - memoryBytesSpilled = m.memoryBytesSpilled, - diskBytesSpilled = m.diskBytesSpilled, - peakExecutionMemory = m.peakExecutionMemory, - inputMetrics = InputMetricsUIData(m.inputMetrics), - outputMetrics = OutputMetricsUIData(m.outputMetrics), - shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics), - shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics)) - } - } - - /** - * We don't need to store internal or SQL accumulables as their values will be shown in other - * places, so drop them to reduce the memory usage. 
- */ - private[spark] def dropInternalAndSQLAccumulables(taskInfo: TaskInfo): TaskInfo = { - val newTaskInfo = new TaskInfo( - taskId = taskInfo.taskId, - index = taskInfo.index, - attemptNumber = taskInfo.attemptNumber, - launchTime = taskInfo.launchTime, - executorId = taskInfo.executorId, - host = taskInfo.host, - taskLocality = taskInfo.taskLocality, - speculative = taskInfo.speculative - ) - newTaskInfo.gettingResultTime = taskInfo.gettingResultTime - newTaskInfo.setAccumulables(taskInfo.accumulables.filter { - accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER) - }) - newTaskInfo.finishTime = taskInfo.finishTime - newTaskInfo.failed = taskInfo.failed - newTaskInfo.killed = taskInfo.killed - newTaskInfo - } - } - - case class TaskMetricsUIData( - executorDeserializeTime: Long, - executorDeserializeCpuTime: Long, - executorRunTime: Long, - executorCpuTime: Long, - resultSize: Long, - jvmGCTime: Long, - resultSerializationTime: Long, - memoryBytesSpilled: Long, - diskBytesSpilled: Long, - peakExecutionMemory: Long, - inputMetrics: InputMetricsUIData, - outputMetrics: OutputMetricsUIData, - shuffleReadMetrics: ShuffleReadMetricsUIData, - shuffleWriteMetrics: ShuffleWriteMetricsUIData) - - case class InputMetricsUIData(bytesRead: Long, recordsRead: Long) - object InputMetricsUIData { - def apply(metrics: InputMetrics): InputMetricsUIData = { - if (metrics.bytesRead == 0 && metrics.recordsRead == 0) { - EMPTY - } else { - new InputMetricsUIData( - bytesRead = metrics.bytesRead, - recordsRead = metrics.recordsRead) - } - } - private val EMPTY = InputMetricsUIData(0, 0) - } - - case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long) - object OutputMetricsUIData { - def apply(metrics: OutputMetrics): OutputMetricsUIData = { - if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) { - EMPTY - } else { - new OutputMetricsUIData( - bytesWritten = metrics.bytesWritten, - recordsWritten = metrics.recordsWritten) - } - } - private val EMPTY = OutputMetricsUIData(0, 0) - } - - case class ShuffleReadMetricsUIData( - remoteBlocksFetched: Long, - localBlocksFetched: Long, - remoteBytesRead: Long, - localBytesRead: Long, - fetchWaitTime: Long, - recordsRead: Long, - totalBytesRead: Long, - totalBlocksFetched: Long) - - object ShuffleReadMetricsUIData { - def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = { - if ( - metrics.remoteBlocksFetched == 0 && - metrics.localBlocksFetched == 0 && - metrics.remoteBytesRead == 0 && - metrics.localBytesRead == 0 && - metrics.fetchWaitTime == 0 && - metrics.recordsRead == 0 && - metrics.totalBytesRead == 0 && - metrics.totalBlocksFetched == 0) { - EMPTY - } else { - new ShuffleReadMetricsUIData( - remoteBlocksFetched = metrics.remoteBlocksFetched, - localBlocksFetched = metrics.localBlocksFetched, - remoteBytesRead = metrics.remoteBytesRead, - localBytesRead = metrics.localBytesRead, - fetchWaitTime = metrics.fetchWaitTime, - recordsRead = metrics.recordsRead, - totalBytesRead = metrics.totalBytesRead, - totalBlocksFetched = metrics.totalBlocksFetched - ) - } - } - private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0) - } - - case class ShuffleWriteMetricsUIData( - bytesWritten: Long, - recordsWritten: Long, - writeTime: Long) - - object ShuffleWriteMetricsUIData { - def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = { - if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0 && metrics.writeTime == 0) { - EMPTY - } else { - new 
ShuffleWriteMetricsUIData( - bytesWritten = metrics.bytesWritten, - recordsWritten = metrics.recordsWritten, - writeTime = metrics.writeTime - ) - } - } - private val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0) - } - -} diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 84f7f1fc8eb09..92665b71e55d5 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -153,7 +153,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = { sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test")) - sc.jobProgressListener.waitUntilExecutorsUp(2, 30000) + TestUtils.waitUntilExecutorsUp(sc, 2, 30000) val data = sc.parallelize(1 to 1000, 10) val cachedData = data.persist(storageLevel) assert(cachedData.count === 1000) diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index fe944031bc948..472952addf353 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -66,7 +66,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll { // local blocks from the local BlockManager and won't send requests to ExternalShuffleService. // In this case, we won't receive FetchFailed. And it will make this test fail. // Therefore, we should wait until all slaves are up - sc.jobProgressListener.waitUntilExecutorsUp(2, 60000) + TestUtils.waitUntilExecutorsUp(sc, 2, 60000) val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _) diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala index 5483f2b8434aa..a15ae040d43a9 100644 --- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala @@ -44,13 +44,13 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont stageIds.size should be(2) val firstStageInfo = eventually(timeout(10 seconds)) { - sc.statusTracker.getStageInfo(stageIds(0)).get + sc.statusTracker.getStageInfo(stageIds.min).get } - firstStageInfo.stageId() should be(stageIds(0)) + firstStageInfo.stageId() should be(stageIds.min) firstStageInfo.currentAttemptId() should be(0) firstStageInfo.numTasks() should be(2) eventually(timeout(10 seconds)) { - val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds(0)).get + val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds.min).get updatedFirstStageInfo.numCompletedTasks() should be(2) updatedFirstStageInfo.numActiveTasks() should be(0) updatedFirstStageInfo.numFailedTasks() should be(0) diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 46f9ac6b0273a..159629825c677 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -224,7 +224,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test") // Wait until all salves are up try { - 
_sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 60000) + TestUtils.waitUntilExecutorsUp(_sc, numSlaves, 60000) _sc } catch { case e: Throwable => diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a10941b579fe2..13fa7abac524d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2304,13 +2304,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // OutputCommitCoordinator requires the task info itself to not be null. private def createFakeTaskInfo(): TaskInfo = { val info = new TaskInfo(0, 0, 0, 0L, "", "", TaskLocality.ANY, false) - info.finishTime = 1 // to prevent spurious errors in JobProgressListener + info.finishTime = 1 info } private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = { val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false) - info.finishTime = 1 // to prevent spurious errors in JobProgressListener + info.finishTime = 1 info } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala index 9fa8859382911..123f7f49d21b5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite} +import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils} import org.apache.spark.scheduler.cluster.ExecutorInfo /** @@ -43,7 +43,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext // This test will check if the number of executors received by "SparkListener" is same as the // number of all executors, so we need to wait until all executors are up - sc.jobProgressListener.waitUntilExecutorsUp(2, 60000) + TestUtils.waitUntilExecutorsUp(sc, 2, 60000) val rdd1 = sc.parallelize(1 to 100, 4) val rdd2 = rdd1.map(_.toString) diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala deleted file mode 100644 index 93964a2d56743..0000000000000 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ /dev/null @@ -1,442 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.spark.ui.jobs
-
-import java.util.Properties
-
-import org.scalatest.Matchers
-
-import org.apache.spark._
-import org.apache.spark.{LocalSparkContext, SparkConf, Success}
-import org.apache.spark.executor._
-import org.apache.spark.scheduler._
-import org.apache.spark.ui.jobs.UIData.TaskUIData
-import org.apache.spark.util.{AccumulatorContext, Utils}
-
-class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
-
-  val jobSubmissionTime = 1421191042750L
-  val jobCompletionTime = 1421191296660L
-
-  private def createStageStartEvent(stageId: Int) = {
-    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
-    SparkListenerStageSubmitted(stageInfo)
-  }
-
-  private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
-    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
-    if (failed) {
-      stageInfo.failureReason = Some("Failed!")
-    }
-    SparkListenerStageCompleted(stageInfo)
-  }
-
-  private def createJobStartEvent(
-      jobId: Int,
-      stageIds: Seq[Int],
-      jobGroup: Option[String] = None): SparkListenerJobStart = {
-    val stageInfos = stageIds.map { stageId =>
-      new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
-    }
-    val properties: Option[Properties] = jobGroup.map { groupId =>
-      val props = new Properties()
-      props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
-      props
-    }
-    SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, properties.orNull)
-  }
-
-  private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
-    val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
-    SparkListenerJobEnd(jobId, jobCompletionTime, result)
-  }
-
-  private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
-    val stagesThatWontBeRun = jobId * 200 to jobId * 200 + 10
-    val stageIds = jobId * 100 to jobId * 100 + 50
-    listener.onJobStart(createJobStartEvent(jobId, stageIds ++ stagesThatWontBeRun))
-    for (stageId <- stageIds) {
-      listener.onStageSubmitted(createStageStartEvent(stageId))
-      listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0))
-    }
-    listener.onJobEnd(createJobEndEvent(jobId, shouldFail))
-  }
-
-  private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) {
-    listener.getSizesOfActiveStateTrackingCollections.foreach { case (fieldName, size) =>
-      assert(size === 0, s"$fieldName was not empty")
-    }
-  }
-
-  test("test LRU eviction of stages") {
-    def runWithListener(listener: JobProgressListener) : Unit = {
-      for (i <- 1 to 50) {
-        listener.onStageSubmitted(createStageStartEvent(i))
-        listener.onStageCompleted(createStageEndEvent(i))
-      }
-      assertActiveJobsStateIsEmpty(listener)
-    }
-    val conf = new SparkConf()
-    conf.set("spark.ui.retainedStages", 5.toString)
-    var listener = new JobProgressListener(conf)
-
-    // Test with 5 retainedStages
-    runWithListener(listener)
-    listener.completedStages.size should be (5)
-    listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
-
-    // Test with 0 retainedStages
-    conf.set("spark.ui.retainedStages", 0.toString)
-    listener = new JobProgressListener(conf)
-    runWithListener(listener)
-    listener.completedStages.size should be (0)
-  }
-
-  test("test clearing of stageIdToActiveJobs") {
-    val conf = new SparkConf()
-    conf.set("spark.ui.retainedStages", 5.toString)
-    val listener = new JobProgressListener(conf)
-    val jobId = 0
-    val stageIds = 1 to 50
-    // Start a job with 50 stages
-    listener.onJobStart(createJobStartEvent(jobId, stageIds))
-    for (stageId <- stageIds) {
-      listener.onStageSubmitted(createStageStartEvent(stageId))
-    }
-    listener.stageIdToActiveJobIds.size should be > 0
-
-    // Complete the stages and job
-    for (stageId <- stageIds) {
-      listener.onStageCompleted(createStageEndEvent(stageId, failed = false))
-    }
-    listener.onJobEnd(createJobEndEvent(jobId, false))
-    assertActiveJobsStateIsEmpty(listener)
-    listener.stageIdToActiveJobIds.size should be (0)
-  }
-
-  test("test clearing of jobGroupToJobIds") {
-    def runWithListener(listener: JobProgressListener): Unit = {
-      // Run 50 jobs, each with one stage
-      for (jobId <- 0 to 50) {
-        listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
-        listener.onStageSubmitted(createStageStartEvent(0))
-        listener.onStageCompleted(createStageEndEvent(0, failed = false))
-        listener.onJobEnd(createJobEndEvent(jobId, false))
-      }
-      assertActiveJobsStateIsEmpty(listener)
-    }
-    val conf = new SparkConf()
-    conf.set("spark.ui.retainedJobs", 5.toString)
-
-    var listener = new JobProgressListener(conf)
-    runWithListener(listener)
-    // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
-    listener.jobGroupToJobIds.size should be (5)
-
-    // Test with 0 jobs
-    conf.set("spark.ui.retainedJobs", 0.toString)
-    listener = new JobProgressListener(conf)
-    runWithListener(listener)
-    listener.jobGroupToJobIds.size should be (0)
-  }
-
-  test("test LRU eviction of jobs") {
-    val conf = new SparkConf()
-    conf.set("spark.ui.retainedStages", 5.toString)
-    conf.set("spark.ui.retainedJobs", 5.toString)
-    val listener = new JobProgressListener(conf)
-
-    // Run a bunch of jobs to get the listener into a state where we've exceeded both the
-    // job and stage retention limits:
-    for (jobId <- 1 to 10) {
-      runJob(listener, jobId, shouldFail = false)
-    }
-    for (jobId <- 200 to 210) {
-      runJob(listener, jobId, shouldFail = true)
-    }
-    assertActiveJobsStateIsEmpty(listener)
-    // Snapshot the sizes of various soft- and hard-size-limited collections:
-    val softLimitSizes = listener.getSizesOfSoftSizeLimitedCollections
-    val hardLimitSizes = listener.getSizesOfHardSizeLimitedCollections
-    // Run some more jobs:
-    for (jobId <- 11 to 50) {
-      runJob(listener, jobId, shouldFail = false)
-      // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
-      listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
-      listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
-    }
-
-    listener.completedJobs.size should be (5)
-    listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
-
-    for (jobId <- 51 to 100) {
-      runJob(listener, jobId, shouldFail = true)
-      // We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
-      listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
-      listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
-    }
-    assertActiveJobsStateIsEmpty(listener)
-
-    // Completed and failed jobs each their own size limits, so this should still be the same:
-    listener.completedJobs.size should be (5)
-    listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
-    listener.failedJobs.size should be (5)
-    listener.failedJobs.map(_.jobId).toSet should be (Set(100, 99, 98, 97, 96))
-  }
-
-  test("test executor id to summary") {
-    val conf = new SparkConf()
-    val listener = new JobProgressListener(conf)
-    val taskMetrics = TaskMetrics.empty
-    val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
-    assert(listener.stageIdToData.size === 0)
-
-    // finish this task, should get updated shuffleRead
-    shuffleReadMetrics.incRemoteBytesRead(1000)
-    taskMetrics.mergeShuffleReadMetrics()
-    var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
-    taskInfo.finishTime = 1
-    var task = new ShuffleMapTask(0)
-    val taskType = Utils.getFormattedClassName(task)
-    listener.onTaskEnd(
-      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToData.getOrElse((0, 0), fail())
-      .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 1000)
-
-    // finish a task with unknown executor-id, nothing should happen
-    taskInfo =
-      new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
-    taskInfo.finishTime = 1
-    task = new ShuffleMapTask(0)
-    listener.onTaskEnd(
-      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToData.size === 1)
-
-    // finish this task, should get updated duration
-    taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
-    taskInfo.finishTime = 1
-    task = new ShuffleMapTask(0)
-    listener.onTaskEnd(
-      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToData.getOrElse((0, 0), fail())
-      .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 2000)
-
-    // finish this task, should get updated duration
-    taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
-    taskInfo.finishTime = 1
-    task = new ShuffleMapTask(0)
-    listener.onTaskEnd(
-      SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToData.getOrElse((0, 0), fail())
-      .executorSummary.getOrElse("exe-2", fail()).shuffleRead === 1000)
-  }
-
-  test("test task success vs failure counting for different task end reasons") {
-    val conf = new SparkConf()
-    val listener = new JobProgressListener(conf)
-    val metrics = TaskMetrics.empty
-    val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
-    taskInfo.finishTime = 1
-    val task = new ShuffleMapTask(0)
-    val taskType = Utils.getFormattedClassName(task)
-
-    // Go through all the failure cases to make sure we are counting them as failures.
-    val taskFailedReasons = Seq(
-      Resubmitted,
-      new FetchFailed(null, 0, 0, 0, "ignored"),
-      ExceptionFailure("Exception", "description", null, null, None),
-      TaskResultLost,
-      ExecutorLostFailure("0", true, Some("Induced failure")),
-      UnknownReason)
-    var failCount = 0
-    for (reason <- taskFailedReasons) {
-      listener.onTaskEnd(
-        SparkListenerTaskEnd(task.stageId, 0, taskType, reason, taskInfo, metrics))
-      failCount += 1
-      assert(listener.stageIdToData((task.stageId, 0)).numCompleteTasks === 0)
-      assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
-    }
-
-    // Make sure killed tasks are accounted for correctly.
-    listener.onTaskEnd(
-      SparkListenerTaskEnd(
-        task.stageId, 0, taskType, TaskKilled("test"), taskInfo, metrics))
-    assert(listener.stageIdToData((task.stageId, 0)).reasonToNumKilled === Map("test" -> 1))
-
-    // Make sure we count success as success.
-    listener.onTaskEnd(
-      SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, metrics))
-    assert(listener.stageIdToData((task.stageId, 1)).numCompleteTasks === 1)
-    assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
-  }
-
-  test("test update metrics") {
-    val conf = new SparkConf()
-    val listener = new JobProgressListener(conf)
-
-    val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
-    val execId = "exe-1"
-
-    def makeTaskMetrics(base: Int): TaskMetrics = {
-      val taskMetrics = TaskMetrics.empty
-      val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
-      val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
-      val inputMetrics = taskMetrics.inputMetrics
-      val outputMetrics = taskMetrics.outputMetrics
-      shuffleReadMetrics.incRemoteBytesRead(base + 1)
-      shuffleReadMetrics.incLocalBytesRead(base + 9)
-      shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
-      taskMetrics.mergeShuffleReadMetrics()
-      shuffleWriteMetrics.incBytesWritten(base + 3)
-      taskMetrics.setExecutorRunTime(base + 4)
-      taskMetrics.incDiskBytesSpilled(base + 5)
-      taskMetrics.incMemoryBytesSpilled(base + 6)
-      inputMetrics.setBytesRead(base + 7)
-      outputMetrics.setBytesWritten(base + 8)
-      taskMetrics
-    }
-
-    def makeTaskInfo(taskId: Long, finishTime: Int = 0): TaskInfo = {
-      val taskInfo = new TaskInfo(taskId, 0, 1, 0L, execId, "host1", TaskLocality.NODE_LOCAL,
-        false)
-      taskInfo.finishTime = finishTime
-      taskInfo
-    }
-
-    listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1234L)))
-    listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1235L)))
-    listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
-    listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
-
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
-      (1234L, 0, 0, makeTaskMetrics(0).accumulators().map(AccumulatorSuite.makeInfo)),
-      (1235L, 0, 0, makeTaskMetrics(100).accumulators().map(AccumulatorSuite.makeInfo)),
-      (1236L, 1, 0, makeTaskMetrics(200).accumulators().map(AccumulatorSuite.makeInfo)))))
-
-    var stage0Data = listener.stageIdToData.get((0, 0)).get
-    var stage1Data = listener.stageIdToData.get((1, 0)).get
-    assert(stage0Data.shuffleReadTotalBytes == 220)
-    assert(stage1Data.shuffleReadTotalBytes == 410)
-    assert(stage0Data.shuffleWriteBytes == 106)
-    assert(stage1Data.shuffleWriteBytes == 203)
-    assert(stage0Data.executorRunTime == 108)
-    assert(stage1Data.executorRunTime == 204)
-    assert(stage0Data.diskBytesSpilled == 110)
-    assert(stage1Data.diskBytesSpilled == 205)
-    assert(stage0Data.memoryBytesSpilled == 112)
-    assert(stage1Data.memoryBytesSpilled == 206)
-    assert(stage0Data.inputBytes == 114)
-    assert(stage1Data.inputBytes == 207)
-    assert(stage0Data.outputBytes == 116)
-    assert(stage1Data.outputBytes == 208)
-
-    assert(
-      stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 2)
-    assert(
-      stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 102)
-    assert(
-      stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 202)
-
-    // task that was included in a heartbeat
-    listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
-      makeTaskMetrics(300)))
-    // task that wasn't included in a heartbeat
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1),
-      makeTaskMetrics(400)))
-
-    stage0Data = listener.stageIdToData.get((0, 0)).get
-    stage1Data = listener.stageIdToData.get((1, 0)).get
-    // Task 1235 contributed (100+1)+(100+9) = 210 shuffle bytes, and task 1234 contributed
-    // (300+1)+(300+9) = 610 total shuffle bytes, so the total for the stage is 820.
-    assert(stage0Data.shuffleReadTotalBytes == 820)
-    // Task 1236 contributed 410 shuffle bytes, and task 1237 contributed 810 shuffle bytes.
-    assert(stage1Data.shuffleReadTotalBytes == 1220)
-    assert(stage0Data.shuffleWriteBytes == 406)
-    assert(stage1Data.shuffleWriteBytes == 606)
-    assert(stage0Data.executorRunTime == 408)
-    assert(stage1Data.executorRunTime == 608)
-    assert(stage0Data.diskBytesSpilled == 410)
-    assert(stage1Data.diskBytesSpilled == 610)
-    assert(stage0Data.memoryBytesSpilled == 412)
-    assert(stage1Data.memoryBytesSpilled == 612)
-    assert(stage0Data.inputBytes == 414)
-    assert(stage1Data.inputBytes == 614)
-    assert(stage0Data.outputBytes == 416)
-    assert(stage1Data.outputBytes == 616)
-    assert(
-      stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 302)
-    assert(
-      stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 402)
-  }
-
-  test("drop internal and sql accumulators") {
-    val taskInfo = new TaskInfo(0, 0, 0, 0, "", "", TaskLocality.ANY, false)
-    val internalAccum =
-      AccumulableInfo(id = 1, name = Some("internal"), None, None, true, false, None)
-    val sqlAccum = AccumulableInfo(
-      id = 2,
-      name = Some("sql"),
-      update = None,
-      value = None,
-      internal = false,
-      countFailedValues = false,
-      metadata = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
-    val userAccum = AccumulableInfo(
-      id = 3,
-      name = Some("user"),
-      update = None,
-      value = None,
-      internal = false,
-      countFailedValues = false,
-      metadata = None)
-    taskInfo.setAccumulables(List(internalAccum, sqlAccum, userAccum))
-
-    val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
-    assert(newTaskInfo.accumulables === Seq(userAccum))
-  }
-
-  test("SPARK-19146 drop more elements when stageData.taskData.size > retainedTasks") {
-    val conf = new SparkConf()
-    conf.set("spark.ui.retainedTasks", "100")
-    val taskMetrics = TaskMetrics.empty
-    taskMetrics.mergeShuffleReadMetrics()
-    val task = new ShuffleMapTask(0)
-    val taskType = Utils.getFormattedClassName(task)
-
-    val listener1 = new JobProgressListener(conf)
-    for (t <- 1 to 101) {
-      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
-      taskInfo.finishTime = 1
-      listener1.onTaskEnd(
-        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
-    }
-    // 101 - math.max(100 / 10, 101 - 100) = 91
-    assert(listener1.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 91)
-
-    val listener2 = new JobProgressListener(conf)
-    for (t <- 1 to 150) {
-      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
-      taskInfo.finishTime = 1
-      listener2.onTaskEnd(
-        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
-    }
-    // 150 - math.max(100 / 10, 150 - 100) = 100
-    assert(listener2.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 100)
-  }
-
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index f55af6a5cc358..a25878c4028dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -23,16 +23,16 @@ import scala.xml._
 
 import org.apache.commons.lang3.StringEscapeUtils
 
+import org.apache.spark.status.api.v1.{JobData, StageData}
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.ui.StreamingJobProgressListener.SparkJobId
 import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.JobUIData
 
-private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
+private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobData: Option[JobData])
 
 private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   private val streamingListener = parent.listener
-  private val sparkListener = parent.ssc.sc.jobProgressListener
+  private val store = parent.parent.store
 
   private def columns: Seq[Node] = {
     <th>Output Op Id</th>
@@ -52,13 +52,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       formattedOutputOpDuration: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
-      sparkJob: SparkJobIdWithUIData): Seq[Node] = {
-    if (sparkJob.jobUIData.isDefined) {
+      jobIdWithData: SparkJobIdWithUIData): Seq[Node] = {
+    if (jobIdWithData.jobData.isDefined) {
       generateNormalJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+        numSparkJobRowsInOutputOp, isFirstRow, jobIdWithData.jobData.get)
     } else {
       generateDroppedJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+        numSparkJobRowsInOutputOp, isFirstRow, jobIdWithData.sparkJobId)
     }
   }
 
@@ -94,15 +94,15 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       formattedOutputOpDuration: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
-      sparkJob: JobUIData): Seq[Node] = {
+      sparkJob: JobData): Seq[Node] = {
     val duration: Option[Long] = {
       sparkJob.submissionTime.map { start =>
-        val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
-        end - start
+        val end = sparkJob.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis())
+        end - start.getTime()
      }
    }
    val lastFailureReason =
-      sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
+      sparkJob.stageIds.sorted.reverse.flatMap(getStageData).
        dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
        flatMap(info => info.failureReason).headOption.getOrElse("")
    val formattedDuration = duration.map(d => SparkUIUtils.formatDuration(d)).getOrElse("-")
@@ -135,7 +135,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
        {formattedDuration}
 
 
-        {sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
+        {sparkJob.numCompletedStages}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
        {if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"}
        {if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"}
@@ -146,7 +146,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
        completed = sparkJob.numCompletedTasks,
        failed = sparkJob.numFailedTasks,
        skipped = sparkJob.numSkippedTasks,
-        reasonToNumKilled = sparkJob.reasonToNumKilled,
+        reasonToNumKilled = sparkJob.killedTasksSummary,
        total = sparkJob.numTasks - sparkJob.numSkippedTasks)
    }
 
@@ -246,11 +246,19 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
 
   }
 
-  private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
-    sparkListener.activeJobs.get(sparkJobId).orElse {
-      sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
-        sparkListener.failedJobs.find(_.jobId == sparkJobId)
-      }
+  private def getJobData(sparkJobId: SparkJobId): Option[JobData] = {
+    try {
+      Some(store.job(sparkJobId))
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+
+  private def getStageData(stageId: Int): Option[StageData] = {
+    try {
+      Some(store.lastStageAttempt(stageId))
+    } catch {
+      case _: NoSuchElementException => None
    }
  }
 
@@ -282,25 +290,22 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
      val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
      (outputOperation, sparkJobIds)
    }.toSeq.sortBy(_._1.id)
-    sparkListener.synchronized {
-      val outputOpWithJobs = outputOps.map { case (outputOpData, sparkJobIds) =>
-        (outputOpData,
-          sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
-      }
+    val outputOpWithJobs = outputOps.map { case (outputOpData, sparkJobIds) =>
+      (outputOpData, sparkJobIds.map { jobId => SparkJobIdWithUIData(jobId, getJobData(jobId)) })
+    }
 
-
-
-          {columns}
-
-
-          {
-            outputOpWithJobs.map { case (outputOpData, sparkJobIds) =>
-              generateOutputOpIdRow(outputOpData, sparkJobIds)
-            }
-          }
-
-
-    }
+
+
+        {columns}
+
+
+        {
+          outputOpWithJobs.map { case (outputOpData, sparkJobs) =>
+            generateOutputOpIdRow(outputOpData, sparkJobs)
+          }
+        }
+
+
  }
 
  def render(request: HttpServletRequest): Seq[Node] = streamingListener.synchronized {