diff --git a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java index b40c7950d1d11..304c48e6cbb96 100644 --- a/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java @@ -69,7 +69,7 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception { this.types = new ConcurrentHashMap<>(); Options options = new Options(); - options.createIfMissing(!path.exists()); + options.createIfMissing(true); this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options)); byte[] versionData = db().get(STORE_VERSION_KEY); diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 3eee5d809c470..e2273e530952c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -43,6 +43,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.kvstore._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ +import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1 import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} @@ -699,18 +700,6 @@ private[history] object FsHistoryProvider { private val CURRENT_VERSION = 1L } -/** - * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as - * the API serializer. - */ -private class KVStoreScalaSerializer extends KVStoreSerializer { - - mapper.registerModule(DefaultScalaModule) - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) - mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) - -} - case class KVStoreMetadata( val version: Long, val logDir: String) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala index affaff86e3139..9ca07e3d63271 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala @@ -19,16 +19,10 @@ package org.apache.spark.deploy.history import java.util.concurrent.TimeUnit -import scala.annotation.meta.getter - import org.apache.spark.internal.config.ConfigBuilder -import org.apache.spark.kvstore.KVIndex private[spark] object config { - /** Use this to annotate constructor params to be used as KVStore indices. */ - type KVIndexParam = KVIndex @getter - val DEFAULT_LOG_DIR = "file:/tmp/spark-events" val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") diff --git a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala new file mode 100644 index 0000000000000..61af0ff1aba2b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala @@ -0,0 +1,1090 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File +import java.lang.{Long => JLong} +import java.util.Date + +import scala.collection.JavaConverters._ +import scala.reflect.{classTag, ClassTag} + +import org.apache.spark._ +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.kvstore.KVStore +import org.apache.spark.scheduler._ +import org.apache.spark.status.api.v1 +import org.apache.spark.storage._ +import org.apache.spark.ui.SparkUI + +/** + * A Spark listener that writes application information to a data store. The types written to the + * store are defined in the `storeTypes.scala` file and are based on the public REST API. + * + * TODO (M4): + * - Need to add state / information needed by the UI that is currently not in the API. + * - Need mechanism to insert this into SparkContext and be called when SparkContext is + * closed (new plugin interface?) + * - Need to close store in live UI after SparkContext closes. + * - Cache active jobs / stages / other interesting things in memory to make it faster + * to update them + * - Flush data to the store in a separate thread (to avoid stalling the listener bus). + * + * TODO (future): + * - to enable incremental parsing of event logs, all state in this class needs to be serialized + * to the underlying store and loaded when the class is instantiated. That could potentially + * be written to the AppStatusStoreMetadata object. + */ +private class AppStateListener(override protected val kvstore: KVStore) extends SparkListener + with KVUtils { + + import AppStateListener._ + + private var appId: String = null + private var activeJobs: Set[Int] = Set() + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { + assert(event.appId.isDefined, "Application without IDs are not supported.") + this.appId = event.appId.get + + val attempt = new v1.ApplicationAttemptInfo( + event.appAttemptId, + new Date(event.time), + DEFAULT_DATE, + new Date(event.time), + -1L, + event.sparkUser, + false) + + val app = new v1.ApplicationInfo( + appId, + event.appName, + None, + None, + None, + None, + Seq(attempt)) + + val stored = new ApplicationInfoWrapper(app) + kvstore.write(stored) + } + + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { + // TODO + } + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { + update[ApplicationInfoWrapper](appId) { wrapper => + val app = wrapper.get.info + val attempt = app.attempts.head + + val newAttempt = new v1.ApplicationAttemptInfo( + attempt.attemptId, + attempt.startTime, + new Date(event.time), + new Date(event.time), + event.time - attempt.startTime.getTime(), + attempt.sparkUser, + true) + + val newApp = new v1.ApplicationInfo( + app.id, app.name, app.coresGranted, app.maxCores, app.coresPerExecutor, + app.memoryPerExecutorMB, Seq(newAttempt)) + + new ApplicationInfoWrapper(newApp) + } + } + + override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { + // This needs to be an update in case an executor re-registers after the driver has + // marked it as "dead". 
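+    // Note: updateExecutorSummary() reads the existing entry for this executor ID (or creates
+    // an empty summary if none exists) before applying the partial update below, so data
+    // recorded for a previous incarnation of the executor is preserved.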
+ updateExecutorSummary(event.executorId) { exec => + val newInfo = newExecutorSummary(exec.info, + hostPort = event.executorInfo.executorHost, + isActive = true, + totalCores = event.executorInfo.totalCores, + executorLogs = event.executorInfo.logUrlMap) + new ExecutorSummaryWrapper(newInfo) + } + } + + override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { + updateExecutorSummary(event.executorId) { exec => + val newInfo = newExecutorSummary(exec.info, + isActive = false) + new ExecutorSummaryWrapper(newInfo) + } + } + + override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { + updateBlackListStatus(event.executorId, true) + } + + override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { + updateBlackListStatus(event.executorId, false) + } + + override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = { + updateNodeBlackList(event.hostId, true) + } + + override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = { + updateNodeBlackList(event.hostId, false) + } + + private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { + updateExecutorSummary(execId) { exec => + val newInfo = newExecutorSummary(exec.info, + isBlacklisted = blacklisted) + new ExecutorSummaryWrapper(newInfo) + } + } + + private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = { + // Implicitly (un)blacklist every executor associated with the node. + val it = kvstore.view(classOf[ExecutorSummaryWrapper]).index("host").closeableIterator() + try { + it.asScala + .takeWhile(_.host == host) + .foreach { exec => updateBlackListStatus(exec.id, blacklisted) } + } finally { + it.close() + } + } + + override def onJobStart(event: SparkListenerJobStart): Unit = { + // Compute (a potential underestimate of) the number of tasks that will be run by this job. + // This may be an underestimate because the job start event references all of the result + // stages' transitive stage dependencies, but some of these stages might be skipped if their + // output is available from earlier runs. + // See https://github.com/apache/spark/pull/3009 for a more extensive discussion. + val numTasks = { + val missingStages = event.stageInfos.filter(_.completionTime.isEmpty) + missingStages.map(_.numTasks).sum + } + + val lastStageInfo = event.stageInfos.lastOption + val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") + + val jobGroup = Option(event.properties) + .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) } + + val info = newJobData(None, + jobId = event.jobId, + name = lastStageName, + description = lastStageInfo.map(_.details), + submissionTime = Option(event.time).filter(_ >= 0).map(new Date(_)), + stageIds = event.stageIds, + jobGroup = jobGroup, + status = JobExecutionStatus.RUNNING, + numTasks = numTasks) + + val initialStages = event.stageInfos.map { stage => + // A new job submission may re-use an existing stage, so this code needs to do an update + // instead of just a write. 
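+      // The stage's jobIds set accumulates every job that references it; the
+      // (stageId, attemptId) pairs collected in initialStages are later used by onJobEnd
+      // to count the completed tasks of stages that end up being skipped.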
+ updateStageData(stage.stageId, stage.attemptId) { wrapper => + val jobIds = wrapper.jobIds ++ Set(event.jobId) + newStageDataWrapper(wrapper, wrapper.info, jobIds = jobIds) + } + (stage.stageId, stage.attemptId) + }.toSet + + val stored = newJobDataWrapper(None, info, initialStages = initialStages) + kvstore.write(stored) + + activeJobs += event.jobId + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { + updateJobData(event.jobId) { job => + val status = event.jobResult match { + case JobSucceeded => JobExecutionStatus.SUCCEEDED + case JobFailed(_) => JobExecutionStatus.FAILED + } + + val skippedStages = job.info.stageIds.toSet &~ job.completedStages + + // Calculate the number of skipped tasks based on the number of completed tasks of + // skipped stage attempts. + val skippedAttempts = job.initialStages.filter { case (stageId, _) => + skippedStages.contains(stageId) + } + val skippedTasks = skippedAttempts.flatMap { case (id, attempt) => + read[StageDataWrapper](Array(id, attempt)).map(_.info.numCompleteTasks) + }.sum + + val newInfo = newJobData(job.info, + jobId = event.jobId, + completionTime = Some(new Date(event.time)), + status = status, + numSkippedStages = skippedStages.size, + numSkippedTasks = skippedTasks) + newJobDataWrapper(job, newInfo) + } + + activeJobs -= event.jobId + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + // New attempts are created based on the previous attempt's data - mainly to track which jobs + // depend on the stage. Attempt IDs start at 0, so this check takes care of retrieving the + // correct previous attempt from the store. + val previousAttempt = if (event.stageInfo.attemptId > 0) { + event.stageInfo.attemptId - 1 + } else { + event.stageInfo.attemptId + } + + updateStageData(event.stageInfo.stageId, previousAttempt) { stage => + val poolName = Option(event.properties).flatMap { p => + Option(p.getProperty("spark.scheduler.pool")) + }.getOrElse(SparkUI.DEFAULT_POOL_NAME) + + val description = Option(event.properties).flatMap { p => + Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) + } + + // Update jobs to add active stage and remove the stage from the completed list. + activeJobs(stage).foreach { job => + updateJobData(job) { wrapper => + val newJobInfo = newJobData( + wrapper.info, + jobId = job, + numActiveStages = wrapper.info.numActiveStages + 1) + newJobDataWrapper(wrapper, newJobInfo) + } + } + + val newInfo = newStageData( + stage.info, + status = v1.StageStatus.ACTIVE, + stageId = event.stageInfo.stageId, + attemptId = event.stageInfo.attemptId, + submissionTime = event.stageInfo.submissionTime.map(new Date(_)), + details = description, + schedulingPool = poolName) + newStageDataWrapper(stage, newInfo) + } + + // Update RDDs with the stage name. 
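+    // Each RDDInfo attached to the stage carries the RDD's name, partition count and storage
+    // level; those values seed (or refresh) the corresponding RDDStorageInfoWrapper entries.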
+ event.stageInfo.rddInfos.foreach { info => + updateRDDStorageInfo(info.id) { rdd => + val updated = newRDDStorageInfo(rdd.info, + name = info.name, + numPartitions = info.numPartitions, + storageLevel = info.storageLevel.description) + new RDDStorageInfoWrapper(updated) + } + } + } + + override def onTaskStart(event: SparkListenerTaskStart): Unit = { + if (event.taskInfo != null) { + val task = newTaskData(None, + taskId = event.taskInfo.taskId, + index = event.taskInfo.index, + attempt = event.taskInfo.attemptNumber, + launchTime = new Date(event.taskInfo.launchTime), + executorId = event.taskInfo.executorId, + host = event.taskInfo.host, + status = event.taskInfo.status, + taskLocality = event.taskInfo.taskLocality.toString(), + speculative = event.taskInfo.speculative) + kvstore.write(new TaskDataWrapper(task)) + } + + updateStageData(event.stageId, event.stageAttemptId) { stage => + // Update jobs to add active stage and remove the stage from the completed list. + activeJobs(stage).foreach { job => + updateJobData(job) { wrapper => + val newJobInfo = newJobData( + wrapper.info, + jobId = job, + numActiveTasks = wrapper.info.numActiveTasks + 1) + newJobDataWrapper(wrapper, newJobInfo) + } + } + + val firstTask = stage.info.firstTaskLaunchedTime.getOrElse( + new Date(event.taskInfo.launchTime)) + + val newInfo = newStageData( + stage.info, + stageId = event.stageId, + attemptId = event.stageAttemptId, + firstTaskLaunchedTime = Some(firstTask), + numActiveTasks = stage.info.numActiveTasks + 1) + newStageDataWrapper(stage, newInfo) + } + } + + override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = { + // Nothing to do here. + } + + override def onTaskEnd(event: SparkListenerTaskEnd): Unit = { + val info = event.taskInfo + // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task + // completion event is for. Let's just drop it here. This means we might have some speculation + // tasks on the web ui that are never marked as complete. 
+ if (info == null || event.stageAttemptId == -1) { + return + } + + var metricsDelta: v1.TaskMetrics = null + + updateTaskData(event.taskInfo.taskId) { task => + val errorMessage = event.reason match { + case Success => + None + case k: TaskKilled => + Some(k.reason) + case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates + Some(e.toErrorString) + case e: TaskFailedReason => // All other failure cases + Some(e.toErrorString) + } + + metricsDelta = calculateMetricsDelta(Option(event.taskMetrics), task.info.taskMetrics) + + val newInfo = newTaskData( + task.info, + taskId = event.taskInfo.taskId, + duration = Some(event.taskInfo.duration), + status = event.taskInfo.status, + errorMessage = errorMessage, + taskMetrics = Option(event.taskMetrics).map(newTaskMetrics)) + new TaskDataWrapper(newInfo) + } + + val (completedDelta, failedDelta) = event.reason match { + case Success => + (1, 0) + case _: TaskKilled => + (0, 0) + case _ => + (0, 1) + } + + updateStageData(event.stageId, event.stageAttemptId) { stage => + val accumulables = info.accumulables.map(newAccumulableInfo).toSeq + + val oldStage = stage.info + val newInfo = newStageData( + oldStage, + numActiveTasks = oldStage.numActiveTasks - 1, + numCompleteTasks = oldStage.numCompleteTasks + completedDelta, + numFailedTasks = oldStage.numFailedTasks + failedDelta, + accumulatorUpdates = accumulables, + inputBytes = oldStage.inputBytes + metricsDelta.inputMetrics.bytesRead, + inputRecords = oldStage.inputRecords + metricsDelta.inputMetrics.recordsRead, + outputBytes = oldStage.outputBytes + metricsDelta.outputMetrics.bytesWritten, + outputRecords = oldStage.outputRecords + metricsDelta.outputMetrics.recordsWritten, + shuffleReadBytes = oldStage.shuffleReadBytes + + metricsDelta.shuffleReadMetrics.localBytesRead + + metricsDelta.shuffleReadMetrics.remoteBytesRead, + shuffleReadRecords = oldStage.shuffleReadRecords + + metricsDelta.shuffleReadMetrics.recordsRead, + shuffleWriteBytes = oldStage.shuffleWriteBytes + + metricsDelta.shuffleWriteMetrics.bytesWritten, + shuffleWriteRecords = oldStage.shuffleWriteRecords + + metricsDelta.shuffleWriteMetrics.recordsWritten, + memoryBytesSpilled = oldStage.memoryBytesSpilled + metricsDelta.memoryBytesSpilled, + diskBytesSpilled = oldStage.diskBytesSpilled + metricsDelta.diskBytesSpilled) + + activeJobs(stage).foreach { job => + updateJobData(job) { wrapper => + val newJobInfo = newJobData( + wrapper.info, + jobId = job, + numActiveTasks = wrapper.info.numActiveTasks - 1, + numCompletedTasks = wrapper.info.numCompletedTasks + completedDelta, + numFailedTasks = wrapper.info.numFailedTasks + failedDelta) + newJobDataWrapper(wrapper, newJobInfo) + } + } + + newStageDataWrapper(stage, newInfo) + } + + updateExecutorStageSummary( + event.stageId, + event.stageAttemptId, + event.taskInfo.executorId) { exec => + newExecutorStageSummary( + exec, + failedTasks = exec.failedTasks + failedDelta, + inputBytes = exec.inputBytes + metricsDelta.inputMetrics.bytesRead, + outputBytes = exec.outputBytes + metricsDelta.outputMetrics.bytesWritten, + shuffleRead = exec.shuffleRead + metricsDelta.shuffleReadMetrics.localBytesRead + + metricsDelta.shuffleReadMetrics.remoteBytesRead, + shuffleWrite = exec.shuffleWrite + metricsDelta.shuffleWriteMetrics.bytesWritten, + memoryBytesSpilled = exec.memoryBytesSpilled + metricsDelta.memoryBytesSpilled, + diskBytesSpilled = exec.diskBytesSpilled + metricsDelta.diskBytesSpilled) + } + } + + override def onStageCompleted(event: 
SparkListenerStageCompleted): Unit = { + val info = event.stageInfo + updateStageData(info.stageId, info.attemptId) { stage => + val status = event.stageInfo.failureReason match { + case Some(_) => v1.StageStatus.FAILED + case None => v1.StageStatus.COMPLETE + } + val newStage = newStageData( + stage.info, + status = status, + completionTime = info.completionTime.map(new Date(_))) + + val (completedDelta, failedDelta) = if (status == v1.StageStatus.COMPLETE) { + (1, 0) + } else { + (0, 1) + } + + activeJobs(stage).foreach { job => + updateJobData(job) { wrapper => + val newJobInfo = newJobData( + wrapper.info, + jobId = job, + numActiveStages = wrapper.info.numActiveStages - 1, + numCompletedStages = wrapper.info.numCompletedStages + completedDelta, + numFailedStages = wrapper.info.numFailedStages + failedDelta) + val stages = wrapper.completedStages + info.stageId + newJobDataWrapper(wrapper, newJobInfo, completedStages = stages) + } + } + + newStageDataWrapper(stage, newStage) + } + } + + override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { + updateExecutorSummary(event.blockManagerId.executorId) { exec => + val updated = newExecutorSummary( + exec.info, + maxMemory = event.maxMem) + new ExecutorSummaryWrapper(updated) + } + } + + override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = { + // Nothing to do here. Covered by onExecutorRemoved. + } + + override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = { + kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId) + } + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { + event.accumUpdates.foreach { case (taskId, sid, sAttempt, accumUpdates) => + val task = read[TaskDataWrapper](taskId) + val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates) + val delta = calculateMetricsDelta(Some(metrics), task.flatMap(_.info.taskMetrics)) + + updateStageData(sid, sAttempt) { stage => + val oldStage = stage.info + val updatedStage = newStageData( + oldStage, + inputBytes = oldStage.inputBytes + delta.inputMetrics.bytesRead, + inputRecords = oldStage.inputRecords + delta.inputMetrics.recordsRead, + outputBytes = oldStage.outputBytes + delta.outputMetrics.bytesWritten, + outputRecords = oldStage.outputRecords + delta.outputMetrics.recordsWritten, + shuffleReadBytes = oldStage.shuffleReadBytes + + delta.shuffleReadMetrics.localBytesRead + + delta.shuffleReadMetrics.remoteBytesRead, + shuffleReadRecords = oldStage.shuffleReadRecords + + delta.shuffleReadMetrics.recordsRead, + shuffleWriteBytes = oldStage.shuffleWriteBytes + + delta.shuffleWriteMetrics.bytesWritten, + shuffleWriteRecords = oldStage.shuffleWriteRecords + + delta.shuffleWriteMetrics.recordsWritten, + memoryBytesSpilled = oldStage.memoryBytesSpilled + delta.memoryBytesSpilled, + diskBytesSpilled = oldStage.diskBytesSpilled + delta.diskBytesSpilled) + newStageDataWrapper(stage, updatedStage) + } + + updateExecutorStageSummary(sid, sAttempt, event.execId) { exec => + newExecutorStageSummary( + exec, + inputBytes = exec.inputBytes + delta.inputMetrics.bytesRead, + outputBytes = exec.outputBytes + delta.outputMetrics.bytesWritten, + shuffleRead = exec.shuffleRead + delta.shuffleReadMetrics.localBytesRead + + delta.shuffleReadMetrics.remoteBytesRead, + shuffleWrite = exec.shuffleWrite + delta.shuffleWriteMetrics.bytesWritten, + memoryBytesSpilled = exec.memoryBytesSpilled + delta.memoryBytesSpilled, + diskBytesSpilled = exec.diskBytesSpilled + delta.diskBytesSpilled) + } + } + 
} + + override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { + event.blockUpdatedInfo.blockId match { + case block: RDDBlockId => updateRDDBlock(event, block) + case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too. + } + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + // TODO: SQL, Streaming. + } + + private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = { + val executorId = event.blockUpdatedInfo.blockManagerId.executorId + + // Whether values are being added to or removed from the existing accounting. + val storageLevel = event.blockUpdatedInfo.storageLevel + val diskMult = if (storageLevel.useDisk) 1 else -1 + val memoryMult = if (storageLevel.useMemory) 1 else -1 + + // Function to apply a delta to a value, but ensure that it doesn't go negative. + def newValue(old: Long, delta: Long): Long = math.max(0, old + delta) + + // If the storage level is NONE, then don't update the storage level of existing information. + val updatedStorageLevel = if (storageLevel.useMemory || storageLevel.useDisk) { + Some(storageLevel.description) + } else { + None + } + + // We need information about the executor to update some memory accounting values in the + // RDD info, so read that beforehand. + val executorInfo = read[ExecutorSummaryWrapper](executorId).getOrElse( + new ExecutorSummaryWrapper(newExecutorSummary(None, id = executorId))) + + var rddBlocksDelta = 0 + + // Update the block entry in the RDD info, keeping track of the deltas above so that we + // can update the executor information too. + updateRDDStorageInfo(block.rddId) { rdd => + val (maybeOldBlock, others) = rdd.info.partitions.getOrElse(Nil) + .partition(_.blockName == block.name) match { + case (old, others) => + (old.headOption, others) + } + + val oldBlock = maybeOldBlock.getOrElse { + newRDDPartitionInfo(None, blockName = block.name) + } + + val newExecutors = if (storageLevel.useDisk || storageLevel.useMemory) { + if (!oldBlock.executors.contains(executorId)) { + rddBlocksDelta = 1 + } + oldBlock.executors.toSet + executorId + } else { + rddBlocksDelta = -1 + oldBlock.executors.toSet - executorId + } + + // Only update the block if it's still stored in some executor, otherwise get rid of it. + val newBlocks = if (newExecutors.nonEmpty) { + val newBlock = newRDDPartitionInfo( + oldBlock, + storageLevel = updatedStorageLevel, + memoryUsed = newValue(oldBlock.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult), + diskUsed = newValue(oldBlock.diskUsed, event.blockUpdatedInfo.diskSize * diskMult), + executors = newExecutors.toSeq) + + Seq(newBlock) + } else { + Nil + } + + val address = event.blockUpdatedInfo.blockManagerId.hostPort + val (oldDist, otherDists) = rdd.info.dataDistribution.getOrElse(Nil) + .partition(_.address == address) match { + case (old, others) => + val _old = old.headOption.getOrElse { + newRDDDataDistribution( + None, + address = address, + memoryRemaining = executorInfo.info.maxMemory) + } + (_old, others) + } + + // If the new distribution is empty, just do not add it to the new RDD info. 
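+      // memoryRemaining moves in the opposite direction of memoryUsed: memory taken by the
+      // block reduces what is left on that block manager, and freed memory is added back.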
+ val newDistMem = newValue(oldDist.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult) + val newDistDisk = newValue(oldDist.diskUsed, event.blockUpdatedInfo.diskSize * diskMult) + val newDists = if (newDistMem > 0 || newDistDisk > 0) { + val newDist = newRDDDataDistribution( + oldDist, + memoryUsed = newDistMem, + memoryRemaining = newValue(oldDist.memoryRemaining, + event.blockUpdatedInfo.memSize * memoryMult * -1), + diskUsed = newDistDisk) + Seq(newDist) + } else { + Nil + } + + val newRDD = newRDDStorageInfo( + rdd.info, + storageLevel = updatedStorageLevel, + memoryUsed = newValue(rdd.info.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult), + diskUsed = newValue(rdd.info.diskUsed, event.blockUpdatedInfo.diskSize * diskMult), + dataDistribution = Some(otherDists ++ newDists), + partitions = Some(others ++ newBlocks)) + new RDDStorageInfoWrapper(newRDD) + } + + // Update the ExecutorSummary for the block's manager. + val newExecSummary = newExecutorSummary( + executorInfo.info, + rddBlocks = newValue(executorInfo.info.rddBlocks, rddBlocksDelta).toInt, + memoryUsed = newValue(executorInfo.info.memoryUsed, + event.blockUpdatedInfo.memSize * memoryMult), + diskUsed = newValue(executorInfo.info.diskUsed, + event.blockUpdatedInfo.diskSize * diskMult)) + kvstore.write(new ExecutorSummaryWrapper(newExecSummary)) + } + + private def updateJobData(id: Int)(fn: JobDataWrapper => JobDataWrapper): Unit = { + update[JobDataWrapper](id) { old => + val job = old.getOrElse((newJobDataWrapper(None, newJobData(None)))) + fn(job) + } + } + + private def updateStageData( + stageId: Int, + attemptId: Int) + (fn: StageDataWrapper => StageDataWrapper): Unit = { + update[StageDataWrapper](Array(stageId, attemptId)) { old => + val stage = old.getOrElse { + newStageDataWrapper(None, newStageData(None, stageId = stageId, attemptId = attemptId)) + } + fn(stage) + } + } + + private def updateTaskData(id: Long)(fn: TaskDataWrapper => TaskDataWrapper): Unit = { + update[TaskDataWrapper](id) { old => + val task = old.getOrElse(new TaskDataWrapper(newTaskData(None, taskId = id))) + fn(task) + } + } + + private def updateExecutorSummary( + id: String) + (fn: ExecutorSummaryWrapper => ExecutorSummaryWrapper): Unit = { + update[ExecutorSummaryWrapper](id) { old => + val exec = old.getOrElse( + new ExecutorSummaryWrapper(newExecutorSummary(None, id = id))) + fn(exec) + } + } + + private def updateRDDStorageInfo( + id: Int) + (fn: RDDStorageInfoWrapper => RDDStorageInfoWrapper): Unit = { + update[RDDStorageInfoWrapper](id) { old => + val rdd = old.getOrElse(new RDDStorageInfoWrapper(newRDDStorageInfo(None, id = id))) + fn(rdd) + } + } + + private def updateExecutorStageSummary( + stageId: Int, + stageAttemptId: Int, + executorId: String) + (fn: v1.ExecutorStageSummary => v1.ExecutorStageSummary): Unit = { + update[ExecutorStageSummaryWrapper](Array(stageId, stageAttemptId, executorId)) { old => + val oldInfo = old.map(_.info).getOrElse(newExecutorStageSummary(None)) + new ExecutorStageSummaryWrapper(stageId, stageAttemptId, executorId, fn(oldInfo)) + } + } + + /** + * Return a new TaskMetrics object containing the delta of the various fields of the given + * metrics objects. This is currently targeted at updating stage data, so it does not + * necessarily calculate deltas for all the fields. 
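+   * Fields that are not aggregated per stage (e.g. deserialization times, result size, GC
+   * time and result serialization time) are left at zero in the returned object.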
+ */ + private def calculateMetricsDelta( + taskMetrics: Option[TaskMetrics], + oldMetrics: Option[v1.TaskMetrics]): v1.TaskMetrics = { + + val metrics = taskMetrics.getOrElse(TaskMetrics.empty) + val old = oldMetrics.getOrElse(newTaskMetrics(TaskMetrics.empty)) + + val shuffleWriteDelta = new v1.ShuffleWriteMetrics( + metrics.shuffleWriteMetrics.bytesWritten - old.shuffleWriteMetrics.bytesWritten, + 0L, + metrics.shuffleWriteMetrics.recordsWritten - old.shuffleWriteMetrics.recordsWritten) + + val shuffleReadDelta = new v1.ShuffleReadMetrics( + 0L, 0L, 0L, + metrics.shuffleReadMetrics.remoteBytesRead - old.shuffleReadMetrics.remoteBytesRead, + metrics.shuffleReadMetrics.localBytesRead - old.shuffleReadMetrics.localBytesRead, + metrics.shuffleReadMetrics.recordsRead - old.shuffleReadMetrics.recordsRead) + + val inputDelta = new v1.InputMetrics( + metrics.inputMetrics.bytesRead - old.inputMetrics.bytesRead, + metrics.inputMetrics.recordsRead - old.inputMetrics.recordsRead) + + val outputDelta = new v1.OutputMetrics( + metrics.outputMetrics.bytesWritten - old.outputMetrics.bytesWritten, + metrics.outputMetrics.recordsWritten - old.outputMetrics.recordsWritten) + + new v1.TaskMetrics( + 0L, 0L, + metrics.executorRunTime - old.executorRunTime, + metrics.executorCpuTime - old.executorCpuTime, + 0L, 0L, 0L, + metrics.memoryBytesSpilled - old.memoryBytesSpilled, + metrics.diskBytesSpilled - old.diskBytesSpilled, + inputDelta, + outputDelta, + shuffleReadDelta, + shuffleWriteDelta) + } + // scalastyle:off argCount + + // The following are "copy methods" for API types that are often modified by the event handlers + // above. They allow copying from an existing instance, overriding specific fields with the + // values to be updated. Since the API types are immutable, this makes the event handlers cleaner + // since they don't have to deal with default and existing values. 
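+  //
+  // For example, a handler could bump a single counter with something like
+  //   newExecutorSummary(exec.info, activeTasks = exec.info.activeTasks + 1)
+  // and every other field would be copied from the old value; the non-Option arguments are
+  // wrapped by the implicit anyToOption conversion defined in KVUtils.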
+ + private def newExecutorSummary( + old: Option[v1.ExecutorSummary], + id: Option[String] = None, + hostPort: Option[String] = None, + isActive: Option[Boolean] = None, + rddBlocks: Option[Int] = None, + memoryUsed: Option[Long] = None, + diskUsed: Option[Long] = None, + totalCores: Option[Int] = None, + maxTasks: Option[Int] = None, + activeTasks: Option[Int] = None, + failedTasks: Option[Int] = None, + completedTasks: Option[Int] = None, + totalTasks: Option[Int] = None, + totalDuration: Option[Long] = None, + totalGCTime: Option[Long] = None, + totalInputBytes: Option[Long] = None, + totalShuffleRead: Option[Long] = None, + totalShuffleWrite: Option[Long] = None, + isBlacklisted: Option[Boolean] = None, + maxMemory: Option[Long] = None, + executorLogs: Option[Map[String, String]] = None, + memoryMetrics: Option[Option[v1.MemoryMetrics]] = None) : v1.ExecutorSummary = { + new v1.ExecutorSummary( + value(id, old.map(_.id), UNKNOWN), + value(hostPort, old.map(_.hostPort), UNKNOWN), + value(isActive, old.map(_.isActive), false), + value(rddBlocks, old.map(_.rddBlocks), 0), + value(memoryUsed, old.map(_.memoryUsed), 0L), + value(diskUsed, old.map(_.diskUsed), 0L), + value(totalCores, old.map(_.totalCores), 0), + value(maxTasks, old.map(_.maxTasks), 0), + value(activeTasks, old.map(_.activeTasks), 0), + value(failedTasks, old.map(_.failedTasks), 0), + value(completedTasks, old.map(_.completedTasks), 0), + value(totalTasks, old.map(_.totalTasks), 0), + value(totalDuration, old.map(_.totalDuration), 0L), + value(totalGCTime, old.map(_.totalGCTime), 0L), + value(totalInputBytes, old.map(_.totalInputBytes), 0L), + value(totalShuffleRead, old.map(_.totalShuffleRead), 0L), + value(totalShuffleWrite, old.map(_.totalShuffleWrite), 0L), + value(isBlacklisted, old.map(_.isBlacklisted), false), + value(maxMemory, old.map(_.maxMemory), 0L), + value(executorLogs, old.map(_.executorLogs), Map()), + option(memoryMetrics, old.map(_.memoryMetrics))) + } + + private def newJobData( + old: Option[v1.JobData], + jobId: Option[Int] = None, + name: Option[String] = None, + description: Option[Option[String]] = None, + submissionTime: Option[Option[Date]] = None, + completionTime: Option[Option[Date]] = None, + stageIds: Option[Seq[Int]] = None, + jobGroup: Option[Option[String]] = None, + status: Option[JobExecutionStatus] = None, + numTasks: Option[Int] = None, + numActiveTasks: Option[Int] = None, + numCompletedTasks: Option[Int] = None, + numSkippedTasks: Option[Int] = None, + numFailedTasks: Option[Int] = None, + numActiveStages: Option[Int] = None, + numCompletedStages: Option[Int] = None, + numSkippedStages: Option[Int] = None, + numFailedStages: Option[Int] = None): v1.JobData = { + new v1.JobData( + value(jobId, old.map(_.jobId), -1), + value(name, old.map(_.name), null), + option(description, old.map(_.description)), + option(submissionTime, old.map(_.submissionTime)), + option(completionTime, old.map(_.completionTime)), + value(stageIds, old.map(_.stageIds), Nil), + option(jobGroup, old.map(_.jobGroup)), + value(status, old.map(_.status), null), + value(numTasks, old.map(_.numTasks), 0), + value(numActiveTasks, old.map(_.numActiveTasks), 0), + value(numCompletedTasks, old.map(_.numCompletedTasks), 0), + value(numSkippedTasks, old.map(_.numSkippedTasks), 0), + value(numFailedTasks, old.map(_.numFailedTasks), 0), + value(numActiveStages, old.map(_.numActiveStages), 0), + value(numCompletedStages, old.map(_.numCompletedStages), 0), + value(numSkippedStages, old.map(_.numSkippedStages), 0), + 
value(numFailedStages, old.map(_.numFailedStages), 0))
+  }
+
+  private def newStageData(
+      old: Option[v1.StageData],
+      status: Option[v1.StageStatus] = None,
+      stageId: Option[Int] = None,
+      attemptId: Option[Int] = None,
+      numActiveTasks: Option[Int] = None,
+      numCompleteTasks: Option[Int] = None,
+      numFailedTasks: Option[Int] = None,
+      executorRunTime: Option[Long] = None,
+      executorCpuTime: Option[Long] = None,
+      submissionTime: Option[Option[Date]] = None,
+      firstTaskLaunchedTime: Option[Option[Date]] = None,
+      completionTime: Option[Option[Date]] = None,
+      inputBytes: Option[Long] = None,
+      inputRecords: Option[Long] = None,
+      outputBytes: Option[Long] = None,
+      outputRecords: Option[Long] = None,
+      shuffleReadBytes: Option[Long] = None,
+      shuffleReadRecords: Option[Long] = None,
+      shuffleWriteBytes: Option[Long] = None,
+      shuffleWriteRecords: Option[Long] = None,
+      memoryBytesSpilled: Option[Long] = None,
+      diskBytesSpilled: Option[Long] = None,
+      name: Option[String] = None,
+      details: Option[String] = None,
+      schedulingPool: Option[String] = None,
+      accumulatorUpdates: Option[Seq[v1.AccumulableInfo]] = None)
+    : v1.StageData = {
+    new v1.StageData(
+      value(status, old.map(_.status), v1.StageStatus.PENDING),
+      value(stageId, old.map(_.stageId), -1),
+      value(attemptId, old.map(_.attemptId), -1),
+      value(numActiveTasks, old.map(_.numActiveTasks), 0),
+      value(numCompleteTasks, old.map(_.numCompleteTasks), 0),
+      value(numFailedTasks, old.map(_.numFailedTasks), 0),
+      value(executorRunTime, old.map(_.executorRunTime), 0L),
+      value(executorCpuTime, old.map(_.executorCpuTime), 0L),
+      option(submissionTime, old.map(_.submissionTime)),
+      option(firstTaskLaunchedTime, old.map(_.firstTaskLaunchedTime)),
+      option(completionTime, old.map(_.completionTime)),
+      value(inputBytes, old.map(_.inputBytes), 0L),
+      value(inputRecords, old.map(_.inputRecords), 0L),
+      value(outputBytes, old.map(_.outputBytes), 0L),
+      value(outputRecords, old.map(_.outputRecords), 0L),
+      value(shuffleReadBytes, old.map(_.shuffleReadBytes), 0L),
+      value(shuffleReadRecords, old.map(_.shuffleReadRecords), 0L),
+      value(shuffleWriteBytes, old.map(_.shuffleWriteBytes), 0L),
+      value(shuffleWriteRecords, old.map(_.shuffleWriteRecords), 0L),
+      value(memoryBytesSpilled, old.map(_.memoryBytesSpilled), 0L),
+      value(diskBytesSpilled, old.map(_.diskBytesSpilled), 0L),
+      value(name, old.map(_.name), null),
+      value(details, old.map(_.details), null),
+      value(schedulingPool, old.map(_.schedulingPool), null),
+      value(accumulatorUpdates, old.map(_.accumulatorUpdates), Nil),
+      None, // Task list is always empty; it's stored separately.
+      None) // Executor summary is always empty; it's stored separately.
+ } + + private def newExecutorStageSummary( + old: Option[v1.ExecutorStageSummary], + taskTime: Option[Long] = None, + failedTasks: Option[Int] = None, + succeededTasks: Option[Int] = None, + inputBytes: Option[Long] = None, + outputBytes: Option[Long] = None, + shuffleRead: Option[Long] = None, + shuffleWrite: Option[Long] = None, + memoryBytesSpilled: Option[Long] = None, + diskBytesSpilled: Option[Long] = None): v1.ExecutorStageSummary = { + new v1.ExecutorStageSummary( + value(taskTime, old.map(_.taskTime), 0L), + value(failedTasks, old.map(_.failedTasks), 0), + value(succeededTasks, old.map(_.succeededTasks), 0), + value(inputBytes, old.map(_.inputBytes), 0L), + value(outputBytes, old.map(_.outputBytes), 0L), + value(shuffleRead, old.map(_.shuffleRead), 0L), + value(shuffleWrite, old.map(_.shuffleWrite), 0L), + value(memoryBytesSpilled, old.map(_.memoryBytesSpilled), 0L), + value(diskBytesSpilled, old.map(_.diskBytesSpilled), 0L)) + } + + private def newTaskData( + old: Option[v1.TaskData], + taskId: Option[Long] = None, + index: Option[Int] = None, + attempt: Option[Int] = None, + launchTime: Option[Date] = None, + duration: Option[Option[Long]] = None, + executorId: Option[String] = None, + host: Option[String] = None, + status: Option[String] = None, + taskLocality: Option[String] = None, + speculative: Option[Boolean] = None, + accumulatorUpdates: Option[Seq[v1.AccumulableInfo]] = None, + errorMessage: Option[Option[String]] = None, + taskMetrics: Option[Option[v1.TaskMetrics]] = None): v1.TaskData = { + new v1.TaskData( + value(taskId, old.map(_.taskId), -1L), + value(index, old.map(_.index), -1), + value(attempt, old.map(_.attempt), -1), + value(launchTime, old.map(_.launchTime), DEFAULT_DATE), + option(duration, old.map(_.duration)), + value(executorId, old.map(_.executorId), UNKNOWN), + value(host, old.map(_.host), UNKNOWN), + value(status, old.map(_.status), UNKNOWN), + value(taskLocality, old.map(_.taskLocality), UNKNOWN), + value(speculative, old.map(_.speculative), false), + value(accumulatorUpdates, old.map(_.accumulatorUpdates), Nil), + option(errorMessage, old.map(_.errorMessage)), + option(taskMetrics, old.map(_.taskMetrics))) + } + + private def newRDDStorageInfo( + old: Option[v1.RDDStorageInfo], + id: Option[Int] = None, + name: Option[String] = None, + numPartitions: Option[Int] = None, + numCachedPartitions: Option[Int] = None, + storageLevel: Option[String] = None, + memoryUsed: Option[Long] = None, + diskUsed: Option[Long] = None, + dataDistribution: Option[Option[Seq[v1.RDDDataDistribution]]] = None, + partitions: Option[Option[Seq[v1.RDDPartitionInfo]]] = None): v1.RDDStorageInfo = { + new v1.RDDStorageInfo( + value(id, old.map(_.id), -1), + value(name, old.map(_.name), UNKNOWN), + value(numPartitions, old.map(_.numPartitions), 0), + value(numCachedPartitions, old.map(_.numCachedPartitions), 0), + value(storageLevel, old.map(_.storageLevel), StorageLevel.NONE.toString()), + value(memoryUsed, old.map(_.memoryUsed), 0L), + value(diskUsed, old.map(_.diskUsed), 0L), + option(dataDistribution, old.map(_.dataDistribution)), + option(partitions, old.map(_.partitions))) + } + + private def newRDDDataDistribution( + old: Option[v1.RDDDataDistribution], + address: Option[String] = None, + memoryUsed: Option[Long] = None, + memoryRemaining: Option[Long] = None, + diskUsed: Option[Long] = None, + onHeapMemoryUsed: Option[Option[Long]] = None, + offHeapMemoryUsed: Option[Option[Long]] = None, + onHeapMemoryRemaining: Option[Option[Long]] = None, + offHeapMemoryRemaining: 
Option[Option[Long]] = None): v1.RDDDataDistribution = { + new v1.RDDDataDistribution( + value(address, old.map(_.address), UNKNOWN), + value(memoryUsed, old.map(_.memoryUsed), 0L), + value(memoryRemaining, old.map(_.memoryRemaining), 0L), + value(diskUsed, old.map(_.diskUsed), 0L), + option(onHeapMemoryUsed, old.map(_.onHeapMemoryUsed)), + option(offHeapMemoryUsed, old.map(_.offHeapMemoryUsed)), + option(onHeapMemoryRemaining, old.map(_.onHeapMemoryRemaining)), + option(offHeapMemoryRemaining, old.map(_.offHeapMemoryRemaining))) + } + + private def newRDDPartitionInfo( + old: Option[v1.RDDPartitionInfo], + blockName: Option[String] = None, + storageLevel: Option[String] = None, + memoryUsed: Option[Long] = None, + diskUsed: Option[Long] = None, + executors: Option[Seq[String]] = None): v1.RDDPartitionInfo = { + new v1.RDDPartitionInfo( + value(blockName, old.map(_.blockName), UNKNOWN), + value(storageLevel, old.map(_.storageLevel), StorageLevel.NONE.toString()), + value(memoryUsed, old.map(_.memoryUsed), 0L), + value(diskUsed, old.map(_.diskUsed), 0L), + value(executors, old.map(_.executors), Nil)) + } + + // scalastyle:on argCount + + private def newAccumulableInfo(acc: AccumulableInfo): v1.AccumulableInfo = { + new v1.AccumulableInfo( + acc.id, + acc.name.orNull, + acc.update.map(_.toString()), + acc.value.map(_.toString()).orNull) + } + + private def newTaskMetrics(metrics: TaskMetrics): v1.TaskMetrics = { + new v1.TaskMetrics( + metrics.executorDeserializeTime, + metrics.executorDeserializeCpuTime, + metrics.executorRunTime, + metrics.executorCpuTime, + metrics.resultSize, + metrics.jvmGCTime, + metrics.resultSerializationTime, + metrics.memoryBytesSpilled, + metrics.diskBytesSpilled, + new v1.InputMetrics( + metrics.inputMetrics.bytesRead, + metrics.inputMetrics.recordsRead), + new v1.OutputMetrics( + metrics.outputMetrics.bytesWritten, + metrics.outputMetrics.recordsWritten), + new v1.ShuffleReadMetrics( + metrics.shuffleReadMetrics.remoteBlocksFetched, + metrics.shuffleReadMetrics.localBlocksFetched, + metrics.shuffleReadMetrics.fetchWaitTime, + metrics.shuffleReadMetrics.remoteBytesRead, + metrics.shuffleReadMetrics.localBytesRead, + metrics.shuffleReadMetrics.recordsRead), + new v1.ShuffleWriteMetrics( + metrics.shuffleWriteMetrics.bytesWritten, + metrics.shuffleWriteMetrics.writeTime, + metrics.shuffleWriteMetrics.recordsWritten)) + } + + private def newJobDataWrapper( + old: Option[JobDataWrapper], + info: v1.JobData, + initialStages: Option[Set[(Int, Int)]] = None, + completedStages: Option[Set[Int]] = None): JobDataWrapper = { + new JobDataWrapper(info, + value(initialStages, old.map(_.initialStages), Set()), + value(completedStages, old.map(_.completedStages), Set())) + } + + private def newStageDataWrapper( + old: Option[StageDataWrapper], + info: v1.StageData, + jobIds: Option[Set[Int]] = None): StageDataWrapper = { + new StageDataWrapper(info, jobIds.orElse(old.map(_.jobIds)).getOrElse(Set())) + } + + private def activeJobs(stage: StageDataWrapper): Set[Int] = { + stage.jobIds.filter(activeJobs.contains(_)) + } + +} + +private[spark] object AppStateListener { + + val CURRENT_VERSION = 1L + val DEFAULT_DATE = new Date(-1) + val UNKNOWN = "" + +} diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala new file mode 100644 index 0000000000000..b1ac19adfc003 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.io.File + +import scala.annotation.meta.getter +import scala.language.implicitConversions +import scala.reflect.{classTag, ClassTag} + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.kvstore._ +import org.apache.spark.status.api.v1.JacksonMessageWriter + +/** A mix-in trait with helper methods to read and update data from a KVStore. */ +private[spark] trait KVUtils { + + protected val kvstore: KVStore + + /** Helper method for choosing among a new, old or default value. */ + protected def value[T](newv: Option[T], oldv: Option[T], dflt: T): T = { + newv.orElse(oldv).getOrElse(dflt) + } + + /** Helper method for choosing between an optional new or old value. */ + protected def option[T](newv: Option[Option[T]], oldv: Option[Option[T]]): Option[T] = { + newv.getOrElse(oldv.getOrElse(None)) + } + + /** Helper method for reading a value from a KVStore, and return an Option. */ + protected def read[T: ClassTag](key: Any): Option[T] = { + try { + Some(kvstore.read(classTag[T].runtimeClass, key).asInstanceOf[T]) + } catch { + case _: NoSuchElementException => None + } + } + + /** Helper method for updating a value read from a KVStore. */ + protected def update[T: ClassTag](key: Any)(fn: Option[T] => T): Unit = { + val updated = fn(read(key)) + kvstore.write(updated) + } + + /** Utility conversion method used to copy immutable classes. */ + protected implicit def anyToOption[T](o: T): Option[T] = Option(o) + +} + +private[spark] object KVUtils extends Logging { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + /** + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. + */ + class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat) + + } + + /** + * Open or create a LevelDB store. + * + * @param path Location of the store. + * @param metadata Metadata value to compare to the data in the store. If the store does not + * contain any metadata (e.g. it's a new store), this value is written as + * the store's metadata. 
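+   * @throws MetadataMismatchException if the store already contains metadata that does not
+   *         equal the given value; the store is closed before the exception is thrown.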
+ */ + def open[M: ClassTag](path: File, metadata: M): LevelDB = { + require(metadata != null, "Metadata is required.") + + val db = new LevelDB(path, new KVStoreScalaSerializer()) + val dbMeta = db.getMetadata(classTag[M].runtimeClass) + if (dbMeta == null) { + db.setMetadata(metadata) + } else if (dbMeta != metadata) { + db.close() + throw new MetadataMismatchException() + } + + db + } + + class MetadataMismatchException extends Exception + +} diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 1818935392eb3..b81cef207d20e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -69,7 +69,7 @@ private[v1] object AllStagesResource { } val taskData = if (includeDetails) { - Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } ) + Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) }.toMap) } else { None } @@ -86,7 +86,7 @@ private[v1] object AllStagesResource { memoryBytesSpilled = summary.memoryBytesSpilled, diskBytesSpilled = summary.diskBytesSpilled ) - }) + }.toMap) } else { None } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index d66117410f2c5..25077d2669d9f 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -16,9 +16,10 @@ */ package org.apache.spark.status.api.v1 +import java.lang.{Long => JLong} import java.util.Date -import scala.collection.Map +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.spark.JobExecutionStatus @@ -131,9 +132,13 @@ class RDDDataDistribution private[spark]( val memoryUsed: Long, val memoryRemaining: Long, val diskUsed: Long, + @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryUsed: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val offHeapMemoryUsed: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val onHeapMemoryRemaining: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val offHeapMemoryRemaining: Option[Long]) class RDDPartitionInfo private[spark]( @@ -181,7 +186,8 @@ class TaskData private[spark]( val index: Int, val attempt: Int, val launchTime: Date, - val duration: Option[Long] = None, + @JsonDeserialize(contentAs = classOf[JLong]) + val duration: Option[Long], val executorId: String, val host: String, val status: String, diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala new file mode 100644 index 0000000000000..d4b980f5dbb07 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import com.fasterxml.jackson.annotation.JsonIgnore + +import org.apache.spark.kvstore.KVIndex +import org.apache.spark.status.api.v1._ +import org.apache.spark.status.KVUtils._ + +private[spark] class AppStatusStoreMetadata( + val version: Long) + +private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) { + + @JsonIgnore @KVIndex + def id: String = info.id + +} + +private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { + + @JsonIgnore @KVIndex + def id: String = info.id + + @JsonIgnore @KVIndex("host") + def host: String = info.hostPort.split(":")(0) + +} + +/** + * Keep track of the existing stages when the job was submitted, and those that were + * completed during the job's execution. This allows a more accurate acounting of how + * many tasks were skipped for the job. + */ +private[spark] class JobDataWrapper( + val info: JobData, + val initialStages: Set[(Int, Int)], + val completedStages: Set[Int]) { + + @JsonIgnore @KVIndex + def id: Int = info.jobId + +} + +private[spark] class StageDataWrapper( + val info: StageData, + val jobIds: Set[Int]) { + + @JsonIgnore @KVIndex + def id: Array[Int] = Array(info.stageId, info.attemptId) + +} + +private[spark] class TaskDataWrapper(val info: TaskData) { + + @JsonIgnore @KVIndex + def id: Long = info.taskId + +} + +private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { + + @JsonIgnore @KVIndex + def id: Int = info.id + + @JsonIgnore @KVIndex("cached") + def cached: Boolean = info.numCachedPartitions > 0 + +} + +private[spark] class ExecutorStageSummaryWrapper( + val stageId: Int, + val stageAttemptId: Int, + val executorId: String, + val info: ExecutorStageSummary) { + + @JsonIgnore @KVIndex + def id: Array[Any] = Array(stageId, stageAttemptId, executorId) + + @JsonIgnore @KVIndex("stage") + def stage: Array[Int] = Array(stageId, stageAttemptId) + +} diff --git a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala new file mode 100644 index 0000000000000..8da3451f62b01 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala @@ -0,0 +1,694 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status + +import java.io.File +import java.util.{Arrays, Date, Properties} + +import scala.collection.JavaConverters._ +import scala.reflect.{classTag, ClassTag} + +import org.scalatest.BeforeAndAfter + +import org.apache.spark._ +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.kvstore._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster._ +import org.apache.spark.status.api.v1 +import org.apache.spark.storage._ +import org.apache.spark.util.Utils + +class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter { + + private var time: Long = _ + private var testDir: File = _ + private var store: KVStore = _ + + before { + time = 0L + testDir = Utils.createTempDir() + store = KVUtils.open(testDir, getClass().getName()) + } + + after { + store.close() + Utils.deleteRecursively(testDir) + } + + test("scheduler events") { + val listener = new AppStateListener(store) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + check[ApplicationInfoWrapper]("id") { app => + assert(app.info.name === "name") + assert(app.info.id === "id") + assert(app.info.attempts.size === 1) + + val attempt = app.info.attempts.head + assert(attempt.attemptId === Some("attempt")) + assert(attempt.startTime === new Date(time)) + assert(attempt.lastUpdated === new Date(time)) + assert(attempt.endTime === AppStateListener.DEFAULT_DATE) + assert(attempt.sparkUser === "user") + assert(!attempt.completed) + } + + // Start a couple of executors. + time += 1 + val execIds = Array("1", "2") + + execIds.foreach { id => + listener.onExecutorAdded(SparkListenerExecutorAdded(time, id, + new ExecutorInfo(s"$id.example.com", 1, Map()))) + } + + execIds.foreach { id => + check[ExecutorSummaryWrapper](id) { exec => + assert(exec.info.id === id) + assert(exec.info.hostPort === s"$id.example.com") + assert(exec.info.isActive) + } + } + + // Start a job with 2 stages / 4 tasks each + time += 1 + val stages = Seq( + new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"), + new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2")) + + val stageProps = new Properties() + stageProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") + stageProps.setProperty("spark.scheduler.pool", "schedPool") + + listener.onJobStart(SparkListenerJobStart(1, time, stages, null)) + + check[JobDataWrapper](1) { job => + assert(job.info.jobId === 1) + assert(job.info.name === stages.last.name) + assert(job.info.description === Some(stages.last.details)) + assert(job.info.status === JobExecutionStatus.RUNNING) + assert(job.info.submissionTime === Some(new Date(time))) + } + + stages.foreach { info => + check[StageDataWrapper](key(info)) { stage => + assert(stage.info.status === v1.StageStatus.PENDING) + assert(stage.jobIds === Set(1)) + } + } + + // Submit stage 1 + time += 1 + stages.head.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, stageProps)) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveStages === 1) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.status === v1.StageStatus.ACTIVE) + assert(stage.info.submissionTime === Some(new Date(stages.head.submissionTime.get))) + } + + // Start tasks from stage 1 + time += 1 + var _taskIdTracker = -1L + def nextTaskId(): Long = { + _taskIdTracker += 1 + _taskIdTracker + } + + def createTasks(count: 
Int, time: Long): Seq[TaskInfo] = { + (1 to count).map { id => + val exec = execIds(id.toInt % execIds.length) + val taskId = nextTaskId() + new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com", + TaskLocality.PROCESS_LOCAL, id % 2 == 0) + } + } + + val s1Tasks = createTasks(4, time) + s1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task)) + } + + assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveTasks === s1Tasks.size) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.numActiveTasks === s1Tasks.size) + assert(stage.info.firstTaskLaunchedTime === Some(new Date(s1Tasks.head.launchTime))) + } + + s1Tasks.foreach { task => + check[TaskDataWrapper](task.taskId) { wrapper => + assert(wrapper.info.taskId === task.taskId) + assert(wrapper.info.index === task.index) + assert(wrapper.info.attempt === task.attemptNumber) + assert(wrapper.info.launchTime === new Date(task.launchTime)) + assert(wrapper.info.executorId === task.executorId) + assert(wrapper.info.host === task.host) + assert(wrapper.info.status === task.status) + assert(wrapper.info.taskLocality === task.taskLocality.toString()) + assert(wrapper.info.speculative === task.speculative) + } + } + + // Send executor metrics update. Only update one metric to avoid a lot of boilerplate code. + s1Tasks.foreach { task => + val accum = new AccumulableInfo(1L, Some(InternalAccumulator.MEMORY_BYTES_SPILLED), + Some(1L), None, true, false, None) + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate( + task.executorId, + Seq((task.taskId, stages.head.stageId, stages.head.attemptId, Seq(accum))))) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.memoryBytesSpilled === s1Tasks.size) + + val it = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage").first(stage.id) + .closeableIterator() + try { + val execs = it.asScala.takeWhile { v => Arrays.equals(v.stage, stage.id) }.toList + assert(execs.size > 0) + execs.foreach { exec => + assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2) + } + } finally { + it.close() + } + } + + // Fail one of the tasks, re-start it. + time += 1 + s1Tasks.head.finishTime = time + s1Tasks.head.failed = true + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId, + "taskType", TaskResultLost, s1Tasks.head, null)) + + time += 1 + val reattempt = { + val orig = s1Tasks.head + // Task reattempts have a different ID, but the same index as the original. 
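+ // The retry should be stored as its own TaskDataWrapper, while the original task keeps its failed status and error message.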
+ new TaskInfo(nextTaskId(), orig.index, orig.attemptNumber + 1, time, orig.executorId, + s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative) + } + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, + reattempt)) + + assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size + 1) + + check[JobDataWrapper](1) { job => + assert(job.info.numFailedTasks === 1) + assert(job.info.numActiveTasks === s1Tasks.size) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.numFailedTasks === 1) + assert(stage.info.numActiveTasks === s1Tasks.size) + } + + check[TaskDataWrapper](s1Tasks.head.taskId) { task => + assert(task.info.status === s1Tasks.head.status) + assert(task.info.duration === Some(s1Tasks.head.duration)) + assert(task.info.errorMessage == Some(TaskResultLost.toErrorString)) + } + + check[TaskDataWrapper](reattempt.taskId) { task => + assert(task.info.index === s1Tasks.head.index) + assert(task.info.attempt === reattempt.attemptNumber) + } + + // Succeed all tasks in stage 1. + val pending = s1Tasks.drop(1) ++ Seq(reattempt) + + val s1Metrics = TaskMetrics.empty + s1Metrics.setExecutorCpuTime(2L) + + time += 1 + pending.foreach { task => + task.finishTime = time + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId, + "taskType", Success, task, s1Metrics)) + } + + check[JobDataWrapper](1) { job => + assert(job.info.numFailedTasks === 1) + assert(job.info.numActiveTasks === 0) + assert(job.info.numCompletedTasks === pending.size) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.numFailedTasks === 1) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === pending.size) + } + + pending.foreach { task => + check[TaskDataWrapper](task.taskId) { task => + assert(task.info.errorMessage === None) + assert(task.info.taskMetrics.get.executorCpuTime === 2L) + } + } + + assert(store.count(classOf[TaskDataWrapper]) === pending.size + 1) + + // End stage 1. + time += 1 + stages.head.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stages.head)) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveStages === 0) + assert(job.info.numCompletedStages === 1) + } + + check[StageDataWrapper](key(stages.head)) { stage => + assert(stage.info.status === v1.StageStatus.COMPLETE) + assert(stage.info.numFailedTasks === 1) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === pending.size) + } + + // Submit stage 2. + time += 1 + stages.last.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, stageProps)) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveStages === 1) + } + + check[StageDataWrapper](key(stages.last)) { stage => + assert(stage.info.status === v1.StageStatus.ACTIVE) + assert(stage.info.submissionTime === Some(new Date(stages.last.submissionTime.get))) + } + + // Start and fail all tasks of stage 2. 
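+ // These failures should be reflected in both the job's counters and stage 2's counters.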
+ time += 1 + val s2Tasks = createTasks(4, time) + s2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task)) + } + + time += 1 + s2Tasks.foreach { task => + task.finishTime = time + task.failed = true + listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptId, + "taskType", TaskResultLost, task, null)) + } + + check[JobDataWrapper](1) { job => + assert(job.info.numFailedTasks === 1 + s2Tasks.size) + assert(job.info.numActiveTasks === 0) + } + + check[StageDataWrapper](key(stages.last)) { stage => + assert(stage.info.numFailedTasks === s2Tasks.size) + assert(stage.info.numActiveTasks === 0) + } + + // Fail stage 2. + time += 1 + stages.last.completionTime = Some(time) + stages.last.failureReason = Some("uh oh") + listener.onStageCompleted(SparkListenerStageCompleted(stages.last)) + + check[JobDataWrapper](1) { job => + assert(job.info.numCompletedStages === 1) + assert(job.info.numFailedStages === 1) + } + + check[StageDataWrapper](key(stages.last)) { stage => + assert(stage.info.status === v1.StageStatus.FAILED) + assert(stage.info.numFailedTasks === s2Tasks.size) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === 0) + } + + // - Re-submit stage 2, all tasks, and succeed them and the stage. + val oldS2 = stages.last + val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptId + 1, oldS2.name, oldS2.numTasks, + oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics) + + time += 1 + newS2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, stageProps)) + assert(store.count(classOf[StageDataWrapper]) === 3) + + val newS2Tasks = createTasks(4, time) + + newS2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task)) + } + + time += 1 + newS2Tasks.foreach { task => + task.finishTime = time + listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptId, "taskType", Success, + task, null)) + } + + time += 1 + newS2.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(newS2)) + + check[JobDataWrapper](1) { job => + assert(job.info.numActiveStages === 0) + assert(job.info.numFailedStages === 1) + assert(job.info.numCompletedStages === 2) + } + + check[StageDataWrapper](key(newS2)) { stage => + assert(stage.info.status === v1.StageStatus.COMPLETE) + assert(stage.info.numActiveTasks === 0) + assert(stage.info.numCompleteTasks === newS2Tasks.size) + } + + // End job. + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) + + check[JobDataWrapper](1) { job => + assert(job.info.status === JobExecutionStatus.SUCCEEDED) + } + + // Submit a second job that re-uses stage 1 and stage 2. Stage 1 won't be re-run, but + // stage 2 will. This should cause stage 1 to be "skipped" in the new job, and the + // stage 2 re-execution should not change the stats of the already finished job. + time += 1 + listener.onJobStart(SparkListenerJobStart(2, time, Seq(stages.head, newS2), null)) + assert(store.count(classOf[JobDataWrapper]) === 2) + + // The new stage attempt is submitted by the DAGScheduler in a real app, so it's not yet + // known by the listener. Make sure that's the case. 
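+ // Until the new attempt is submitted, reading it from the store should throw NoSuchElementException.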
+ val j2s2 = new StageInfo(newS2.stageId, newS2.attemptId + 1, newS2.name, newS2.numTasks, + newS2.rddInfos, newS2.parentIds, newS2.details, newS2.taskMetrics) + intercept[NoSuchElementException] { + check[StageDataWrapper](key(j2s2)) { _ => () } + } + + time += 1 + j2s2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(j2s2, stageProps)) + assert(store.count(classOf[StageDataWrapper]) === 4) + + time += 1 + val j2s2Tasks = createTasks(4, time) + + j2s2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(j2s2.stageId, j2s2.attemptId, task)) + } + + time += 1 + j2s2Tasks.foreach { task => + task.finishTime = time + listener.onTaskEnd(SparkListenerTaskEnd(j2s2.stageId, j2s2.attemptId, "taskType", Success, + task, null)) + } + + time += 1 + j2s2.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(j2s2)) + + time += 1 + listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded)) + + check[JobDataWrapper](1) { job => + assert(job.info.numCompletedStages === 2) + assert(job.info.numCompletedTasks === s1Tasks.size + s2Tasks.size) + } + + check[JobDataWrapper](2) { job => + assert(job.info.status === JobExecutionStatus.SUCCEEDED) + assert(job.info.numCompletedStages === 1) + assert(job.info.numCompletedTasks === j2s2Tasks.size) + assert(job.info.numSkippedStages === 1) + assert(job.info.numSkippedTasks === s1Tasks.size) + } + + // Blacklist an executor. + time += 1 + listener.onExecutorBlacklisted(SparkListenerExecutorBlacklisted(time, "1", 42)) + check[ExecutorSummaryWrapper]("1") { exec => + assert(exec.info.isBlacklisted) + } + + time += 1 + listener.onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted(time, "1")) + check[ExecutorSummaryWrapper]("1") { exec => + assert(!exec.info.isBlacklisted) + } + + // Blacklist a node. + time += 1 + listener.onNodeBlacklisted(SparkListenerNodeBlacklisted(time, "1.example.com", 2)) + check[ExecutorSummaryWrapper]("1") { exec => + assert(exec.info.isBlacklisted) + } + + time += 1 + listener.onNodeUnblacklisted(SparkListenerNodeUnblacklisted(time, "1.example.com")) + check[ExecutorSummaryWrapper]("1") { exec => + assert(!exec.info.isBlacklisted) + } + + // Stop executors. + listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "1", "Test")) + listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "2", "Test")) + + Seq("1", "2").foreach { id => + check[ExecutorSummaryWrapper](id) { exec => + assert(exec.info.id === id) + assert(!exec.info.isActive) + } + } + + // End the application. + listener.onApplicationEnd(SparkListenerApplicationEnd(42L)) + + check[ApplicationInfoWrapper]("id") { app => + assert(app.info.name === "name") + assert(app.info.id === "id") + assert(app.info.attempts.size === 1) + + val attempt = app.info.attempts.head + assert(attempt.attemptId === Some("attempt")) + assert(attempt.startTime === new Date(1L)) + assert(attempt.lastUpdated === new Date(42L)) + assert(attempt.endTime === new Date(42L)) + assert(attempt.duration === 41L) + assert(attempt.sparkUser === "user") + assert(attempt.completed) + } + } + + test("storage events") { + val listener = new AppStateListener(store) + val maxMemory = 42L + + // Register a couple of block managers. 
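+ // Registering a block manager should expose its max memory through the matching executor summary.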
+ val bm1 = BlockManagerId("1", "1.example.com", 42) + val bm2 = BlockManagerId("2", "2.example.com", 84) + Seq(bm1, bm2).foreach { bm => + listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId, + new ExecutorInfo(bm.host, 1, Map()))) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, maxMemory)) + check[ExecutorSummaryWrapper](bm.executorId) { exec => + assert(exec.info.maxMemory === maxMemory) + } + } + + val rdd1b1 = RDDBlockId(1, 1) + val level = StorageLevel.MEMORY_AND_DISK + + // Submit a stage and make sure the RDD is recorded. + val rddInfo = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil) + val stage = new StageInfo(1, 0, "stage1", 4, Seq(rddInfo), Nil, "details1") + listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.name === rddInfo.name) + assert(wrapper.info.numPartitions === rddInfo.numPartitions) + assert(wrapper.info.storageLevel === rddInfo.storageLevel.description) + } + + // Add partition 1 replicated on two block managers. + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1, level, 1L, 1L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 1L) + assert(wrapper.info.diskUsed === 1L) + + assert(wrapper.info.dataDistribution.isDefined) + assert(wrapper.info.dataDistribution.get.size === 1) + + val dist = wrapper.info.dataDistribution.get.head + assert(dist.address === bm1.hostPort) + assert(dist.memoryUsed === 1L) + assert(dist.diskUsed === 1L) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + assert(wrapper.info.partitions.isDefined) + assert(wrapper.info.partitions.get.size === 1) + + val part = wrapper.info.partitions.get.head + assert(part.blockName === rdd1b1.name) + assert(part.storageLevel === level.description) + assert(part.memoryUsed === 1L) + assert(part.diskUsed === 1L) + assert(part.executors === Seq(bm1.executorId)) + } + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 1L) + assert(exec.info.memoryUsed === 1L) + assert(exec.info.diskUsed === 1L) + } + + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1, level, 1L, 1L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 2L) + assert(wrapper.info.diskUsed === 2L) + assert(wrapper.info.dataDistribution.get.size === 2L) + assert(wrapper.info.partitions.get.size === 1L) + + val dist = wrapper.info.dataDistribution.get.find(_.address == bm2.hostPort).get + assert(dist.memoryUsed === 1L) + assert(dist.diskUsed === 1L) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + val part = wrapper.info.partitions.get(0) + assert(part.memoryUsed === 2L) + assert(part.diskUsed === 2L) + assert(part.executors === Seq(bm1.executorId, bm2.executorId)) + } + + check[ExecutorSummaryWrapper](bm2.executorId) { exec => + assert(exec.info.rddBlocks === 1L) + assert(exec.info.memoryUsed === 1L) + assert(exec.info.diskUsed === 1L) + } + + // Add a second partition only to bm 1. 
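+ // The RDD totals and bm1's distribution should grow by the new block's memory and disk usage.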
+ val rdd1b2 = RDDBlockId(1, 2) + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b2, level, + 3L, 3L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 5L) + assert(wrapper.info.diskUsed === 5L) + assert(wrapper.info.dataDistribution.get.size === 2L) + assert(wrapper.info.partitions.get.size === 2L) + + val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get + assert(dist.memoryUsed === 4L) + assert(dist.diskUsed === 4L) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + val part = wrapper.info.partitions.get.find(_.blockName === rdd1b2.name).get + assert(part.storageLevel === level.description) + assert(part.memoryUsed === 3L) + assert(part.diskUsed === 3L) + assert(part.executors === Seq(bm1.executorId)) + } + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 2L) + assert(exec.info.memoryUsed === 4L) + assert(exec.info.diskUsed === 4L) + } + + // Remove block 1 from bm 1. + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1, + StorageLevel.NONE, 1L, 1L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 4L) + assert(wrapper.info.diskUsed === 4L) + assert(wrapper.info.dataDistribution.get.size === 2L) + assert(wrapper.info.partitions.get.size === 2L) + + val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get + assert(dist.memoryUsed === 3L) + assert(dist.diskUsed === 3L) + assert(dist.memoryRemaining === maxMemory - dist.memoryUsed) + + val part = wrapper.info.partitions.get.find(_.blockName === rdd1b1.name).get + assert(part.storageLevel === level.description) + assert(part.memoryUsed === 1L) + assert(part.diskUsed === 1L) + assert(part.executors === Seq(bm2.executorId)) + } + + check[ExecutorSummaryWrapper](bm1.executorId) { exec => + assert(exec.info.rddBlocks === 1L) + assert(exec.info.memoryUsed === 3L) + assert(exec.info.diskUsed === 3L) + } + + // Remove block 1 from bm 2. This should leave only block 2's info in the store. + listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1, + StorageLevel.NONE, 1L, 1L))) + + check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper => + assert(wrapper.info.memoryUsed === 3L) + assert(wrapper.info.diskUsed === 3L) + assert(wrapper.info.dataDistribution.get.size === 1L) + assert(wrapper.info.partitions.get.size === 1L) + assert(wrapper.info.partitions.get(0).blockName === rdd1b2.name) + } + + check[ExecutorSummaryWrapper](bm2.executorId) { exec => + assert(exec.info.rddBlocks === 0L) + assert(exec.info.memoryUsed === 0L) + assert(exec.info.diskUsed === 0L) + } + + // Unpersist RDD1. + listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd1b1.rddId)) + intercept[NoSuchElementException] { + check[RDDStorageInfoWrapper](rdd1b1.rddId) { _ => () } + } + + } + + private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId) + + private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = { + val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T] + fn(value) + } + +} diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 656c7614c2bfd..d7932727b4bd5 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -93,7 +93,7 @@ This file is divided into 3 sections: - +