From cf10c0e00c9eae6fe25f6e153d860099d6959e78 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 27 Oct 2016 14:29:41 -0700 Subject: [PATCH] SHS-NG M3: Add initial listener implementation, handle scheduler events. The initial listener is based on the existing JobProgressListener, and tries to mimic its behavior as much as possible. The change also includes some minor code movement so that some types and methods from the initial history provider code can be reused. Note the code here is not 100% correct. This is meant as a building ground for the UI integration in the next milestone. As different parts of the UI are ported, fixes will be made to the different parts of this code to account for the needed behavior. I also added annotations to API types so that Jackson is able to correctly deserialize options, sequences and maps that store primitive types. --- .../org/apache/spark/kvstore/LevelDB.java | 2 +- .../deploy/history/FsHistoryProvider.scala | 13 +- .../apache/spark/deploy/history/config.scala | 6 - .../spark/status/AppStateListener.scala | 1090 +++++++++++++++++ .../org/apache/spark/status/KVUtils.scala | 111 ++ .../status/api/v1/AllStagesResource.scala | 4 +- .../org/apache/spark/status/api/v1/api.scala | 10 +- .../org/apache/spark/status/storeTypes.scala | 99 ++ .../spark/status/AppStateListenerSuite.scala | 694 +++++++++++ scalastyle-config.xml | 2 +- 10 files changed, 2007 insertions(+), 24 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/status/AppStateListener.scala create mode 100644 core/src/main/scala/org/apache/spark/status/KVUtils.scala create mode 100644 core/src/main/scala/org/apache/spark/status/storeTypes.scala create mode 100644 core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java index b40c7950d1d11..304c48e6cbb96 100644 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java @@ -69,7 +69,7 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception { this.types = new ConcurrentHashMap<>(); Options options = new Options(); - options.createIfMissing(!path.exists()); + options.createIfMissing(true); this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options)); byte[] versionData = db().get(STORE_VERSION_KEY); diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 3eee5d809c470..e2273e530952c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -43,6 +43,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.kvstore._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} @@ -699,18 +700,6 @@ private[history] object FsHistoryProvider { private val CURRENT_VERSION = 1L } -/** - * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as - * the API serializer. 
- */ -private class KVStoreScalaSerializer extends KVStoreSerializer { - - mapper.registerModule(DefaultScalaModule) - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) - mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) - -} - case class KVStoreMetadata( val version: Long, val logDir: String) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala index affaff86e3139..9ca07e3d63271 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala @@ -19,16 +19,10 @@ package org.apache.spark.deploy.history import java.util.concurrent.TimeUnit -import scala.annotation.meta.getter - import org.apache.spark.internal.config.ConfigBuilder -import org.apache.spark.kvstore.KVIndex private[spark] object config { - /** Use this to annotate constructor params to be used as KVStore indices. */ - type KVIndexParam = KVIndex @getter - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") diff --git a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala new file mode 100644 index 0000000000000..61af0ff1aba2b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala @@ -0,0 +1,1090 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File +import java.lang.{Long => JLong} +import java.util.Date + +import scala.collection.JavaConverters._ +import scala.reflect.{classTag, ClassTag} + +import org.apache.spark._ +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.kvstore.KVStore +import org.apache.spark.scheduler._ +import org.apache.spark.status.api.v1 +import org.apache.spark.storage._ +import org.apache.spark.ui.SparkUI + +/** + * A Spark listener that writes application information to a data store. The types written to the + * store are defined in the `storeTypes.scala` file and are based on the public REST API. + * + * TODO (M4): + * - Need to add state / information needed by the UI that is currently not in the API. + * - Need mechanism to insert this into SparkContext and be called when SparkContext is + * closed (new plugin interface?) + * - Need to close store in live UI after SparkContext closes. + * - Cache active jobs / stages / other interesting things in memory to make it faster + * to update them + * - Flush data to the store in a separate thread (to avoid stalling the listener bus). 
+ * + * TODO (future): + * - To enable incremental parsing of event logs, all state in this class needs to be serialized + * to the underlying store and loaded when the class is instantiated. That could potentially + * be written to the AppStatusStoreMetadata object. + */ +private class AppStateListener(override protected val kvstore: KVStore) extends SparkListener + with KVUtils { + + import AppStateListener._ + + private var appId: String = null + private var activeJobs: Set[Int] = Set() + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { + assert(event.appId.isDefined, "Applications without IDs are not supported.") + this.appId = event.appId.get + + val attempt = new v1.ApplicationAttemptInfo( + event.appAttemptId, + new Date(event.time), + DEFAULT_DATE, + new Date(event.time), + -1L, + event.sparkUser, + false) + + val app = new v1.ApplicationInfo( + appId, + event.appName, + None, + None, + None, + None, + Seq(attempt)) + + val stored = new ApplicationInfoWrapper(app) + kvstore.write(stored) + } + + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { + // TODO + } + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { + update[ApplicationInfoWrapper](appId) { wrapper => + val app = wrapper.get.info + val attempt = app.attempts.head + + val newAttempt = new v1.ApplicationAttemptInfo( + attempt.attemptId, + attempt.startTime, + new Date(event.time), + new Date(event.time), + event.time - attempt.startTime.getTime(), + attempt.sparkUser, + true) + + val newApp = new v1.ApplicationInfo( + app.id, app.name, app.coresGranted, app.maxCores, app.coresPerExecutor, + app.memoryPerExecutorMB, Seq(newAttempt)) + + new ApplicationInfoWrapper(newApp) + } + } + + override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { + // This needs to be an update in case an executor re-registers after the driver has + // marked it as "dead". + updateExecutorSummary(event.executorId) { exec => + val newInfo = newExecutorSummary(exec.info, + hostPort = event.executorInfo.executorHost, + isActive = true, + totalCores = event.executorInfo.totalCores, + executorLogs = event.executorInfo.logUrlMap) + new ExecutorSummaryWrapper(newInfo) + } + } + + override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { + updateExecutorSummary(event.executorId) { exec => + val newInfo = newExecutorSummary(exec.info, + isActive = false) + new ExecutorSummaryWrapper(newInfo) + } + } + + override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { + updateBlackListStatus(event.executorId, true) + } + + override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { + updateBlackListStatus(event.executorId, false) + } + + override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = { + updateNodeBlackList(event.hostId, true) + } + + override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = { + updateNodeBlackList(event.hostId, false) + } + + private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { + updateExecutorSummary(execId) { exec => + val newInfo = newExecutorSummary(exec.info, + isBlacklisted = blacklisted) + new ExecutorSummaryWrapper(newInfo) + } + } + + private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = { + // Implicitly (un)blacklist every executor associated with the node.
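+ // ExecutorSummaryWrapper declares a "host" KVStore index, so the view below yields + // executor summaries sorted by host; the takeWhile stops once entries for another host appear.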
+ val it = kvstore.view(classOf[ExecutorSummaryWrapper]).index("host").closeableIterator() + try { + it.asScala + .takeWhile(_.host == host) + .foreach { exec => updateBlackListStatus(exec.id, blacklisted) } + } finally { + it.close() + } + } + + override def onJobStart(event: SparkListenerJobStart): Unit = { + // Compute (a potential underestimate of) the number of tasks that will be run by this job. + // This may be an underestimate because the job start event references all of the result + // stages' transitive stage dependencies, but some of these stages might be skipped if their + // output is available from earlier runs. + // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. + val numTasks = { + val missingStages = event.stageInfos.filter(_.completionTime.isEmpty) + missingStages.map(_.numTasks).sum + } + + val lastStageInfo = event.stageInfos.lastOption + val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") + + val jobGroup = Option(event.properties) + .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) } + + val info = newJobData(None, + jobId = event.jobId, + name = lastStageName, + description = lastStageInfo.map(_.details), + submissionTime = Option(event.time).filter(_ >= 0).map(new Date(_)), + stageIds = event.stageIds, + jobGroup = jobGroup, + status = JobExecutionStatus.RUNNING, + numTasks = numTasks) + + val initialStages = event.stageInfos.map { stage => + // A new job submission may re-use an existing stage, so this code needs to do an update + // instead of just a write. + updateStageData(stage.stageId, stage.attemptId) { wrapper => + val jobIds = wrapper.jobIds ++ Set(event.jobId) + newStageDataWrapper(wrapper, wrapper.info, jobIds = jobIds) + } + (stage.stageId, stage.attemptId) + }.toSet + + val stored = newJobDataWrapper(None, info, initialStages = initialStages) + kvstore.write(stored) + + activeJobs += event.jobId + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { + updateJobData(event.jobId) { job => + val status = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case JobFailed(_) => JobExecutionStatus.FAILED + } + + val skippedStages = job.info.stageIds.toSet &~ job.completedStages + + // Calculate the number of skipped tasks based on the number of completed tasks of + // skipped stage attempts. + val skippedAttempts = job.initialStages.filter { case (stageId, _) => + skippedStages.contains(stageId) + } + val skippedTasks = skippedAttempts.flatMap { case (id, attempt) => + read[StageDataWrapper](Array(id, attempt)).map(_.info.numCompleteTasks) + }.sum + + val newInfo = newJobData(job.info, + jobId = event.jobId, + completionTime = Some(new Date(event.time)), + status = status, + numSkippedStages = skippedStages.size, + numSkippedTasks = skippedTasks) + newJobDataWrapper(job, newInfo) + } + + activeJobs -= event.jobId + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + // New attempts are created based on the previous attempt's data - mainly to track which jobs + // depend on the stage. Attempt IDs start at 0, so this check takes care of retrieving the + // correct previous attempt from the store. 
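+ // For attempt 0 there is no previous attempt, so the lookup falls back to the attempt's own + // ID, and updateStageData creates a fresh entry if nothing is stored yet.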
+ val previousAttempt = if (event.stageInfo.attemptId > 0) { + event.stageInfo.attemptId - 1 + } else { + event.stageInfo.attemptId + } + + updateStageData(event.stageInfo.stageId, previousAttempt) { stage => + val poolName = Option(event.properties).flatMap { p => + Option(p.getProperty("spark.scheduler.pool")) + }.getOrElse(SparkUI.DEFAULT_POOL_NAME) + + val description = Option(event.properties).flatMap { p => + Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) + } + + // Update the number of active stages in all jobs that depend on this stage. + activeJobs(stage).foreach { job => + updateJobData(job) { wrapper => + val newJobInfo = newJobData( + wrapper.info, + jobId = job, + numActiveStages = wrapper.info.numActiveStages + 1) + newJobDataWrapper(wrapper, newJobInfo) + } + } + + val newInfo = newStageData( + stage.info, + status = v1.StageStatus.ACTIVE, + stageId = event.stageInfo.stageId, + attemptId = event.stageInfo.attemptId, + submissionTime = event.stageInfo.submissionTime.map(new Date(_)), + details = description, + schedulingPool = poolName) + newStageDataWrapper(stage, newInfo) + } + + // Update RDDs with the stage name. + event.stageInfo.rddInfos.foreach { info => + updateRDDStorageInfo(info.id) { rdd => + val updated = newRDDStorageInfo(rdd.info, + name = info.name, + numPartitions = info.numPartitions, + storageLevel = info.storageLevel.description) + new RDDStorageInfoWrapper(updated) + } + } + } + + override def onTaskStart(event: SparkListenerTaskStart): Unit = { + if (event.taskInfo != null) { + val task = newTaskData(None, + taskId = event.taskInfo.taskId, + index = event.taskInfo.index, + attempt = event.taskInfo.attemptNumber, + launchTime = new Date(event.taskInfo.launchTime), + executorId = event.taskInfo.executorId, + host = event.taskInfo.host, + status = event.taskInfo.status, + taskLocality = event.taskInfo.taskLocality.toString(), + speculative = event.taskInfo.speculative) + kvstore.write(new TaskDataWrapper(task)) + } + + updateStageData(event.stageId, event.stageAttemptId) { stage => + // Update the number of active tasks in all jobs that depend on this stage. + activeJobs(stage).foreach { job => + updateJobData(job) { wrapper => + val newJobInfo = newJobData( + wrapper.info, + jobId = job, + numActiveTasks = wrapper.info.numActiveTasks + 1) + newJobDataWrapper(wrapper, newJobInfo) + } + } + + val firstTask = stage.info.firstTaskLaunchedTime.getOrElse( + new Date(event.taskInfo.launchTime)) + + val newInfo = newStageData( + stage.info, + stageId = event.stageId, + attemptId = event.stageAttemptId, + firstTaskLaunchedTime = Some(firstTask), + numActiveTasks = stage.info.numActiveTasks + 1) + newStageDataWrapper(stage, newInfo) + } + } + + override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = { + // Nothing to do here. + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { + val info = event.taskInfo + // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task + // completion event is for. Let's just drop it here. This means we might have some speculative + // tasks on the web UI that are never marked as complete.
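+ // Dropping the event means the task keeps whatever state was last written to the store.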
+ if (info == null || event.stageAttemptId == -1) { + return + } + + var metricsDelta: v1.TaskMetrics = null + + updateTaskData(event.taskInfo.taskId) { task => + val errorMessage = event.reason match { + case Success => + None + case k: TaskKilled => + Some(k.reason) + case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates + Some(e.toErrorString) + case e: TaskFailedReason => // All other failure cases + Some(e.toErrorString) + } + + metricsDelta = calculateMetricsDelta(Option(event.taskMetrics), task.info.taskMetrics) + + val newInfo = newTaskData( + task.info, + taskId = event.taskInfo.taskId, + duration = Some(event.taskInfo.duration), + status = event.taskInfo.status, + errorMessage = errorMessage, + taskMetrics = Option(event.taskMetrics).map(newTaskMetrics)) + new TaskDataWrapper(newInfo) + } + + val (completedDelta, failedDelta) = event.reason match { + case Success => + (1, 0) + case _: TaskKilled => + (0, 0) + case _ => + (0, 1) + } + + updateStageData(event.stageId, event.stageAttemptId) { stage => + val accumulables = info.accumulables.map(newAccumulableInfo).toSeq + + val oldStage = stage.info + val newInfo = newStageData( + oldStage, + numActiveTasks = oldStage.numActiveTasks - 1, + numCompleteTasks = oldStage.numCompleteTasks + completedDelta, + numFailedTasks = oldStage.numFailedTasks + failedDelta, + accumulatorUpdates = accumulables, + inputBytes = oldStage.inputBytes + metricsDelta.inputMetrics.bytesRead, + inputRecords = oldStage.inputRecords + metricsDelta.inputMetrics.recordsRead, + outputBytes = oldStage.outputBytes + metricsDelta.outputMetrics.bytesWritten, + outputRecords = oldStage.outputRecords + metricsDelta.outputMetrics.recordsWritten, + shuffleReadBytes = oldStage.shuffleReadBytes + + metricsDelta.shuffleReadMetrics.localBytesRead + + metricsDelta.shuffleReadMetrics.remoteBytesRead, + shuffleReadRecords = oldStage.shuffleReadRecords + + metricsDelta.shuffleReadMetrics.recordsRead, + shuffleWriteBytes = oldStage.shuffleWriteBytes + + metricsDelta.shuffleWriteMetrics.bytesWritten, + shuffleWriteRecords = oldStage.shuffleWriteRecords + + metricsDelta.shuffleWriteMetrics.recordsWritten, + memoryBytesSpilled = oldStage.memoryBytesSpilled + metricsDelta.memoryBytesSpilled, + diskBytesSpilled = oldStage.diskBytesSpilled + metricsDelta.diskBytesSpilled) + + activeJobs(stage).foreach { job => + updateJobData(job) { wrapper => + val newJobInfo = newJobData( + wrapper.info, + jobId = job, + numActiveTasks = wrapper.info.numActiveTasks - 1, + numCompletedTasks = wrapper.info.numCompletedTasks + completedDelta, + numFailedTasks = wrapper.info.numFailedTasks + failedDelta) + newJobDataWrapper(wrapper, newJobInfo) + } + } + + newStageDataWrapper(stage, newInfo) + } + + updateExecutorStageSummary( + event.stageId, + event.stageAttemptId, + event.taskInfo.executorId) { exec => + newExecutorStageSummary( + exec, + failedTasks = exec.failedTasks + failedDelta, + inputBytes = exec.inputBytes + metricsDelta.inputMetrics.bytesRead, + outputBytes = exec.outputBytes + metricsDelta.outputMetrics.bytesWritten, + shuffleRead = exec.shuffleRead + metricsDelta.shuffleReadMetrics.localBytesRead + + metricsDelta.shuffleReadMetrics.remoteBytesRead, + shuffleWrite = exec.shuffleWrite + metricsDelta.shuffleWriteMetrics.bytesWritten, + memoryBytesSpilled = exec.memoryBytesSpilled + metricsDelta.memoryBytesSpilled, + diskBytesSpilled = exec.diskBytesSpilled + metricsDelta.diskBytesSpilled) + } + } + + override def onStageCompleted(event: 
SparkListenerStageCompleted): Unit = { + val info = event.stageInfo + updateStageData(info.stageId, info.attemptId) { stage => + val status = event.stageInfo.failureReason match { + case Some(_) => v1.StageStatus.FAILED + case None => v1.StageStatus.COMPLETE + } + val newStage = newStageData( + stage.info, + status = status, + completionTime = info.completionTime.map(new Date(_))) + + val (completedDelta, failedDelta) = if (status == v1.StageStatus.COMPLETE) { + (1, 0) + } else { + (0, 1) + } + + activeJobs(stage).foreach { job => + updateJobData(job) { wrapper => + val newJobInfo = newJobData( + wrapper.info, + jobId = job, + numActiveStages = wrapper.info.numActiveStages - 1, + numCompletedStages = wrapper.info.numCompletedStages + completedDelta, + numFailedStages = wrapper.info.numFailedStages + failedDelta) + val stages = wrapper.completedStages + info.stageId + newJobDataWrapper(wrapper, newJobInfo, completedStages = stages) + } + } + + newStageDataWrapper(stage, newStage) + } + } + + override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { + updateExecutorSummary(event.blockManagerId.executorId) { exec => + val updated = newExecutorSummary( + exec.info, + maxMemory = event.maxMem) + new ExecutorSummaryWrapper(updated) + } + } + + override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { + // Nothing to do here. Covered by onExecutorRemoved. + } + + override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = { + kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { + event.accumUpdates.foreach { case (taskId, sid, sAttempt, accumUpdates) => + val task = read[TaskDataWrapper](taskId) + val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates) + val delta = calculateMetricsDelta(Some(metrics), task.flatMap(_.info.taskMetrics)) + + updateStageData(sid, sAttempt) { stage => + val oldStage = stage.info + val updatedStage = newStageData( + oldStage, + inputBytes = oldStage.inputBytes + delta.inputMetrics.bytesRead, + inputRecords = oldStage.inputRecords + delta.inputMetrics.recordsRead, + outputBytes = oldStage.outputBytes + delta.outputMetrics.bytesWritten, + outputRecords = oldStage.outputRecords + delta.outputMetrics.recordsWritten, + shuffleReadBytes = oldStage.shuffleReadBytes + + delta.shuffleReadMetrics.localBytesRead + + delta.shuffleReadMetrics.remoteBytesRead, + shuffleReadRecords = oldStage.shuffleReadRecords + + delta.shuffleReadMetrics.recordsRead, + shuffleWriteBytes = oldStage.shuffleWriteBytes + + delta.shuffleWriteMetrics.bytesWritten, + shuffleWriteRecords = oldStage.shuffleWriteRecords + + delta.shuffleWriteMetrics.recordsWritten, + memoryBytesSpilled = oldStage.memoryBytesSpilled + delta.memoryBytesSpilled, + diskBytesSpilled = oldStage.diskBytesSpilled + delta.diskBytesSpilled) + newStageDataWrapper(stage, updatedStage) + } + + updateExecutorStageSummary(sid, sAttempt, event.execId) { exec => + newExecutorStageSummary( + exec, + inputBytes = exec.inputBytes + delta.inputMetrics.bytesRead, + outputBytes = exec.outputBytes + delta.outputMetrics.bytesWritten, + shuffleRead = exec.shuffleRead + delta.shuffleReadMetrics.localBytesRead + + delta.shuffleReadMetrics.remoteBytesRead, + shuffleWrite = exec.shuffleWrite + delta.shuffleWriteMetrics.bytesWritten, + memoryBytesSpilled = exec.memoryBytesSpilled + delta.memoryBytesSpilled, + diskBytesSpilled = exec.diskBytesSpilled + delta.diskBytesSpilled) + } + } + 
} + + override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { + event.blockUpdatedInfo.blockId match { + case block: RDDBlockId => updateRDDBlock(event, block) + case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too. + } + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + // TODO: SQL, Streaming. + } + + private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = { + val executorId = event.blockUpdatedInfo.blockManagerId.executorId + + // Whether values are being added to or removed from the existing accounting. + val storageLevel = event.blockUpdatedInfo.storageLevel + val diskMult = if (storageLevel.useDisk) 1 else -1 + val memoryMult = if (storageLevel.useMemory) 1 else -1 + + // Function to apply a delta to a value, but ensure that it doesn't go negative. + def newValue(old: Long, delta: Long): Long = math.max(0, old + delta) + + // If the storage level is NONE, then don't update the storage level of existing information. + val updatedStorageLevel = if (storageLevel.useMemory || storageLevel.useDisk) { + Some(storageLevel.description) + } else { + None + } + + // We need information about the executor to update some memory accounting values in the + // RDD info, so read that beforehand. + val executorInfo = read[ExecutorSummaryWrapper](executorId).getOrElse( + new ExecutorSummaryWrapper(newExecutorSummary(None, id = executorId))) + + var rddBlocksDelta = 0 + + // Update the block entry in the RDD info, keeping track of the deltas above so that we + // can update the executor information too. + updateRDDStorageInfo(block.rddId) { rdd => + val (maybeOldBlock, others) = rdd.info.partitions.getOrElse(Nil) + .partition(_.blockName == block.name) match { + case (old, others) => + (old.headOption, others) + } + + val oldBlock = maybeOldBlock.getOrElse { + newRDDPartitionInfo(None, blockName = block.name) + } + + val newExecutors = if (storageLevel.useDisk || storageLevel.useMemory) { + if (!oldBlock.executors.contains(executorId)) { + rddBlocksDelta = 1 + } + oldBlock.executors.toSet + executorId + } else { + rddBlocksDelta = -1 + oldBlock.executors.toSet - executorId + } + + // Only update the block if it's still stored in some executor, otherwise get rid of it. + val newBlocks = if (newExecutors.nonEmpty) { + val newBlock = newRDDPartitionInfo( + oldBlock, + storageLevel = updatedStorageLevel, + memoryUsed = newValue(oldBlock.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult), + diskUsed = newValue(oldBlock.diskUsed, event.blockUpdatedInfo.diskSize * diskMult), + executors = newExecutors.toSeq) + + Seq(newBlock) + } else { + Nil + } + + val address = event.blockUpdatedInfo.blockManagerId.hostPort + val (oldDist, otherDists) = rdd.info.dataDistribution.getOrElse(Nil) + .partition(_.address == address) match { + case (old, others) => + val _old = old.headOption.getOrElse { + newRDDDataDistribution( + None, + address = address, + memoryRemaining = executorInfo.info.maxMemory) + } + (_old, others) + } + + // If the new distribution is empty, just do not add it to the new RDD info. 
+ val newDistMem = newValue(oldDist.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult) + val newDistDisk = newValue(oldDist.diskUsed, event.blockUpdatedInfo.diskSize * diskMult) + val newDists = if (newDistMem > 0 || newDistDisk > 0) { + val newDist = newRDDDataDistribution( + oldDist, + memoryUsed = newDistMem, + memoryRemaining = newValue(oldDist.memoryRemaining, + event.blockUpdatedInfo.memSize * memoryMult * -1), + diskUsed = newDistDisk) + Seq(newDist) + } else { + Nil + } + + val newRDD = newRDDStorageInfo( + rdd.info, + storageLevel = updatedStorageLevel, + memoryUsed = newValue(rdd.info.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult), + diskUsed = newValue(rdd.info.diskUsed, event.blockUpdatedInfo.diskSize * diskMult), + dataDistribution = Some(otherDists ++ newDists), + partitions = Some(others ++ newBlocks)) + new RDDStorageInfoWrapper(newRDD) + } + + // Update the ExecutorSummary for the block's manager. + val newExecSummary = newExecutorSummary( + executorInfo.info, + rddBlocks = newValue(executorInfo.info.rddBlocks, rddBlocksDelta).toInt, + memoryUsed = newValue(executorInfo.info.memoryUsed, + event.blockUpdatedInfo.memSize * memoryMult), + diskUsed = newValue(executorInfo.info.diskUsed, + event.blockUpdatedInfo.diskSize * diskMult)) + kvstore.write(new ExecutorSummaryWrapper(newExecSummary)) + } + + private def updateJobData(id: Int)(fn: JobDataWrapper => JobDataWrapper): Unit = { + update[JobDataWrapper](id) { old => + val job = old.getOrElse((newJobDataWrapper(None, newJobData(None)))) + fn(job) + } + } + + private def updateStageData( + stageId: Int, + attemptId: Int) + (fn: StageDataWrapper => StageDataWrapper): Unit = { + update[StageDataWrapper](Array(stageId, attemptId)) { old => + val stage = old.getOrElse { + newStageDataWrapper(None, newStageData(None, stageId = stageId, attemptId = attemptId)) + } + fn(stage) + } + } + + private def updateTaskData(id: Long)(fn: TaskDataWrapper => TaskDataWrapper): Unit = { + update[TaskDataWrapper](id) { old => + val task = old.getOrElse(new TaskDataWrapper(newTaskData(None, taskId = id))) + fn(task) + } + } + + private def updateExecutorSummary( + id: String) + (fn: ExecutorSummaryWrapper => ExecutorSummaryWrapper): Unit = { + update[ExecutorSummaryWrapper](id) { old => + val exec = old.getOrElse( + new ExecutorSummaryWrapper(newExecutorSummary(None, id = id))) + fn(exec) + } + } + + private def updateRDDStorageInfo( + id: Int) + (fn: RDDStorageInfoWrapper => RDDStorageInfoWrapper): Unit = { + update[RDDStorageInfoWrapper](id) { old => + val rdd = old.getOrElse(new RDDStorageInfoWrapper(newRDDStorageInfo(None, id = id))) + fn(rdd) + } + } + + private def updateExecutorStageSummary( + stageId: Int, + stageAttemptId: Int, + executorId: String) + (fn: v1.ExecutorStageSummary => v1.ExecutorStageSummary): Unit = { + update[ExecutorStageSummaryWrapper](Array(stageId, stageAttemptId, executorId)) { old => + val oldInfo = old.map(_.info).getOrElse(newExecutorStageSummary(None)) + new ExecutorStageSummaryWrapper(stageId, stageAttemptId, executorId, fn(oldInfo)) + } + } + + /** + * Return a new TaskMetrics object containing the delta of the various fields of the given + * metrics objects. This is currently targeted at updating stage data, so it does not + * necessarily calculate deltas for all the fields. 
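+ * Fields that are not aggregated into the stage data are simply left as zero in the result.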
+ */ + private def calculateMetricsDelta( + taskMetrics: Option[TaskMetrics], + oldMetrics: Option[v1.TaskMetrics]): v1.TaskMetrics = { + + val metrics = taskMetrics.getOrElse(TaskMetrics.empty) + val old = oldMetrics.getOrElse(newTaskMetrics(TaskMetrics.empty)) + + val shuffleWriteDelta = new v1.ShuffleWriteMetrics( + metrics.shuffleWriteMetrics.bytesWritten - old.shuffleWriteMetrics.bytesWritten, + 0L, + metrics.shuffleWriteMetrics.recordsWritten - old.shuffleWriteMetrics.recordsWritten) + + val shuffleReadDelta = new v1.ShuffleReadMetrics( + 0L, 0L, 0L, + metrics.shuffleReadMetrics.remoteBytesRead - old.shuffleReadMetrics.remoteBytesRead, + metrics.shuffleReadMetrics.localBytesRead - old.shuffleReadMetrics.localBytesRead, + metrics.shuffleReadMetrics.recordsRead - old.shuffleReadMetrics.recordsRead) + + val inputDelta = new v1.InputMetrics( + metrics.inputMetrics.bytesRead - old.inputMetrics.bytesRead, + metrics.inputMetrics.recordsRead - old.inputMetrics.recordsRead) + + val outputDelta = new v1.OutputMetrics( + metrics.outputMetrics.bytesWritten - old.outputMetrics.bytesWritten, + metrics.outputMetrics.recordsWritten - old.outputMetrics.recordsWritten) + + new v1.TaskMetrics( + 0L, 0L, + metrics.executorRunTime - old.executorRunTime, + metrics.executorCpuTime - old.executorCpuTime, + 0L, 0L, 0L, + metrics.memoryBytesSpilled - old.memoryBytesSpilled, + metrics.diskBytesSpilled - old.diskBytesSpilled, + inputDelta, + outputDelta, + shuffleReadDelta, + shuffleWriteDelta) + } + // scalastyle:off argCount + + // The following are "copy methods" for API types that are often modified by the event handlers + // above. They allow copying from an existing instance, overriding specific fields with the + // values to be updated. Since the API types are immutable, this makes the event handlers cleaner + // since they don't have to deal with default and existing values. 
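+ // For example, newStageData(stage.info, numActiveTasks = n) returns a copy of stage.info + // where only numActiveTasks changes; the implicit anyToOption conversion in KVUtils wraps + // the plain arguments into the Option-typed parameters.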
+ + private def newExecutorSummary( + old: Option[v1.ExecutorSummary], + id: Option[String] = None, + hostPort: Option[String] = None, + isActive: Option[Boolean] = None, + rddBlocks: Option[Int] = None, + memoryUsed: Option[Long] = None, + diskUsed: Option[Long] = None, + totalCores: Option[Int] = None, + maxTasks: Option[Int] = None, + activeTasks: Option[Int] = None, + failedTasks: Option[Int] = None, + completedTasks: Option[Int] = None, + totalTasks: Option[Int] = None, + totalDuration: Option[Long] = None, + totalGCTime: Option[Long] = None, + totalInputBytes: Option[Long] = None, + totalShuffleRead: Option[Long] = None, + totalShuffleWrite: Option[Long] = None, + isBlacklisted: Option[Boolean] = None, + maxMemory: Option[Long] = None, + executorLogs: Option[Map[String, String]] = None, + memoryMetrics: Option[Option[v1.MemoryMetrics]] = None) : v1.ExecutorSummary = { + new v1.ExecutorSummary( + value(id, old.map(_.id), UNKNOWN), + value(hostPort, old.map(_.hostPort), UNKNOWN), + value(isActive, old.map(_.isActive), false), + value(rddBlocks, old.map(_.rddBlocks), 0), + value(memoryUsed, old.map(_.memoryUsed), 0L), + value(diskUsed, old.map(_.diskUsed), 0L), + value(totalCores, old.map(_.totalCores), 0), + value(maxTasks, old.map(_.maxTasks), 0), + value(activeTasks, old.map(_.activeTasks), 0), + value(failedTasks, old.map(_.failedTasks), 0), + value(completedTasks, old.map(_.completedTasks), 0), + value(totalTasks, old.map(_.totalTasks), 0), + value(totalDuration, old.map(_.totalDuration), 0L), + value(totalGCTime, old.map(_.totalGCTime), 0L), + value(totalInputBytes, old.map(_.totalInputBytes), 0L), + value(totalShuffleRead, old.map(_.totalShuffleRead), 0L), + value(totalShuffleWrite, old.map(_.totalShuffleWrite), 0L), + value(isBlacklisted, old.map(_.isBlacklisted), false), + value(maxMemory, old.map(_.maxMemory), 0L), + value(executorLogs, old.map(_.executorLogs), Map()), + option(memoryMetrics, old.map(_.memoryMetrics))) + } + + private def newJobData( + old: Option[v1.JobData], + jobId: Option[Int] = None, + name: Option[String] = None, + description: Option[Option[String]] = None, + submissionTime: Option[Option[Date]] = None, + completionTime: Option[Option[Date]] = None, + stageIds: Option[Seq[Int]] = None, + jobGroup: Option[Option[String]] = None, + status: Option[JobExecutionStatus] = None, + numTasks: Option[Int] = None, + numActiveTasks: Option[Int] = None, + numCompletedTasks: Option[Int] = None, + numSkippedTasks: Option[Int] = None, + numFailedTasks: Option[Int] = None, + numActiveStages: Option[Int] = None, + numCompletedStages: Option[Int] = None, + numSkippedStages: Option[Int] = None, + numFailedStages: Option[Int] = None): v1.JobData = { + new v1.JobData( + value(jobId, old.map(_.jobId), -1), + value(name, old.map(_.name), null), + option(description, old.map(_.description)), + option(submissionTime, old.map(_.submissionTime)), + option(completionTime, old.map(_.completionTime)), + value(stageIds, old.map(_.stageIds), Nil), + option(jobGroup, old.map(_.jobGroup)), + value(status, old.map(_.status), null), + value(numTasks, old.map(_.numTasks), 0), + value(numActiveTasks, old.map(_.numActiveTasks), 0), + value(numCompletedTasks, old.map(_.numCompletedTasks), 0), + value(numSkippedTasks, old.map(_.numSkippedTasks), 0), + value(numFailedTasks, old.map(_.numFailedTasks), 0), + value(numActiveStages, old.map(_.numActiveStages), 0), + value(numCompletedStages, old.map(_.numCompletedStages), 0), + value(numSkippedStages, old.map(_.numSkippedStages), 0), + 
value(numFailedStages, old.map(_.numFailedStages), 0)) + } + + private def newStageData( + old: Option[v1.StageData], + status: Option[v1.StageStatus] = None, + stageId: Option[Int] = None, + attemptId: Option[Int] = None, + numActiveTasks: Option[Int] = None, + numCompleteTasks: Option[Int] = None, + numFailedTasks: Option[Int] = None, + executorRunTime: Option[Long] = None, + executorCpuTime: Option[Long] = None, + submissionTime: Option[Option[Date]] = None, + firstTaskLaunchedTime: Option[Option[Date]] = None, + completionTime: Option[Option[Date]] = None, + inputBytes: Option[Long] = None, + inputRecords: Option[Long] = None, + outputBytes: Option[Long] = None, + outputRecords: Option[Long] = None, + shuffleReadBytes: Option[Long] = None, + shuffleReadRecords: Option[Long] = None, + shuffleWriteBytes: Option[Long] = None, + shuffleWriteRecords: Option[Long] = None, + memoryBytesSpilled: Option[Long] = None, + diskBytesSpilled: Option[Long] = None, + name: Option[String] = None, + details: Option[String] = None, + schedulingPool: Option[String] = None, + accumulatorUpdates: Option[Seq[v1.AccumulableInfo]] = None) + : v1.StageData = { + new v1.StageData( + value(status, old.map(_.status), v1.StageStatus.PENDING), + value(stageId, old.map(_.stageId), -1), + value(attemptId, old.map(_.attemptId), -1), + value(numActiveTasks, old.map(_.numActiveTasks), 0), + value(numCompleteTasks, old.map(_.numCompleteTasks), 0), + value(numFailedTasks, old.map(_.numFailedTasks), 0), + value(executorRunTime, old.map(_.executorRunTime), 0L), + value(executorCpuTime, old.map(_.executorCpuTime), 0L), + option(submissionTime, old.map(_.submissionTime)), + option(firstTaskLaunchedTime, old.map(_.firstTaskLaunchedTime)), + option(completionTime, old.map(_.completionTime)), + value(inputBytes, old.map(_.inputBytes), 0L), + value(inputRecords, old.map(_.inputRecords), 0L), + value(outputBytes, old.map(_.outputBytes), 0L), + value(outputRecords, old.map(_.outputRecords), 0L), + value(shuffleReadBytes, old.map(_.shuffleReadBytes), 0L), + value(shuffleReadRecords, old.map(_.shuffleReadRecords), 0L), + value(shuffleWriteBytes, old.map(_.shuffleWriteBytes), 0L), + value(shuffleWriteRecords, old.map(_.shuffleWriteRecords), 0L), + value(memoryBytesSpilled, old.map(_.memoryBytesSpilled), 0L), + value(diskBytesSpilled, old.map(_.diskBytesSpilled), 0L), + value(name, old.map(_.name), null), + value(details, old.map(_.details), null), + value(schedulingPool, old.map(_.schedulingPool), null), + value(accumulatorUpdates, old.map(_.accumulatorUpdates), Nil), + None, // Task list is always empty; it's stored separately. + None) // Executor summary is always empty; it's stored separately.
+ } + + private def newExecutorStageSummary( + old: Option[v1.ExecutorStageSummary], + taskTime: Option[Long] = None, + failedTasks: Option[Int] = None, + succeededTasks: Option[Int] = None, + inputBytes: Option[Long] = None, + outputBytes: Option[Long] = None, + shuffleRead: Option[Long] = None, + shuffleWrite: Option[Long] = None, + memoryBytesSpilled: Option[Long] = None, + diskBytesSpilled: Option[Long] = None): v1.ExecutorStageSummary = { + new v1.ExecutorStageSummary( + value(taskTime, old.map(_.taskTime), 0L), + value(failedTasks, old.map(_.failedTasks), 0), + value(succeededTasks, old.map(_.succeededTasks), 0), + value(inputBytes, old.map(_.inputBytes), 0L), + value(outputBytes, old.map(_.outputBytes), 0L), + value(shuffleRead, old.map(_.shuffleRead), 0L), + value(shuffleWrite, old.map(_.shuffleWrite), 0L), + value(memoryBytesSpilled, old.map(_.memoryBytesSpilled), 0L), + value(diskBytesSpilled, old.map(_.diskBytesSpilled), 0L)) + } + + private def newTaskData( + old: Option[v1.TaskData], + taskId: Option[Long] = None, + index: Option[Int] = None, + attempt: Option[Int] = None, + launchTime: Option[Date] = None, + duration: Option[Option[Long]] = None, + executorId: Option[String] = None, + host: Option[String] = None, + status: Option[String] = None, + taskLocality: Option[String] = None, + speculative: Option[Boolean] = None, + accumulatorUpdates: Option[Seq[v1.AccumulableInfo]] = None, + errorMessage: Option[Option[String]] = None, + taskMetrics: Option[Option[v1.TaskMetrics]] = None): v1.TaskData = { + new v1.TaskData( + value(taskId, old.map(_.taskId), -1L), + value(index, old.map(_.index), -1), + value(attempt, old.map(_.attempt), -1), + value(launchTime, old.map(_.launchTime), DEFAULT_DATE), + option(duration, old.map(_.duration)), + value(executorId, old.map(_.executorId), UNKNOWN), + value(host, old.map(_.host), UNKNOWN), + value(status, old.map(_.status), UNKNOWN), + value(taskLocality, old.map(_.taskLocality), UNKNOWN), + value(speculative, old.map(_.speculative), false), + value(accumulatorUpdates, old.map(_.accumulatorUpdates), Nil), + option(errorMessage, old.map(_.errorMessage)), + option(taskMetrics, old.map(_.taskMetrics))) + } + + private def newRDDStorageInfo( + old: Option[v1.RDDStorageInfo], + id: Option[Int] = None, + name: Option[String] = None, + numPartitions: Option[Int] = None, + numCachedPartitions: Option[Int] = None, + storageLevel: Option[String] = None, + memoryUsed: Option[Long] = None, + diskUsed: Option[Long] = None, + dataDistribution: Option[Option[Seq[v1.RDDDataDistribution]]] = None, + partitions: Option[Option[Seq[v1.RDDPartitionInfo]]] = None): v1.RDDStorageInfo = { + new v1.RDDStorageInfo( + value(id, old.map(_.id), -1), + value(name, old.map(_.name), UNKNOWN), + value(numPartitions, old.map(_.numPartitions), 0), + value(numCachedPartitions, old.map(_.numCachedPartitions), 0), + value(storageLevel, old.map(_.storageLevel), StorageLevel.NONE.toString()), + value(memoryUsed, old.map(_.memoryUsed), 0L), + value(diskUsed, old.map(_.diskUsed), 0L), + option(dataDistribution, old.map(_.dataDistribution)), + option(partitions, old.map(_.partitions))) + } + + private def newRDDDataDistribution( + old: Option[v1.RDDDataDistribution], + address: Option[String] = None, + memoryUsed: Option[Long] = None, + memoryRemaining: Option[Long] = None, + diskUsed: Option[Long] = None, + onHeapMemoryUsed: Option[Option[Long]] = None, + offHeapMemoryUsed: Option[Option[Long]] = None, + onHeapMemoryRemaining: Option[Option[Long]] = None, + offHeapMemoryRemaining: 
Option[Option[Long]] = None): v1.RDDDataDistribution = { + new v1.RDDDataDistribution( + value(address, old.map(_.address), UNKNOWN), + value(memoryUsed, old.map(_.memoryUsed), 0L), + value(memoryRemaining, old.map(_.memoryRemaining), 0L), + value(diskUsed, old.map(_.diskUsed), 0L), + option(onHeapMemoryUsed, old.map(_.onHeapMemoryUsed)), + option(offHeapMemoryUsed, old.map(_.offHeapMemoryUsed)), + option(onHeapMemoryRemaining, old.map(_.onHeapMemoryRemaining)), + option(offHeapMemoryRemaining, old.map(_.offHeapMemoryRemaining))) + } + + private def newRDDPartitionInfo( + old: Option[v1.RDDPartitionInfo], + blockName: Option[String] = None, + storageLevel: Option[String] = None, + memoryUsed: Option[Long] = None, + diskUsed: Option[Long] = None, + executors: Option[Seq[String]] = None): v1.RDDPartitionInfo = { + new v1.RDDPartitionInfo( + value(blockName, old.map(_.blockName), UNKNOWN), + value(storageLevel, old.map(_.storageLevel), StorageLevel.NONE.toString()), + value(memoryUsed, old.map(_.memoryUsed), 0L), + value(diskUsed, old.map(_.diskUsed), 0L), + value(executors, old.map(_.executors), Nil)) + } + + // scalastyle:on argCount + + private def newAccumulableInfo(acc: AccumulableInfo): v1.AccumulableInfo = { + new v1.AccumulableInfo( + acc.id, + acc.name.orNull, + acc.update.map(_.toString()), + acc.value.map(_.toString()).orNull) + } + + private def newTaskMetrics(metrics: TaskMetrics): v1.TaskMetrics = { + new v1.TaskMetrics( + metrics.executorDeserializeTime, + metrics.executorDeserializeCpuTime, + metrics.executorRunTime, + metrics.executorCpuTime, + metrics.resultSize, + metrics.jvmGCTime, + metrics.resultSerializationTime, + metrics.memoryBytesSpilled, + metrics.diskBytesSpilled, + new v1.InputMetrics( + metrics.inputMetrics.bytesRead, + metrics.inputMetrics.recordsRead), + new v1.OutputMetrics( + metrics.outputMetrics.bytesWritten, + metrics.outputMetrics.recordsWritten), + new v1.ShuffleReadMetrics( + metrics.shuffleReadMetrics.remoteBlocksFetched, + metrics.shuffleReadMetrics.localBlocksFetched, + metrics.shuffleReadMetrics.fetchWaitTime, + metrics.shuffleReadMetrics.remoteBytesRead, + metrics.shuffleReadMetrics.localBytesRead, + metrics.shuffleReadMetrics.recordsRead), + new v1.ShuffleWriteMetrics( + metrics.shuffleWriteMetrics.bytesWritten, + metrics.shuffleWriteMetrics.writeTime, + metrics.shuffleWriteMetrics.recordsWritten)) + } + + private def newJobDataWrapper( + old: Option[JobDataWrapper], + info: v1.JobData, + initialStages: Option[Set[(Int, Int)]] = None, + completedStages: Option[Set[Int]] = None): JobDataWrapper = { + new JobDataWrapper(info, + value(initialStages, old.map(_.initialStages), Set()), + value(completedStages, old.map(_.completedStages), Set())) + } + + private def newStageDataWrapper( + old: Option[StageDataWrapper], + info: v1.StageData, + jobIds: Option[Set[Int]] = None): StageDataWrapper = { + new StageDataWrapper(info, jobIds.orElse(old.map(_.jobIds)).getOrElse(Set())) + } + + private def activeJobs(stage: StageDataWrapper): Set[Int] = { + stage.jobIds.filter(activeJobs.contains(_)) + } + +} + +private[spark] object AppStateListener { + + val CURRENT_VERSION = 1L + val DEFAULT_DATE = new Date(-1) + val UNKNOWN = "" + +} diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala new file mode 100644 index 0000000000000..b1ac19adfc003 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File + +import scala.annotation.meta.getter +import scala.language.implicitConversions +import scala.reflect.{classTag, ClassTag} + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.kvstore._ +import org.apache.spark.status.api.v1.JacksonMessageWriter + +/** A mix-in trait with helper methods to read and update data from a KVStore. */ +private[spark] trait KVUtils { + + protected val kvstore: KVStore + + /** Helper method for choosing among a new, old or default value. */ + protected def value[T](newv: Option[T], oldv: Option[T], dflt: T): T = { + newv.orElse(oldv).getOrElse(dflt) + } + + /** Helper method for choosing between an optional new or old value. */ + protected def option[T](newv: Option[Option[T]], oldv: Option[Option[T]]): Option[T] = { + newv.getOrElse(oldv.getOrElse(None)) + } + + /** Helper method for reading a value from a KVStore, returning an Option. */ + protected def read[T: ClassTag](key: Any): Option[T] = { + try { + Some(kvstore.read(classTag[T].runtimeClass, key).asInstanceOf[T]) + } catch { + case _: NoSuchElementException => None + } + } + + /** Helper method for updating a value read from a KVStore. */ + protected def update[T: ClassTag](key: Any)(fn: Option[T] => T): Unit = { + val updated = fn(read(key)) + kvstore.write(updated) + } + + /** Utility conversion method used to copy immutable classes. */ + protected implicit def anyToOption[T](o: T): Option[T] = Option(o) + +} + +private[spark] object KVUtils extends Logging { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + /** + * A KVStoreSerializer that also provides serialization of Scala types, using the same options + * as the API serializer. + */ + class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat) + + } + + /** + * Open or create a LevelDB store. + * + * @param path Location of the store. + * @param metadata Metadata value to compare to the data in the store. If the store does not + * contain any metadata (e.g. it's a new store), this value is written as + * the store's metadata.
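+ * @throws MetadataMismatchException if the metadata in the store does not match `metadata`.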
+ */ + def open[M: ClassTag](path: File, metadata: M): LevelDB = { + require(metadata != null, "Metadata is required.") + + val db = new LevelDB(path, new KVStoreScalaSerializer()) + val dbMeta = db.getMetadata(classTag[M].runtimeClass) + if (dbMeta == null) { + db.setMetadata(metadata) + } else if (dbMeta != metadata) { + db.close() + throw new MetadataMismatchException() + } + + db + } + + class MetadataMismatchException extends Exception + +} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 1818935392eb3..b81cef207d20e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -69,7 +69,7 @@ private[v1] object AllStagesResource { } val taskData = if (includeDetails) { - Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } ) + Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) }.toMap) } else { None } @@ -86,7 +86,7 @@ private[v1] object AllStagesResource { memoryBytesSpilled = summary.memoryBytesSpilled, diskBytesSpilled = summary.diskBytesSpilled ) - }) + }.toMap) } else { None } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index d66117410f2c5..25077d2669d9f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -16,9 +16,10 @@ */ package org.apache.spark.status.api.v1 +import java.lang.{Long => JLong} import java.util.Date -import scala.collection.Map +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.JobExecutionStatus @@ -131,9 +132,13 @@ class RDDDataDistribution private[spark]( val memoryUsed: Long, val memoryRemaining: Long, val diskUsed: Long, + @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryUsed: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val offHeapMemoryUsed: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryRemaining: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val offHeapMemoryRemaining: Option[Long]) class RDDPartitionInfo private[spark]( @@ -181,7 +186,8 @@ class TaskData private[spark]( val index: Int, val attempt: Int, val launchTime: Date, - val duration: Option[Long] = None, + @JsonDeserialize(contentAs = classOf[JLong]) + val duration: Option[Long], val executorId: String, val host: String, val status: String, diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala new file mode 100644 index 0000000000000..d4b980f5dbb07 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import com.fasterxml.jackson.annotation.JsonIgnore + +import org.apache.spark.kvstore.KVIndex +import org.apache.spark.status.api.v1._ +import org.apache.spark.status.KVUtils._ + +private[spark] class AppStatusStoreMetadata( + val version: Long) + +private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) { + + @JsonIgnore @KVIndex + def id: String = info.id + +} + +private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { + + @JsonIgnore @KVIndex + def id: String = info.id + + @JsonIgnore @KVIndex("host") + def host: String = info.hostPort.split(":")(0) + +} + +/** + * Keep track of the existing stages when the job was submitted, and those that were + * completed during the job's execution. This allows a more accurate accounting of how + * many tasks were skipped for the job. + */ +private[spark] class JobDataWrapper( + val info: JobData, + val initialStages: Set[(Int, Int)], + val completedStages: Set[Int]) { + + @JsonIgnore @KVIndex + def id: Int = info.jobId + +} + +private[spark] class StageDataWrapper( + val info: StageData, + val jobIds: Set[Int]) { + + @JsonIgnore @KVIndex + def id: Array[Int] = Array(info.stageId, info.attemptId) + +} + +private[spark] class TaskDataWrapper(val info: TaskData) { + + @JsonIgnore @KVIndex + def id: Long = info.taskId + +} + +private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { + + @JsonIgnore @KVIndex + def id: Int = info.id + + @JsonIgnore @KVIndex("cached") + def cached: Boolean = info.numCachedPartitions > 0 + +} + +private[spark] class ExecutorStageSummaryWrapper( + val stageId: Int, + val stageAttemptId: Int, + val executorId: String, + val info: ExecutorStageSummary) { + + @JsonIgnore @KVIndex + def id: Array[Any] = Array(stageId, stageAttemptId, executorId) + + @JsonIgnore @KVIndex("stage") + def stage: Array[Int] = Array(stageId, stageAttemptId) + +} diff --git a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala new file mode 100644 index 0000000000000..8da3451f62b01 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala @@ -0,0 +1,694 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.status + +import java.io.File +import java.util.{Arrays, Date, Properties} + +import scala.collection.JavaConverters._ +import scala.reflect.{classTag, ClassTag} + +import org.scalatest.BeforeAndAfter + +import org.apache.spark._ +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.kvstore._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster._ +import org.apache.spark.status.api.v1 +import org.apache.spark.storage._ +import org.apache.spark.util.Utils + +class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter { + + private var time: Long = _ + private var testDir: File = _ + private var store: KVStore = _ + + before { + time = 0L + testDir = Utils.createTempDir() + store = KVUtils.open(testDir, getClass().getName()) + } + + after { + store.close() + Utils.deleteRecursively(testDir) + } + + test("scheduler events") { + val listener = new AppStateListener(store) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + check[ApplicationInfoWrapper]("id") { app => + assert(app.info.name === "name") + assert(app.info.id === "id") + assert(app.info.attempts.size === 1) + + val attempt = app.info.attempts.head + assert(attempt.attemptId === Some("attempt")) + assert(attempt.startTime === new Date(time)) + assert(attempt.lastUpdated === new Date(time)) + assert(attempt.endTime === AppStateListener.DEFAULT_DATE) + assert(attempt.sparkUser === "user") + assert(!attempt.completed) + } + + // Start a couple of executors. + time += 1 + val execIds = Array("1", "2") + + execIds.foreach { id => + listener.onExecutorAdded(SparkListenerExecutorAdded(time, id, + new ExecutorInfo(s"$id.example.com", 1, Map()))) + } + + execIds.foreach { id => + check[ExecutorSummaryWrapper](id) { exec => + assert(exec.info.id === id) + assert(exec.info.hostPort === s"$id.example.com") + assert(exec.info.isActive) + } + } + + // Start a job with 2 stages / 4 tasks each + time += 1 + val stages = Seq( + new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"), + new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2")) + + val stageProps = new Properties() + stageProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") + stageProps.setProperty("spark.scheduler.pool", "schedPool") + + listener.onJobStart(SparkListenerJobStart(1, time, stages, null)) + + check[JobDataWrapper](1) { job => + assert(job.info.jobId === 1) + assert(job.info.name === stages.last.name) + assert(job.info.description === Some(stages.last.details)) + assert(job.info.status === JobExecutionStatus.RUNNING) + assert(job.info.submissionTime === Some(new Date(time))) + } + + stages.foreach { info => + check[StageDataWrapper](key(info)) { stage => + assert(stage.info.status === v1.StageStatus.PENDING) + assert(stage.jobIds === Set(1)) + } + } + + // Submit stage 1 + time += 1 + stages.head.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, stageProps)) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveStages === 1) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.status === v1.StageStatus.ACTIVE) + assert(stage.info.submissionTime === Some(new Date(stages.head.submissionTime.get))) + } + + // Start tasks from stage 1 + time += 1 + var _taskIdTracker = -1L + def nextTaskId(): Long = { + _taskIdTracker += 1 + _taskIdTracker + } + + def createTasks(count: 
+      (1 to count).map { id =>
+        val exec = execIds(id.toInt % execIds.length)
+        val taskId = nextTaskId()
+        new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com",
+          TaskLocality.PROCESS_LOCAL, id % 2 == 0)
+      }
+    }
+
+    val s1Tasks = createTasks(4, time)
+    s1Tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+        task))
+    }
+
+    assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size)
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numActiveTasks === s1Tasks.size)
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.numActiveTasks === s1Tasks.size)
+      assert(stage.info.firstTaskLaunchedTime === Some(new Date(s1Tasks.head.launchTime)))
+    }
+
+    s1Tasks.foreach { task =>
+      check[TaskDataWrapper](task.taskId) { wrapper =>
+        assert(wrapper.info.taskId === task.taskId)
+        assert(wrapper.info.index === task.index)
+        assert(wrapper.info.attempt === task.attemptNumber)
+        assert(wrapper.info.launchTime === new Date(task.launchTime))
+        assert(wrapper.info.executorId === task.executorId)
+        assert(wrapper.info.host === task.host)
+        assert(wrapper.info.status === task.status)
+        assert(wrapper.info.taskLocality === task.taskLocality.toString())
+        assert(wrapper.info.speculative === task.speculative)
+      }
+    }
+
+    // Send executor metrics update. Only update one metric to avoid a lot of boilerplate code.
+    s1Tasks.foreach { task =>
+      val accum = new AccumulableInfo(1L, Some(InternalAccumulator.MEMORY_BYTES_SPILLED),
+        Some(1L), None, true, false, None)
+      listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
+        task.executorId,
+        Seq((task.taskId, stages.head.stageId, stages.head.attemptId, Seq(accum)))))
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.memoryBytesSpilled === s1Tasks.size)
+
+      val it = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage").first(stage.id)
+        .closeableIterator()
+      try {
+        val execs = it.asScala.takeWhile { v => Arrays.equals(v.stage, stage.id) }.toList
+        assert(execs.size > 0)
+        execs.foreach { exec =>
+          assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2)
+        }
+      } finally {
+        it.close()
+      }
+    }
+
+    // Fail one of the tasks, re-start it.
+    time += 1
+    s1Tasks.head.finishTime = time
+    s1Tasks.head.failed = true
+    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+      "taskType", TaskResultLost, s1Tasks.head, null))
+
+    time += 1
+    val reattempt = {
+      val orig = s1Tasks.head
+      // Task reattempts have a different ID, but the same index as the original.
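+      // The listener should show the new attempt as active while keeping the failed
+      // attempt in the job's and stage's failed task counts.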
+      new TaskInfo(nextTaskId(), orig.index, orig.attemptNumber + 1, time, orig.executorId,
+        s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative)
+    }
+    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+      reattempt))
+
+    assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size + 1)
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numFailedTasks === 1)
+      assert(job.info.numActiveTasks === s1Tasks.size)
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.numFailedTasks === 1)
+      assert(stage.info.numActiveTasks === s1Tasks.size)
+    }
+
+    check[TaskDataWrapper](s1Tasks.head.taskId) { task =>
+      assert(task.info.status === s1Tasks.head.status)
+      assert(task.info.duration === Some(s1Tasks.head.duration))
+      assert(task.info.errorMessage === Some(TaskResultLost.toErrorString))
+    }
+
+    check[TaskDataWrapper](reattempt.taskId) { task =>
+      assert(task.info.index === s1Tasks.head.index)
+      assert(task.info.attempt === reattempt.attemptNumber)
+    }
+
+    // Succeed all tasks in stage 1.
+    val pending = s1Tasks.drop(1) ++ Seq(reattempt)
+
+    val s1Metrics = TaskMetrics.empty
+    s1Metrics.setExecutorCpuTime(2L)
+
+    time += 1
+    pending.foreach { task =>
+      task.finishTime = time
+      listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+        "taskType", Success, task, s1Metrics))
+    }
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numFailedTasks === 1)
+      assert(job.info.numActiveTasks === 0)
+      assert(job.info.numCompletedTasks === pending.size)
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.numFailedTasks === 1)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === pending.size)
+    }
+
+    pending.foreach { task =>
+      check[TaskDataWrapper](task.taskId) { wrapper =>
+        assert(wrapper.info.errorMessage === None)
+        assert(wrapper.info.taskMetrics.get.executorCpuTime === 2L)
+      }
+    }
+
+    assert(store.count(classOf[TaskDataWrapper]) === pending.size + 1)
+
+    // End stage 1.
+    time += 1
+    stages.head.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stages.head))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numActiveStages === 0)
+      assert(job.info.numCompletedStages === 1)
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.status === v1.StageStatus.COMPLETE)
+      assert(stage.info.numFailedTasks === 1)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === pending.size)
+    }
+
+    // Submit stage 2.
+    time += 1
+    stages.last.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, stageProps))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numActiveStages === 1)
+    }
+
+    check[StageDataWrapper](key(stages.last)) { stage =>
+      assert(stage.info.status === v1.StageStatus.ACTIVE)
+      assert(stage.info.submissionTime === Some(new Date(stages.last.submissionTime.get)))
+    }
+
+    // Start and fail all tasks of stage 2.
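+    // Every task ends with TaskResultLost, so the job's failed task count should grow by
+    // the whole task set, on top of the single failure from stage 1.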
+    time += 1
+    val s2Tasks = createTasks(4, time)
+    s2Tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId,
+        task))
+    }
+
+    time += 1
+    s2Tasks.foreach { task =>
+      task.finishTime = time
+      task.failed = true
+      listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptId,
+        "taskType", TaskResultLost, task, null))
+    }
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numFailedTasks === 1 + s2Tasks.size)
+      assert(job.info.numActiveTasks === 0)
+    }
+
+    check[StageDataWrapper](key(stages.last)) { stage =>
+      assert(stage.info.numFailedTasks === s2Tasks.size)
+      assert(stage.info.numActiveTasks === 0)
+    }
+
+    // Fail stage 2.
+    time += 1
+    stages.last.completionTime = Some(time)
+    stages.last.failureReason = Some("uh oh")
+    listener.onStageCompleted(SparkListenerStageCompleted(stages.last))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numCompletedStages === 1)
+      assert(job.info.numFailedStages === 1)
+    }
+
+    check[StageDataWrapper](key(stages.last)) { stage =>
+      assert(stage.info.status === v1.StageStatus.FAILED)
+      assert(stage.info.numFailedTasks === s2Tasks.size)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    // Re-submit stage 2 with a new attempt, run all of its tasks, and succeed both the
+    // tasks and the stage.
+    val oldS2 = stages.last
+    val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptId + 1, oldS2.name, oldS2.numTasks,
+      oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics)
+
+    time += 1
+    newS2.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, stageProps))
+    assert(store.count(classOf[StageDataWrapper]) === 3)
+
+    val newS2Tasks = createTasks(4, time)
+
+    newS2Tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task))
+    }
+
+    time += 1
+    newS2Tasks.foreach { task =>
+      task.finishTime = time
+      listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptId, "taskType", Success,
+        task, null))
+    }
+
+    time += 1
+    newS2.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(newS2))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numActiveStages === 0)
+      assert(job.info.numFailedStages === 1)
+      assert(job.info.numCompletedStages === 2)
+    }
+
+    check[StageDataWrapper](key(newS2)) { stage =>
+      assert(stage.info.status === v1.StageStatus.COMPLETE)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === newS2Tasks.size)
+    }
+
+    // End job.
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.status === JobExecutionStatus.SUCCEEDED)
+    }
+
+    // Submit a second job that re-uses stage 1 and stage 2. Stage 1 won't be re-run, but
+    // stage 2 will. This should cause stage 1 to be "skipped" in the new job, and the
+    // stage 2 re-execution should not change the stats of the already finished job.
+    time += 1
+    listener.onJobStart(SparkListenerJobStart(2, time, Seq(stages.head, newS2), null))
+    assert(store.count(classOf[JobDataWrapper]) === 2)
+
+    // In a real app the DAGScheduler would not have submitted the new stage attempt yet,
+    // so the listener should not know about it. Make sure that's the case.
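+    // Reading the not-yet-submitted attempt from the store should fail.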
+    val j2s2 = new StageInfo(newS2.stageId, newS2.attemptId + 1, newS2.name, newS2.numTasks,
+      newS2.rddInfos, newS2.parentIds, newS2.details, newS2.taskMetrics)
+    intercept[NoSuchElementException] {
+      check[StageDataWrapper](key(j2s2)) { _ => () }
+    }
+
+    time += 1
+    j2s2.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(j2s2, stageProps))
+    assert(store.count(classOf[StageDataWrapper]) === 4)
+
+    time += 1
+    val j2s2Tasks = createTasks(4, time)
+
+    j2s2Tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(j2s2.stageId, j2s2.attemptId, task))
+    }
+
+    time += 1
+    j2s2Tasks.foreach { task =>
+      task.finishTime = time
+      listener.onTaskEnd(SparkListenerTaskEnd(j2s2.stageId, j2s2.attemptId, "taskType", Success,
+        task, null))
+    }
+
+    time += 1
+    j2s2.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(j2s2))
+
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numCompletedStages === 2)
+      assert(job.info.numCompletedTasks === s1Tasks.size + s2Tasks.size)
+    }
+
+    check[JobDataWrapper](2) { job =>
+      assert(job.info.status === JobExecutionStatus.SUCCEEDED)
+      assert(job.info.numCompletedStages === 1)
+      assert(job.info.numCompletedTasks === j2s2Tasks.size)
+      assert(job.info.numSkippedStages === 1)
+      assert(job.info.numSkippedTasks === s1Tasks.size)
+    }
+
+    // Blacklist an executor.
+    time += 1
+    listener.onExecutorBlacklisted(SparkListenerExecutorBlacklisted(time, "1", 42))
+    check[ExecutorSummaryWrapper]("1") { exec =>
+      assert(exec.info.isBlacklisted)
+    }
+
+    time += 1
+    listener.onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted(time, "1"))
+    check[ExecutorSummaryWrapper]("1") { exec =>
+      assert(!exec.info.isBlacklisted)
+    }
+
+    // Blacklist a node.
+    time += 1
+    listener.onNodeBlacklisted(SparkListenerNodeBlacklisted(time, "1.example.com", 2))
+    check[ExecutorSummaryWrapper]("1") { exec =>
+      assert(exec.info.isBlacklisted)
+    }
+
+    time += 1
+    listener.onNodeUnblacklisted(SparkListenerNodeUnblacklisted(time, "1.example.com"))
+    check[ExecutorSummaryWrapper]("1") { exec =>
+      assert(!exec.info.isBlacklisted)
+    }
+
+    // Stop executors.
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "1", "Test"))
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "2", "Test"))
+
+    Seq("1", "2").foreach { id =>
+      check[ExecutorSummaryWrapper](id) { exec =>
+        assert(exec.info.id === id)
+        assert(!exec.info.isActive)
+      }
+    }
+
+    // End the application.
+    listener.onApplicationEnd(SparkListenerApplicationEnd(42L))
+
+    check[ApplicationInfoWrapper]("id") { app =>
+      assert(app.info.name === "name")
+      assert(app.info.id === "id")
+      assert(app.info.attempts.size === 1)
+
+      val attempt = app.info.attempts.head
+      assert(attempt.attemptId === Some("attempt"))
+      assert(attempt.startTime === new Date(1L))
+      assert(attempt.lastUpdated === new Date(42L))
+      assert(attempt.endTime === new Date(42L))
+      assert(attempt.duration === 41L)
+      assert(attempt.sparkUser === "user")
+      assert(attempt.completed)
+    }
+  }
+
+  test("storage events") {
+    val listener = new AppStateListener(store)
+    val maxMemory = 42L
+
+    // Register a couple of block managers.
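+    // Each registration should record the reported maximum memory in the matching
+    // executor summary.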
+    val bm1 = BlockManagerId("1", "1.example.com", 42)
+    val bm2 = BlockManagerId("2", "2.example.com", 84)
+    Seq(bm1, bm2).foreach { bm =>
+      listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
+        new ExecutorInfo(bm.host, 1, Map())))
+      listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, maxMemory))
+      check[ExecutorSummaryWrapper](bm.executorId) { exec =>
+        assert(exec.info.maxMemory === maxMemory)
+      }
+    }
+
+    val rdd1b1 = RDDBlockId(1, 1)
+    val level = StorageLevel.MEMORY_AND_DISK
+
+    // Submit a stage and make sure the RDD is recorded.
+    val rddInfo = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil)
+    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rddInfo), Nil, "details1")
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.name === rddInfo.name)
+      assert(wrapper.info.numPartitions === rddInfo.numPartitions)
+      assert(wrapper.info.storageLevel === rddInfo.storageLevel.description)
+    }
+
+    // Add partition 1 replicated on two block managers.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1, level,
+      1L, 1L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 1L)
+      assert(wrapper.info.diskUsed === 1L)
+
+      assert(wrapper.info.dataDistribution.isDefined)
+      assert(wrapper.info.dataDistribution.get.size === 1)
+
+      val dist = wrapper.info.dataDistribution.get.head
+      assert(dist.address === bm1.hostPort)
+      assert(dist.memoryUsed === 1L)
+      assert(dist.diskUsed === 1L)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      assert(wrapper.info.partitions.isDefined)
+      assert(wrapper.info.partitions.get.size === 1)
+
+      val part = wrapper.info.partitions.get.head
+      assert(part.blockName === rdd1b1.name)
+      assert(part.storageLevel === level.description)
+      assert(part.memoryUsed === 1L)
+      assert(part.diskUsed === 1L)
+      assert(part.executors === Seq(bm1.executorId))
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 1L)
+      assert(exec.info.memoryUsed === 1L)
+      assert(exec.info.diskUsed === 1L)
+    }
+
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1, level,
+      1L, 1L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 2L)
+      assert(wrapper.info.diskUsed === 2L)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 1L)
+
+      val dist = wrapper.info.dataDistribution.get.find(_.address == bm2.hostPort).get
+      assert(dist.memoryUsed === 1L)
+      assert(dist.diskUsed === 1L)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      val part = wrapper.info.partitions.get(0)
+      assert(part.memoryUsed === 2L)
+      assert(part.diskUsed === 2L)
+      assert(part.executors === Seq(bm1.executorId, bm2.executorId))
+    }
+
+    check[ExecutorSummaryWrapper](bm2.executorId) { exec =>
+      assert(exec.info.rddBlocks === 1L)
+      assert(exec.info.memoryUsed === 1L)
+      assert(exec.info.diskUsed === 1L)
+    }
+
+    // Add a second partition only to bm 1.
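+    // The RDD totals should now cover both partitions, while bm 1's distribution only
+    // reflects the blocks it holds.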
+    val rdd1b2 = RDDBlockId(1, 2)
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b2, level,
+      3L, 3L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 5L)
+      assert(wrapper.info.diskUsed === 5L)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 2L)
+
+      val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get
+      assert(dist.memoryUsed === 4L)
+      assert(dist.diskUsed === 4L)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      val part = wrapper.info.partitions.get.find(_.blockName === rdd1b2.name).get
+      assert(part.storageLevel === level.description)
+      assert(part.memoryUsed === 3L)
+      assert(part.diskUsed === 3L)
+      assert(part.executors === Seq(bm1.executorId))
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 2L)
+      assert(exec.info.memoryUsed === 4L)
+      assert(exec.info.diskUsed === 4L)
+    }
+
+    // Remove block 1 from bm 1.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1,
+      StorageLevel.NONE, 1L, 1L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 4L)
+      assert(wrapper.info.diskUsed === 4L)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 2L)
+
+      val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get
+      assert(dist.memoryUsed === 3L)
+      assert(dist.diskUsed === 3L)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      val part = wrapper.info.partitions.get.find(_.blockName === rdd1b1.name).get
+      assert(part.storageLevel === level.description)
+      assert(part.memoryUsed === 1L)
+      assert(part.diskUsed === 1L)
+      assert(part.executors === Seq(bm2.executorId))
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 1L)
+      assert(exec.info.memoryUsed === 3L)
+      assert(exec.info.diskUsed === 3L)
+    }
+
+    // Remove block 1 from bm 2. This should leave only block 2's info in the store.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1,
+      StorageLevel.NONE, 1L, 1L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 3L)
+      assert(wrapper.info.diskUsed === 3L)
+      assert(wrapper.info.dataDistribution.get.size === 1L)
+      assert(wrapper.info.partitions.get.size === 1L)
+      assert(wrapper.info.partitions.get(0).blockName === rdd1b2.name)
+    }
+
+    check[ExecutorSummaryWrapper](bm2.executorId) { exec =>
+      assert(exec.info.rddBlocks === 0L)
+      assert(exec.info.memoryUsed === 0L)
+      assert(exec.info.diskUsed === 0L)
+    }
+
+    // Unpersist RDD1.
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd1b1.rddId))
+    intercept[NoSuchElementException] {
+      check[RDDStorageInfoWrapper](rdd1b1.rddId) { _ => () }
+    }
+  }
+
+  private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
+
+  private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
+    val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T]
+    fn(value)
+  }
+
+}
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 656c7614c2bfd..d7932727b4bd5 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -93,7 +93,7 @@ This file is divided into 3 sections:
-
+
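For reviewers who want to poke at the new store types by hand, here is a minimal sketch
(not part of the patch) of scanning the "stage" index of ExecutorStageSummaryWrapper,
mirroring the iterator pattern used in the suite above. The temp directory, store name,
and stage key are illustrative.

import java.util.Arrays

import scala.collection.JavaConverters._

import org.apache.spark.status._
import org.apache.spark.util.Utils

// Open a store the same way the suite does; "example" is just an illustrative name.
val testDir = Utils.createTempDir()
val store = KVUtils.open(testDir, "example")
try {
  // Index keys follow ExecutorStageSummaryWrapper.stage: Array(stageId, stageAttemptId).
  val stageKey = Array(1, 0)
  val it = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
    .first(stageKey).closeableIterator()
  try {
    // Entries are sorted by index, so stop once the stage key no longer matches.
    it.asScala.takeWhile { v => Arrays.equals(v.stage, stageKey) }.foreach { exec =>
      println(s"executor ${exec.executorId}: ${exec.info.memoryBytesSpilled} bytes spilled")
    }
  } finally {
    it.close()
  }
} finally {
  store.close()
  Utils.deleteRecursively(testDir)
}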