diff --git a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala
index f897982d8cfbb..5d875757e3736 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStateListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStateListener.scala
@@ -609,7 +609,8 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
event.blockUpdatedInfo.blockId match {
case block: RDDBlockId => updateRDDBlock(event, block)
- case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too.
+ case stream: StreamBlockId => updateStreamBlock(event, stream)
+ case _ =>
}
}
@@ -694,7 +695,11 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
newRDDDataDistribution(
None,
address = address,
- memoryRemaining = executorInfo.info.maxMemory)
+ memoryRemaining = executorInfo.info.maxMemory,
+ onHeapMemoryUsed = memMetrics.map { _ => 0L },
+ offHeapMemoryUsed = memMetrics.map { _ => 0L },
+ onHeapMemoryRemaining = memMetrics.map(_.totalOnHeapStorageMemory),
+ offHeapMemoryRemaining = memMetrics.map(_.totalOffHeapStorageMemory))
}
(_old, others)
}
@@ -703,19 +708,18 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
val newDistMem = newValue(oldDist.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult)
val newDistDisk = newValue(oldDist.diskUsed, event.blockUpdatedInfo.diskSize * diskMult)
val newDists = if (newDistMem > 0 || newDistDisk > 0) {
- val newOffHeap = if (storageLevel.useOffHeap) Some(newDistMem) else None
- val newOnHeap = if (!storageLevel.useOffHeap) Some(newDistMem) else None
- val remainingOffHeap = if (storageLevel.useOffHeap) {
- remainingMemory(memMetrics.map(_.totalOffHeapStorageMemory), oldDist.offHeapMemoryUsed,
- newOffHeap)
+ val _newMem = Some(newDistMem)
+ val (newOffHeap, remainingOffHeap) = if (storageLevel.useOffHeap) {
+ (_newMem, remainingMemory(oldDist.offHeapMemoryRemaining, oldDist.offHeapMemoryUsed,
+ _newMem))
} else {
- None
+ (oldDist.offHeapMemoryUsed, oldDist.offHeapMemoryRemaining)
}
- val remainingOnHeap = if (!storageLevel.useOffHeap) {
- remainingMemory(memMetrics.map(_.totalOnHeapStorageMemory), oldDist.onHeapMemoryUsed,
- newOnHeap)
+ val (newOnHeap, remainingOnHeap) = if (!storageLevel.useOffHeap) {
+ (_newMem, remainingMemory(oldDist.onHeapMemoryRemaining, oldDist.onHeapMemoryUsed,
+ _newMem))
} else {
- None
+ (oldDist.onHeapMemoryUsed, oldDist.onHeapMemoryRemaining)
}
val newDist = newRDDDataDistribution(
@@ -733,13 +737,15 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
Nil
}
+ val allBlocks = others ++ newBlocks
val newRDD = newRDDStorageInfo(
rdd.info,
+ numCachedPartitions = allBlocks.size,
storageLevel = updatedStorageLevel,
memoryUsed = newValue(rdd.info.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult),
diskUsed = newValue(rdd.info.diskUsed, event.blockUpdatedInfo.diskSize * diskMult),
dataDistribution = Some(otherDists ++ newDists),
- partitions = Some(others ++ newBlocks))
+ partitions = Some(allBlocks))
new RDDStorageInfoWrapper(newRDD)
}
@@ -771,6 +777,26 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
kvstore.write(new ExecutorSummaryWrapper(newExecSummary))
}
+ private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
+ val storageLevel = event.blockUpdatedInfo.storageLevel
+ if (storageLevel.isValid) {
+ val data = new StreamBlockData(
+ stream.name,
+ event.blockUpdatedInfo.blockManagerId.executorId,
+ event.blockUpdatedInfo.blockManagerId.hostPort,
+ storageLevel.description,
+ storageLevel.useMemory,
+ storageLevel.useDisk,
+ storageLevel.deserialized,
+ event.blockUpdatedInfo.memSize,
+ event.blockUpdatedInfo.diskSize)
+ kvstore.write(data)
+ } else {
+ kvstore.delete(classOf[StreamBlockData],
+ Array(stream.name, event.blockUpdatedInfo.blockManagerId.executorId))
+ }
+ }
+
private def updateJobData(id: Int)(fn: JobDataWrapper => JobDataWrapper): Unit = {
update[JobDataWrapper](id) { old =>
val job = old.getOrElse((newJobDataWrapper(None, newJobData(None))))
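
For context, the new updateStreamBlock path mirrors the semantics of the removed BlockStatusListener: a valid storage level upserts a StreamBlockData row, and StorageLevel.NONE deletes it. A minimal sketch of driving it, assuming an AppStateListener named `listener` backed by an in-memory KVStore (not part of the patch):

import org.apache.spark.scheduler.SparkListenerBlockUpdated
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo, StorageLevel, StreamBlockId}

val bm = BlockManagerId("1", "localhost", 10000)
val stream = StreamBlockId(0, 100L)

// A valid level writes (or overwrites) the row keyed by (name, executorId).
listener.onBlockUpdated(SparkListenerBlockUpdated(
  BlockUpdatedInfo(bm, stream, StorageLevel.MEMORY_AND_DISK, memSize = 100L, diskSize = 100L)))

// StorageLevel.NONE is how a dropped block is reported, so the row is deleted instead.
listener.onBlockUpdated(SparkListenerBlockUpdated(
  BlockUpdatedInfo(bm, stream, StorageLevel.NONE, memSize = 0L, diskSize = 0L)))
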
diff --git a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala
index 5794ae1abb044..c843cfda3ad9f 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStateStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStateStore.scala
@@ -199,8 +199,10 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
indexed.skip(offset).max(length).asScala.map(_.info).toSeq
}
- def rddList(): Seq[v1.RDDStorageInfo] = {
- store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).toSeq
+ def rddList(cachedOnly: Boolean = true): Seq[v1.RDDStorageInfo] = {
+ store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).filter { rdd =>
+ !cachedOnly || rdd.numCachedPartitions > 0
+ }.toSeq
}
def rdd(rddId: Int): v1.RDDStorageInfo = {
@@ -211,6 +213,10 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
store.view(classOf[ExecutorEventData]).asScala.map(_.event).toSeq
}
+ def streamBlocksList(): Seq[StreamBlockData] = {
+ store.view(classOf[StreamBlockData]).asScala.toSeq
+ }
+
def close(): Unit = {
store.close()
tempStorePath.foreach(Utils.deleteRecursively)
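
The store-side reads stay deliberately simple; rddList now filters out uncached RDDs by default, so the REST endpoint and the storage page get the same view. A rough usage sketch, with a `store: AppStateStore` assumed in scope:

val cachedRdds = store.rddList()                   // only RDDs with numCachedPartitions > 0
val allRdds    = store.rddList(cachedOnly = false) // include uncached RDDs as well
val streams    = store.streamBlocksList()          // all StreamBlockData rows, unsorted
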
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 1279b281ad8d8..2189e1da91841 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -21,90 +21,11 @@ import javax.ws.rs.core.MediaType
import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.storage.StorageListener
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllRDDResource(ui: SparkUI) {
@GET
- def rddList(): Seq[RDDStorageInfo] = {
- val storageStatusList = ui.storageListener.activeStorageStatusList
- val rddInfos = ui.storageListener.rddInfoList
- rddInfos.map{rddInfo =>
- AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
- includeDetails = false)
- }
- }
+ def rddList(): Seq[RDDStorageInfo] = ui.store.rddList()
}
-
-private[spark] object AllRDDResource {
-
- def getRDDStorageInfo(
- rddId: Int,
- listener: StorageListener,
- includeDetails: Boolean): Option[RDDStorageInfo] = {
- val storageStatusList = listener.activeStorageStatusList
- listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
- getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
- }
- }
-
- def getRDDStorageInfo(
- rddId: Int,
- rddInfo: RDDInfo,
- storageStatusList: Seq[StorageStatus],
- includeDetails: Boolean): RDDStorageInfo = {
- val workers = storageStatusList.map { (rddId, _) }
- val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
- val blocks = storageStatusList
- .flatMap { _.rddBlocksById(rddId) }
- .sortWith { _._1.name < _._1.name }
- .map { case (blockId, status) =>
- (blockId, status, blockLocations.getOrElse(blockId, Seq[String]("Unknown")))
- }
-
- val dataDistribution = if (includeDetails) {
- Some(storageStatusList.map { status =>
- new RDDDataDistribution(
- address = status.blockManagerId.hostPort,
- memoryUsed = status.memUsedByRdd(rddId),
- memoryRemaining = status.memRemaining,
- diskUsed = status.diskUsedByRdd(rddId),
- onHeapMemoryUsed = Some(
- if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
- offHeapMemoryUsed = Some(
- if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
- onHeapMemoryRemaining = status.onHeapMemRemaining,
- offHeapMemoryRemaining = status.offHeapMemRemaining
- ) } )
- } else {
- None
- }
- val partitions = if (includeDetails) {
- Some(blocks.map { case (id, block, locations) =>
- new RDDPartitionInfo(
- blockName = id.name,
- storageLevel = block.storageLevel.description,
- memoryUsed = block.memSize,
- diskUsed = block.diskSize,
- executors = locations
- )
- } )
- } else {
- None
- }
-
- new RDDStorageInfo(
- id = rddId,
- name = rddInfo.name,
- numPartitions = rddInfo.numPartitions,
- numCachedPartitions = rddInfo.numCachedPartitions,
- storageLevel = rddInfo.storageLevel.description,
- memoryUsed = rddInfo.memSize,
- diskUsed = rddInfo.diskSize,
- dataDistribution = dataDistribution,
- partitions = partitions
- )
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
index 237aeac185877..ca9758cf0d109 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.status.api.v1
+import java.util.NoSuchElementException
import javax.ws.rs.{GET, PathParam, Produces}
import javax.ws.rs.core.MediaType
@@ -26,9 +27,12 @@ private[v1] class OneRDDResource(ui: SparkUI) {
@GET
def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
- AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
- throw new NotFoundException(s"no rdd found w/ id $rddId")
- )
+ try {
+ ui.store.rdd(rddId)
+ } catch {
+ case _: NoSuchElementException =>
+ throw new NotFoundException(s"no rdd found w/ id $rddId")
+ }
}
}
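
This is the generic translation between the two layers: KVStore signals a missing key with java.util.NoSuchElementException, and the v1 API reports it as a 404. A hypothetical helper capturing the same idea (lookupOr404 is illustrative, not part of the patch):

private def lookupOr404[T](what: String)(fetch: => T): T = {
  try {
    fetch
  } catch {
    case _: NoSuchElementException => throw new NotFoundException(s"no $what found")
  }
}

// e.g. lookupOr404(s"rdd w/ id $rddId") { ui.store.rdd(rddId) }
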
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index 71683a1649cdc..9db750a169498 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -142,3 +142,19 @@ private[spark] class ExecutorStageSummaryWrapper(
private[spark] class ExecutorEventData(
@KVIndexParam val id: Long,
val event: SparkListenerEvent)
+
+private[spark] class StreamBlockData(
+ val name: String,
+ val executorId: String,
+ val hostPort: String,
+ val storageLevel: String,
+ val useMemory: Boolean,
+ val useDisk: Boolean,
+ val deserialized: Boolean,
+ val memSize: Long,
+ val diskSize: Long) {
+
+ @JsonIgnore @KVIndex
+ def key: Array[String] = Array(name, executorId)
+
+}
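
StreamBlockData uses a two-element natural key, so each (stream block, executor) replica is its own row; that is what lets updateStreamBlock delete exactly one replica when its storage level becomes invalid. A sketch of addressing a row through the composite @KVIndex key, assuming a `kvstore: KVStore` and a previously written block named "input-0-100" on executor "1":

// Read and delete use the same Array(name, executorId) natural key.
val row = kvstore.read(classOf[StreamBlockData], Array("input-0-100", "1"))
kvstore.delete(classOf[StreamBlockData], Array("input-0-100", "1"))
// A lookup after the delete throws java.util.NoSuchElementException.
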
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
deleted file mode 100644
index 0a14fcadf53e0..0000000000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import scala.collection.mutable
-
-import org.apache.spark.scheduler._
-
-private[spark] case class BlockUIData(
- blockId: BlockId,
- location: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long)
-
-/**
- * The aggregated status of stream blocks in an executor
- */
-private[spark] case class ExecutorStreamBlockStatus(
- executorId: String,
- location: String,
- blocks: Seq[BlockUIData]) {
-
- def totalMemSize: Long = blocks.map(_.memSize).sum
-
- def totalDiskSize: Long = blocks.map(_.diskSize).sum
-
- def numStreamBlocks: Int = blocks.size
-
-}
-
-private[spark] class BlockStatusListener extends SparkListener {
-
- private val blockManagers =
- new mutable.HashMap[BlockManagerId, mutable.HashMap[BlockId, BlockUIData]]
-
- override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
- val blockId = blockUpdated.blockUpdatedInfo.blockId
- if (!blockId.isInstanceOf[StreamBlockId]) {
- // Now we only monitor StreamBlocks
- return
- }
- val blockManagerId = blockUpdated.blockUpdatedInfo.blockManagerId
- val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
- val memSize = blockUpdated.blockUpdatedInfo.memSize
- val diskSize = blockUpdated.blockUpdatedInfo.diskSize
-
- synchronized {
- // Drop the update info if the block manager is not registered
- blockManagers.get(blockManagerId).foreach { blocksInBlockManager =>
- if (storageLevel.isValid) {
- blocksInBlockManager.put(blockId,
- BlockUIData(
- blockId,
- blockManagerId.hostPort,
- storageLevel,
- memSize,
- diskSize)
- )
- } else {
- // If isValid is not true, it means we should drop the block.
- blocksInBlockManager -= blockId
- }
- }
- }
- }
-
- override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
- synchronized {
- blockManagers.put(blockManagerAdded.blockManagerId, mutable.HashMap())
- }
- }
-
- override def onBlockManagerRemoved(
- blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = synchronized {
- blockManagers -= blockManagerRemoved.blockManagerId
- }
-
- def allExecutorStreamBlockStatus: Seq[ExecutorStreamBlockStatus] = synchronized {
- blockManagers.map { case (blockManagerId, blocks) =>
- ExecutorStreamBlockStatus(
- blockManagerId.executorId, blockManagerId.hostPort, blocks.values.toSeq)
- }.toSeq
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
deleted file mode 100644
index ac60f795915a3..0000000000000
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import scala.collection.mutable
-
-import org.apache.spark.SparkConf
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.scheduler._
-
-/**
- * :: DeveloperApi ::
- * A SparkListener that maintains executor storage status.
- *
- * This class is thread-safe (unlike JobProgressListener)
- */
-@DeveloperApi
-@deprecated("This class will be removed in a future release.", "2.2.0")
-class StorageStatusListener(conf: SparkConf) extends SparkListener {
- // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
- private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
- private[storage] val deadExecutorStorageStatus = new mutable.ListBuffer[StorageStatus]()
- private[this] val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
-
- def storageStatusList: Seq[StorageStatus] = synchronized {
- executorIdToStorageStatus.values.toSeq
- }
-
- def deadStorageStatusList: Seq[StorageStatus] = synchronized {
- deadExecutorStorageStatus
- }
-
- /** Update storage status list to reflect updated block statuses */
- private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
- executorIdToStorageStatus.get(execId).foreach { storageStatus =>
- updatedBlocks.foreach { case (blockId, updatedStatus) =>
- if (updatedStatus.storageLevel == StorageLevel.NONE) {
- storageStatus.removeBlock(blockId)
- } else {
- storageStatus.updateBlock(blockId, updatedStatus)
- }
- }
- }
- }
-
- /** Update storage status list to reflect the removal of an RDD from the cache */
- private def updateStorageStatus(unpersistedRDDId: Int) {
- storageStatusList.foreach { storageStatus =>
- storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
- storageStatus.removeBlock(blockId)
- }
- }
- }
-
- override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
- updateStorageStatus(unpersistRDD.rddId)
- }
-
- override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) {
- synchronized {
- val blockManagerId = blockManagerAdded.blockManagerId
- val executorId = blockManagerId.executorId
- // The onHeap and offHeap memory are always defined for new applications,
- // but they can be missing if we are replaying old event logs.
- val storageStatus = new StorageStatus(blockManagerId, blockManagerAdded.maxMem,
- blockManagerAdded.maxOnHeapMem, blockManagerAdded.maxOffHeapMem)
- executorIdToStorageStatus(executorId) = storageStatus
-
- // Try to remove the dead storage status if the same executor registers the block manager twice.
- deadExecutorStorageStatus.zipWithIndex.find(_._1.blockManagerId.executorId == executorId)
- .foreach(toRemoveExecutor => deadExecutorStorageStatus.remove(toRemoveExecutor._2))
- }
- }
-
- override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
- synchronized {
- val executorId = blockManagerRemoved.blockManagerId.executorId
- executorIdToStorageStatus.remove(executorId).foreach { status =>
- deadExecutorStorageStatus += status
- }
- if (deadExecutorStorageStatus.size > retainedDeadExecutors) {
- deadExecutorStorageStatus.trimStart(1)
- }
- }
- }
-
- override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
- val executorId = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
- val blockId = blockUpdated.blockUpdatedInfo.blockId
- val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
- val memSize = blockUpdated.blockUpdatedInfo.memSize
- val diskSize = blockUpdated.blockUpdatedInfo.diskSize
- val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
- updateStorageStatus(executorId, Seq((blockId, blockStatus)))
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index d9fe45b7f8e4e..ce2696192400c 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -26,13 +26,12 @@ import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStateStore
import org.apache.spark.status.api.v1._
-import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.env.EnvironmentTab
import org.apache.spark.ui.exec.ExecutorsTab
import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab}
import org.apache.spark.ui.scope.RDDOperationGraphListener
-import org.apache.spark.ui.storage.{StorageListener, StorageTab}
+import org.apache.spark.ui.storage.StorageTab
import org.apache.spark.util.Utils
/**
@@ -43,9 +42,7 @@ private[spark] class SparkUI private (
val sc: Option[SparkContext],
val conf: SparkConf,
securityManager: SecurityManager,
- val storageStatusListener: StorageStatusListener,
val jobProgressListener: JobProgressListener,
- val storageListener: StorageListener,
val operationGraphListener: RDDOperationGraphListener,
var appName: String,
val basePath: String,
@@ -67,7 +64,7 @@ private[spark] class SparkUI private (
attachTab(jobsTab)
val stagesTab = new StagesTab(this, store)
attachTab(stagesTab)
- attachTab(new StorageTab(this))
+ attachTab(new StorageTab(this, store))
attachTab(new EnvironmentTab(this, store))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
@@ -175,16 +172,12 @@ private[spark] object SparkUI {
listener
}
- val storageStatusListener = new StorageStatusListener(conf)
- val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
- listenerBus.addListener(storageStatusListener)
- listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)
- new SparkUI(store, sc, conf, securityManager, storageStatusListener, jobProgressListener,
- storageListener, operationGraphListener, appName, basePath, startTime)
+ new SparkUI(store, sc, conf, securityManager, jobProgressListener, operationGraphListener,
+ appName, basePath, startTime)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index a1a0c729b9240..7bbd5efdd68ec 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -22,13 +22,13 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.{Node, Unparsed}
-import org.apache.spark.status.api.v1.{AllRDDResource, RDDDataDistribution, RDDPartitionInfo}
-import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
+import org.apache.spark.status.AppStateStore
+import org.apache.spark.status.api.v1.{RDDDataDistribution, RDDPartitionInfo}
+import org.apache.spark.ui._
import org.apache.spark.util.Utils
/** Page showing storage details for a given RDD */
-private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
- private val listener = parent.listener
+private[ui] class RDDPage(parent: SparkUITab, store: AppStateStore) extends WebUIPage("rdd") {
def render(request: HttpServletRequest): Seq[Node] = {
val parameterId = request.getParameter("id")
@@ -47,11 +47,13 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
val blockPrevPageSize = Option(parameterBlockPrevPageSize).map(_.toInt).getOrElse(blockPageSize)
val rddId = parameterId.toInt
- val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true)
- .getOrElse {
+ val rddStorageInfo = try {
+ store.rdd(rddId)
+ } catch {
+ case _: NoSuchElementException =>
// Rather than crashing, render an "RDD Not Found" page
return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent)
- }
+ }
// Worker table
val workerTable = UIUtils.listingTable(workerHeader, workerRow,
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index aa84788f1df88..16d5a4123362c 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -19,23 +19,24 @@ package org.apache.spark.ui.storage
import javax.servlet.http.HttpServletRequest
+import scala.collection.SortedMap
import scala.xml.Node
+import org.apache.spark.status.{AppStateStore, StreamBlockData}
+import org.apache.spark.status.api.v1
import org.apache.spark.storage._
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui._
import org.apache.spark.util.Utils
/** Page showing list of RDD's currently stored in the cluster */
-private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
- private val listener = parent.listener
+private[ui] class StoragePage(parent: SparkUITab, store: AppStateStore) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
- val content = rddTable(listener.rddInfoList) ++
- receiverBlockTables(listener.allExecutorStreamBlockStatus.sortBy(_.executorId))
+ val content = rddTable(store.rddList()) ++ receiverBlockTables(store.streamBlocksList())
UIUtils.headerSparkPage("Storage", content, parent)
}
- private[storage] def rddTable(rdds: Seq[RDDInfo]): Seq[Node] = {
+ private[storage] def rddTable(rdds: Seq[v1.RDDStorageInfo]): Seq[Node] = {
if (rdds.isEmpty) {
// Don't show the rdd table if there is no RDD persisted.
Nil
@@ -57,7 +58,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
"Size on Disk")
/** Render an HTML row representing an RDD */
- private def rddRow(rdd: RDDInfo): Seq[Node] = {
+ private def rddRow(rdd: v1.RDDStorageInfo): Seq[Node] = {
// scalastyle:off
@@ -65,35 +66,40 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
      <td>{rdd.name}</td>
      <td>
-        {rdd.storageLevel.description}
+        {rdd.storageLevel}
      </td>
      <td>{rdd.numCachedPartitions.toString}</td>
      <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
-      <td>{Utils.bytesToString(rdd.memSize)}</td>
-      <td>{Utils.bytesToString(rdd.diskSize)}</td>
+      <td>{Utils.bytesToString(rdd.memoryUsed)}</td>
+      <td>{Utils.bytesToString(rdd.diskUsed)}</td>
// scalastyle:on
}
- private[storage] def receiverBlockTables(statuses: Seq[ExecutorStreamBlockStatus]): Seq[Node] = {
- if (statuses.map(_.numStreamBlocks).sum == 0) {
+ private[storage] def receiverBlockTables(blocks: Seq[StreamBlockData]): Seq[Node] = {
+ if (blocks.isEmpty) {
// Don't show the tables if there is no stream block
Nil
} else {
- val blocks = statuses.flatMap(_.blocks).groupBy(_.blockId).toSeq.sortBy(_._1.toString)
+ val sorted = blocks.groupBy(_.name).toSeq.sortBy(_._1.toString)
      <div>
        <h4>Receiver Blocks</h4>
-        {executorMetricsTable(statuses)}
-        {streamBlockTable(blocks)}
+        {executorMetricsTable(blocks)}
+        {streamBlockTable(sorted)}
      </div>
}
}
- private def executorMetricsTable(statuses: Seq[ExecutorStreamBlockStatus]): Seq[Node] = {
+ private def executorMetricsTable(blocks: Seq[StreamBlockData]): Seq[Node] = {
+ val blockManagers = SortedMap(blocks.groupBy(_.executorId).toSeq: _*)
+ .map { case (id, blocks) =>
+ new ExecutorStreamSummary(blocks)
+ }
+
      <div>
        <h5>Aggregated Block Metrics by Executor</h5>
-      {UIUtils.listingTable(executorMetricsTableHeader, executorMetricsTableRow, statuses,
+      {UIUtils.listingTable(executorMetricsTableHeader, executorMetricsTableRow, blockManagers,
        id = Some("storage-by-executor-stream-blocks"))}
      </div>
}
@@ -105,7 +111,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
"Total Size on Disk",
"Stream Blocks")
- private def executorMetricsTableRow(status: ExecutorStreamBlockStatus): Seq[Node] = {
+ private def executorMetricsTableRow(status: ExecutorStreamSummary): Seq[Node] = {
      <td>{status.executorId}</td>
@@ -125,7 +131,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
    </tr>
}
- private def streamBlockTable(blocks: Seq[(BlockId, Seq[BlockUIData])]): Seq[Node] = {
+ private def streamBlockTable(blocks: Seq[(String, Seq[StreamBlockData])]): Seq[Node] = {
if (blocks.isEmpty) {
Nil
} else {
@@ -149,7 +155,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
"Size")
/** Render a stream block */
- private def streamBlockTableRow(block: (BlockId, Seq[BlockUIData])): Seq[Node] = {
+ private def streamBlockTableRow(block: (String, Seq[StreamBlockData])): Seq[Node] = {
val replications = block._2
assert(replications.nonEmpty) // This must be true because it's the result of "groupBy"
if (replications.size == 1) {
@@ -161,33 +167,36 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
}
private def streamBlockTableSubrow(
- blockId: BlockId, block: BlockUIData, replication: Int, firstSubrow: Boolean): Seq[Node] = {
+ blockId: String,
+ block: StreamBlockData,
+ replication: Int,
+ firstSubrow: Boolean): Seq[Node] = {
val (storageLevel, size) = streamBlockStorageLevelDescriptionAndSize(block)
      {
        if (firstSubrow) {
-          <td rowspan={replication.toString}>{block.blockId.toString}</td>
+          <td rowspan={replication.toString}>{block.name}</td>
          <td rowspan={replication.toString}>{replication.toString}</td>
        }
      }
-      <td>{block.location}</td>
+      <td>{block.hostPort}</td>
      <td>{storageLevel}</td>
      <td>{Utils.bytesToString(size)}</td>
}
private[storage] def streamBlockStorageLevelDescriptionAndSize(
- block: BlockUIData): (String, Long) = {
- if (block.storageLevel.useDisk) {
+ block: StreamBlockData): (String, Long) = {
+ if (block.useDisk) {
("Disk", block.diskSize)
- } else if (block.storageLevel.useMemory && block.storageLevel.deserialized) {
+ } else if (block.useMemory && block.deserialized) {
("Memory", block.memSize)
- } else if (block.storageLevel.useMemory && !block.storageLevel.deserialized) {
+ } else if (block.useMemory && !block.deserialized) {
("Memory Serialized", block.memSize)
} else {
throw new IllegalStateException(s"Invalid Storage Level: ${block.storageLevel}")
@@ -195,3 +204,17 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
}
}
+
+private class ExecutorStreamSummary(blocks: Seq[StreamBlockData]) {
+
+ def executorId: String = blocks.head.executorId
+
+ def location: String = blocks.head.hostPort
+
+ def totalMemSize: Long = blocks.map(_.memSize).sum
+
+ def totalDiskSize: Long = blocks.map(_.diskSize).sum
+
+ def numStreamBlocks: Int = blocks.size
+
+}
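
ExecutorStreamSummary replaces the stateful ExecutorStreamBlockStatus: per-executor totals are now derived at render time from the flat StreamBlockData rows instead of being maintained by a listener. Roughly, the aggregation amounts to this sketch, with `blocks: Seq[StreamBlockData]` assumed:

val summaries = blocks.groupBy(_.executorId).toSeq.sortBy(_._1).map {
  case (_, execBlocks) => new ExecutorStreamSummary(execBlocks)
}
// Each summary exposes executorId, location, totalMemSize, totalDiskSize
// and numStreamBlocks, computed on demand from its rows.
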
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 148efb134e14f..33869d9c21963 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -21,67 +21,14 @@ import scala.collection.mutable
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
+import org.apache.spark.status.AppStateStore
import org.apache.spark.storage._
import org.apache.spark.ui._
/** Web UI showing storage status of all RDD's in the given SparkContext. */
-private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storage") {
- val listener = parent.storageListener
+private[ui] class StorageTab(parent: SparkUI, store: AppStateStore)
+ extends SparkUITab(parent, "storage") {
- attachPage(new StoragePage(this))
- attachPage(new RDDPage(this))
-}
-
-/**
- * :: DeveloperApi ::
- * A SparkListener that prepares information to be displayed on the BlockManagerUI.
- *
- * This class is thread-safe (unlike JobProgressListener)
- */
-@DeveloperApi
-@deprecated("This class will be removed in a future release.", "2.2.0")
-class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener {
-
- private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing
-
- def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
-
- /** Filter RDD info to include only those with cached partitions */
- def rddInfoList: Seq[RDDInfo] = synchronized {
- _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
- }
-
- /** Update the storage info of the RDDs whose blocks are among the given updated blocks */
- private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
- val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
- val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
- StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
- }
-
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
- val rddInfos = stageSubmitted.stageInfo.rddInfos
- rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info).name = info.name }
- }
-
- override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
- // Remove all partitions that are no longer cached in current completed stage
- val completedRddIds = stageCompleted.stageInfo.rddInfos.map(r => r.id).toSet
- _rddInfoMap.retain { case (id, info) =>
- !completedRddIds.contains(id) || info.numCachedPartitions > 0
- }
- }
-
- override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
- _rddInfoMap.remove(unpersistRDD.rddId)
- }
-
- override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
- super.onBlockUpdated(blockUpdated)
- val blockId = blockUpdated.blockUpdatedInfo.blockId
- val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
- val memSize = blockUpdated.blockUpdatedInfo.memSize
- val diskSize = blockUpdated.blockUpdatedInfo.diskSize
- val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
- updateRDDInfo(Seq((blockId, blockStatus)))
- }
+ attachPage(new StoragePage(this, store))
+ attachPage(new RDDPage(this, store))
}
diff --git a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala
index 29722e3ef154b..d47d52f2b4e34 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStateListenerSuite.scala
@@ -600,6 +600,7 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter {
listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1, level, 1L, 1L)))
check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+ assert(wrapper.info.numCachedPartitions === 1L)
assert(wrapper.info.memoryUsed === 1L)
assert(wrapper.info.diskUsed === 1L)
@@ -632,6 +633,7 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter {
listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1, level, 1L, 1L)))
check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+ assert(wrapper.info.numCachedPartitions === 1L)
assert(wrapper.info.memoryUsed === 2L)
assert(wrapper.info.diskUsed === 2L)
assert(wrapper.info.dataDistribution.get.size === 2L)
@@ -660,6 +662,7 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter {
3L, 3L)))
check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+ assert(wrapper.info.numCachedPartitions === 2L)
assert(wrapper.info.memoryUsed === 5L)
assert(wrapper.info.diskUsed === 5L)
assert(wrapper.info.dataDistribution.get.size === 2L)
@@ -688,6 +691,7 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter {
StorageLevel.NONE, 1L, 1L)))
check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+ assert(wrapper.info.numCachedPartitions === 2L)
assert(wrapper.info.memoryUsed === 4L)
assert(wrapper.info.diskUsed === 4L)
assert(wrapper.info.dataDistribution.get.size === 2L)
@@ -716,6 +720,7 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter {
StorageLevel.NONE, 1L, 1L)))
check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+ assert(wrapper.info.numCachedPartitions === 1L)
assert(wrapper.info.memoryUsed === 3L)
assert(wrapper.info.diskUsed === 3L)
assert(wrapper.info.dataDistribution.get.size === 1L)
@@ -731,10 +736,33 @@ class AppStateListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Unpersist RDD1.
listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd1b1.rddId))
- intercept[NoSuchElementException] {
+ intercept[NoSuchElementException] {
check[RDDStorageInfoWrapper](rdd1b1.rddId) { _ => () }
}
+ // Update a StreamBlock.
+ val stream1 = StreamBlockId(1, 1L)
+ listener.onBlockUpdated(SparkListenerBlockUpdated(
+ BlockUpdatedInfo(bm1, stream1, level, 1L, 1L)))
+
+ check[StreamBlockData](Array(stream1.name, bm1.executorId)) { stream =>
+ assert(stream.name === stream1.name)
+ assert(stream.executorId === bm1.executorId)
+ assert(stream.hostPort === bm1.hostPort)
+ assert(stream.storageLevel === level.description)
+ assert(stream.useMemory === level.useMemory)
+ assert(stream.useDisk === level.useDisk)
+ assert(stream.deserialized === level.deserialized)
+ assert(stream.memSize === 1L)
+ assert(stream.diskSize === 1L)
+ }
+
+ // Drop a StreamBlock.
+ listener.onBlockUpdated(SparkListenerBlockUpdated(
+ BlockUpdatedInfo(bm1, stream1, StorageLevel.NONE, 0L, 0L)))
+ intercept[NoSuchElementException] {
+ check[StreamBlockData](Array(stream1.name, bm1.executorId)) { _ => () }
+ }
}
private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
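
The check helper used throughout the suite is defined elsewhere in this file; it presumably reads the entry by its natural key and applies the assertion block, along these lines (a sketch, not shown in this hunk):

import scala.reflect.{classTag, ClassTag}

private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
  val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T]
  fn(value)
}
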
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
deleted file mode 100644
index 06acca3943c20..0000000000000
--- a/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.scheduler._
-
-class BlockStatusListenerSuite extends SparkFunSuite {
-
- test("basic functions") {
- val blockManagerId = BlockManagerId("0", "localhost", 10000)
- val listener = new BlockStatusListener()
-
- // Add a block manager and a new block status
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(0, blockManagerId, 0))
- listener.onBlockUpdated(SparkListenerBlockUpdated(
- BlockUpdatedInfo(
- blockManagerId,
- StreamBlockId(0, 100),
- StorageLevel.MEMORY_AND_DISK,
- memSize = 100,
- diskSize = 100)))
- // The new block status should be added to the listener
- val expectedBlock = BlockUIData(
- StreamBlockId(0, 100),
- "localhost:10000",
- StorageLevel.MEMORY_AND_DISK,
- memSize = 100,
- diskSize = 100
- )
- val expectedExecutorStreamBlockStatus = Seq(
- ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))
- )
- assert(listener.allExecutorStreamBlockStatus === expectedExecutorStreamBlockStatus)
-
- // Add the second block manager
- val blockManagerId2 = BlockManagerId("1", "localhost", 10001)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(0, blockManagerId2, 0))
- // Add a new replication of the same block id from the second manager
- listener.onBlockUpdated(SparkListenerBlockUpdated(
- BlockUpdatedInfo(
- blockManagerId2,
- StreamBlockId(0, 100),
- StorageLevel.MEMORY_AND_DISK,
- memSize = 100,
- diskSize = 100)))
- val expectedBlock2 = BlockUIData(
- StreamBlockId(0, 100),
- "localhost:10001",
- StorageLevel.MEMORY_AND_DISK,
- memSize = 100,
- diskSize = 100
- )
- // Each block manager should contain one block
- val expectedExecutorStreamBlockStatus2 = Set(
- ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)),
- ExecutorStreamBlockStatus("1", "localhost:10001", Seq(expectedBlock2))
- )
- assert(listener.allExecutorStreamBlockStatus.toSet === expectedExecutorStreamBlockStatus2)
-
- // Remove a replication of the same block
- listener.onBlockUpdated(SparkListenerBlockUpdated(
- BlockUpdatedInfo(
- blockManagerId2,
- StreamBlockId(0, 100),
- StorageLevel.NONE, // StorageLevel.NONE means removing it
- memSize = 0,
- diskSize = 0)))
- // Only the first block manager contains a block
- val expectedExecutorStreamBlockStatus3 = Set(
- ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)),
- ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty)
- )
- assert(listener.allExecutorStreamBlockStatus.toSet === expectedExecutorStreamBlockStatus3)
-
- // Remove the second block manager at first but add a new block status
- // from this removed block manager
- listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(0, blockManagerId2))
- listener.onBlockUpdated(SparkListenerBlockUpdated(
- BlockUpdatedInfo(
- blockManagerId2,
- StreamBlockId(0, 100),
- StorageLevel.MEMORY_AND_DISK,
- memSize = 100,
- diskSize = 100)))
- // The second block manager is removed so we should not see the new block
- val expectedExecutorStreamBlockStatus4 = Seq(
- ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))
- )
- assert(listener.allExecutorStreamBlockStatus === expectedExecutorStreamBlockStatus4)
-
- // Remove the last block manager
- listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(0, blockManagerId))
- // No block manager now so we should drop all block managers
- assert(listener.allExecutorStreamBlockStatus.isEmpty)
- }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
deleted file mode 100644
index 9835f11a2f7ed..0000000000000
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import org.apache.spark.{SparkConf, SparkFunSuite, Success}
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler._
-
-/**
- * Test the behavior of StorageStatusListener in response to all relevant events.
- */
-class StorageStatusListenerSuite extends SparkFunSuite {
- private val bm1 = BlockManagerId("big", "dog", 1)
- private val bm2 = BlockManagerId("fat", "duck", 2)
- private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
- private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
- private val conf = new SparkConf()
-
- test("block manager added/removed") {
- conf.set("spark.ui.retainedDeadExecutors", "1")
- val listener = new StorageStatusListener(conf)
-
- // Block manager add
- assert(listener.executorIdToStorageStatus.size === 0)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
- assert(listener.executorIdToStorageStatus.size === 1)
- assert(listener.executorIdToStorageStatus.get("big").isDefined)
- assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
- assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
- assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
- assert(listener.executorIdToStorageStatus.size === 2)
- assert(listener.executorIdToStorageStatus.get("fat").isDefined)
- assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
- assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-
- // Block manager remove
- listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm1))
- assert(listener.executorIdToStorageStatus.size === 1)
- assert(!listener.executorIdToStorageStatus.get("big").isDefined)
- assert(listener.executorIdToStorageStatus.get("fat").isDefined)
- assert(listener.deadExecutorStorageStatus.size === 1)
- assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("big"))
- listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
- assert(listener.executorIdToStorageStatus.size === 0)
- assert(!listener.executorIdToStorageStatus.get("big").isDefined)
- assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
- assert(listener.deadExecutorStorageStatus.size === 1)
- assert(listener.deadExecutorStorageStatus(0).blockManagerId.executorId.equals("fat"))
- }
-
- test("task end without updated blocks") {
- val listener = new StorageStatusListener(conf)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
- val taskMetrics = new TaskMetrics
-
- // Task end with no updated blocks
- assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics))
- assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics))
- assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- }
-
- test("updated blocks") {
- val listener = new StorageStatusListener(conf)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
-
- val blockUpdateInfos1 = Seq(
- BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L),
- BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L)
- )
- val blockUpdateInfos2 =
- Seq(BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L))
-
- // Add some new blocks
- assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- postUpdateBlock(listener, blockUpdateInfos1)
- assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- postUpdateBlock(listener, blockUpdateInfos2)
- assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
- assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
-
- // Dropped the blocks
- val droppedBlockInfo1 = Seq(
- BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.NONE, 0L, 0L),
- BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L)
- )
- val droppedBlockInfo2 = Seq(
- BlockUpdatedInfo(bm2, RDDBlockId(1, 2), StorageLevel.NONE, 0L, 0L),
- BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L)
- )
-
- postUpdateBlock(listener, droppedBlockInfo1)
- assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
- assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
- postUpdateBlock(listener, droppedBlockInfo2)
- assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- }
-
- test("unpersist RDD") {
- val listener = new StorageStatusListener(conf)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
- val blockUpdateInfos1 = Seq(
- BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L),
- BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L)
- )
- val blockUpdateInfos2 =
- Seq(BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L))
- postUpdateBlock(listener, blockUpdateInfos1)
- postUpdateBlock(listener, blockUpdateInfos2)
- assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
-
- // Unpersist RDD
- listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
- assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
- listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
- assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
- assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
- listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
- assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
- }
-
- private def postUpdateBlock(
- listener: StorageStatusListener, updateBlockInfos: Seq[BlockUpdatedInfo]): Unit = {
- updateBlockInfos.foreach { updateBlockInfo =>
- listener.onBlockUpdated(SparkListenerBlockUpdated(updateBlockInfo))
- }
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 4d8ef8ab97ef4..d39660276442f 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStateStore
-import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
import org.apache.spark.ui.scope.RDDOperationGraphListener
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
index 350c174e24742..d4107cdbaf7a2 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
@@ -20,39 +20,46 @@ package org.apache.spark.ui.storage
import org.mockito.Mockito._
import org.apache.spark.SparkFunSuite
+import org.apache.spark.status.StreamBlockData
+import org.apache.spark.status.api.v1.RDDStorageInfo
import org.apache.spark.storage._
class StoragePageSuite extends SparkFunSuite {
val storageTab = mock(classOf[StorageTab])
when(storageTab.basePath).thenReturn("http://localhost:4040")
- val storagePage = new StoragePage(storageTab)
+ val storagePage = new StoragePage(storageTab, null)
test("rddTable") {
- val rdd1 = new RDDInfo(1,
+ val rdd1 = new RDDStorageInfo(1,
"rdd1",
10,
- StorageLevel.MEMORY_ONLY,
- Seq.empty)
- rdd1.memSize = 100
- rdd1.numCachedPartitions = 10
+ 10,
+ StorageLevel.MEMORY_ONLY.description,
+ 100L,
+ 0L,
+ None,
+ None)
- val rdd2 = new RDDInfo(2,
+ val rdd2 = new RDDStorageInfo(2,
"rdd2",
10,
- StorageLevel.DISK_ONLY,
- Seq.empty)
- rdd2.diskSize = 200
- rdd2.numCachedPartitions = 5
-
- val rdd3 = new RDDInfo(3,
+ 5,
+ StorageLevel.DISK_ONLY.description,
+ 0L,
+ 200L,
+ None,
+ None)
+
+ val rdd3 = new RDDStorageInfo(3,
"rdd3",
10,
- StorageLevel.MEMORY_AND_DISK_SER,
- Seq.empty)
- rdd3.memSize = 400
- rdd3.diskSize = 500
- rdd3.numCachedPartitions = 10
+ 10,
+ StorageLevel.MEMORY_AND_DISK_SER.description,
+ 400L,
+ 500L,
+ None,
+ None)
val xmlNodes = storagePage.rddTable(Seq(rdd1, rdd2, rdd3))
@@ -90,58 +97,85 @@ class StoragePageSuite extends SparkFunSuite {
}
test("streamBlockStorageLevelDescriptionAndSize") {
- val memoryBlock = BlockUIData(StreamBlockId(0, 0),
+ val memoryBlock = new StreamBlockData("0",
+ "0",
"localhost:1111",
- StorageLevel.MEMORY_ONLY,
- memSize = 100,
- diskSize = 0)
+ StorageLevel.MEMORY_ONLY.description,
+ true,
+ false,
+ true,
+ 100,
+ 0)
assert(("Memory", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(memoryBlock))
- val memorySerializedBlock = BlockUIData(StreamBlockId(0, 0),
+ val memorySerializedBlock = new StreamBlockData("0",
+ "0",
"localhost:1111",
- StorageLevel.MEMORY_ONLY_SER,
+ StorageLevel.MEMORY_ONLY_SER.description,
+ true,
+ false,
+ false,
memSize = 100,
diskSize = 0)
assert(("Memory Serialized", 100) ===
storagePage.streamBlockStorageLevelDescriptionAndSize(memorySerializedBlock))
- val diskBlock = BlockUIData(StreamBlockId(0, 0),
+ val diskBlock = new StreamBlockData("0",
+ "0",
"localhost:1111",
- StorageLevel.DISK_ONLY,
- memSize = 0,
- diskSize = 100)
+ StorageLevel.DISK_ONLY.description,
+ false,
+ true,
+ false,
+ 0,
+ 100)
assert(("Disk", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(diskBlock))
}
test("receiverBlockTables") {
val blocksForExecutor0 = Seq(
- BlockUIData(StreamBlockId(0, 0),
+ new StreamBlockData(StreamBlockId(0, 0).name,
+ "0",
"localhost:10000",
- StorageLevel.MEMORY_ONLY,
- memSize = 100,
- diskSize = 0),
- BlockUIData(StreamBlockId(1, 1),
+ StorageLevel.MEMORY_ONLY.description,
+ true,
+ false,
+ true,
+ 100,
+ 0),
+ new StreamBlockData(StreamBlockId(1, 1).name,
+ "0",
"localhost:10000",
- StorageLevel.DISK_ONLY,
- memSize = 0,
- diskSize = 100)
+ StorageLevel.DISK_ONLY.description,
+ false,
+ true,
+ false,
+ 0,
+ 100)
)
- val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", blocksForExecutor0)
val blocksForExecutor1 = Seq(
- BlockUIData(StreamBlockId(0, 0),
+ new StreamBlockData(StreamBlockId(0, 0).name,
+ "1",
"localhost:10001",
- StorageLevel.MEMORY_ONLY,
+ StorageLevel.MEMORY_ONLY.description,
+ true,
+ false,
+ true,
memSize = 100,
diskSize = 0),
- BlockUIData(StreamBlockId(1, 1),
+ new StreamBlockData(StreamBlockId(1, 1).name,
+ "1",
"localhost:10001",
- StorageLevel.MEMORY_ONLY_SER,
- memSize = 100,
- diskSize = 0)
+ StorageLevel.MEMORY_ONLY_SER.description,
+ true,
+ false,
+ false,
+ 100,
+ 0)
)
- val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", blocksForExecutor1)
- val xmlNodes = storagePage.receiverBlockTables(Seq(executor0, executor1))
+
+ val xmlNodes = storagePage.receiverBlockTables(blocksForExecutor0 ++ blocksForExecutor1)
val executorTable = (xmlNodes \\ "table")(0)
val executorHeaders = Seq(
@@ -189,8 +223,6 @@ class StoragePageSuite extends SparkFunSuite {
test("empty receiverBlockTables") {
assert(storagePage.receiverBlockTables(Seq.empty).isEmpty)
- val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", Seq.empty)
- val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty)
- assert(storagePage.receiverBlockTables(Seq(executor0, executor1)).isEmpty)
}
+
}
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
deleted file mode 100644
index f6c8418ba3ac4..0000000000000
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.storage
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.storage._
-
-/**
- * Test various functionality in the StorageListener that supports the StorageTab.
- */
-class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
- private var bus: LiveListenerBus = _
- private var storageStatusListener: StorageStatusListener = _
- private var storageListener: StorageListener = _
- private val memAndDisk = StorageLevel.MEMORY_AND_DISK
- private val memOnly = StorageLevel.MEMORY_ONLY
- private val none = StorageLevel.NONE
- private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
- private val taskInfo1 = new TaskInfo(1, 1, 1, 1, "big", "cat", TaskLocality.ANY, false)
- private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly, Seq(10))
- private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly, Seq(10))
- private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk, Seq(10))
- private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk, Seq(10))
- private val bm1 = BlockManagerId("big", "dog", 1)
-
- before {
- val conf = new SparkConf()
- sc = new SparkContext("local", "test", conf)
- bus = new LiveListenerBus(sc)
- storageStatusListener = new StorageStatusListener(conf)
- storageListener = new StorageListener(storageStatusListener)
- bus.addListener(storageStatusListener)
- bus.addListener(storageListener)
- }
-
- test("stage submitted / completed") {
- assert(storageListener._rddInfoMap.isEmpty)
- assert(storageListener.rddInfoList.isEmpty)
-
- // 2 RDDs are known, but none are cached
- val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), Seq.empty, "details")
- bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
- assert(storageListener._rddInfoMap.size === 2)
- assert(storageListener.rddInfoList.isEmpty)
-
- // 4 RDDs are known, but only 2 are cached
- val rddInfo2Cached = rddInfo2
- val rddInfo3Cached = rddInfo3
- rddInfo2Cached.numCachedPartitions = 1
- rddInfo3Cached.numCachedPartitions = 1
- val stageInfo1 = new StageInfo(
- 1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), Seq.empty, "details")
- bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
- assert(storageListener._rddInfoMap.size === 4)
- assert(storageListener.rddInfoList.size === 2)
-
- // Submitting RDDInfos with duplicate IDs does nothing
- val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, Seq(10))
- rddInfo0Cached.numCachedPartitions = 1
- val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), Seq.empty, "details")
- bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
- assert(storageListener._rddInfoMap.size === 4)
- assert(storageListener.rddInfoList.size === 2)
-
- // We only keep around the RDDs that are cached
- bus.postToAll(SparkListenerStageCompleted(stageInfo0))
- assert(storageListener._rddInfoMap.size === 2)
- assert(storageListener.rddInfoList.size === 2)
- }
-
- test("unpersist") {
- val rddInfo0Cached = rddInfo0
- val rddInfo1Cached = rddInfo1
- rddInfo0Cached.numCachedPartitions = 1
- rddInfo1Cached.numCachedPartitions = 1
- val stageInfo0 = new StageInfo(
- 0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), Seq.empty, "details")
- bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
- assert(storageListener._rddInfoMap.size === 2)
- assert(storageListener.rddInfoList.size === 2)
- bus.postToAll(SparkListenerUnpersistRDD(0))
- assert(storageListener._rddInfoMap.size === 1)
- assert(storageListener.rddInfoList.size === 1)
- bus.postToAll(SparkListenerUnpersistRDD(4)) // doesn't exist
- assert(storageListener._rddInfoMap.size === 1)
- assert(storageListener.rddInfoList.size === 1)
- bus.postToAll(SparkListenerUnpersistRDD(1))
- assert(storageListener._rddInfoMap.size === 0)
- assert(storageListener.rddInfoList.size === 0)
- }
-
- test("block update") {
- val myRddInfo0 = rddInfo0
- val myRddInfo1 = rddInfo1
- val myRddInfo2 = rddInfo2
- val stageInfo0 = new StageInfo(
- 0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), Seq.empty, "details")
- bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
- bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
- assert(storageListener._rddInfoMap.size === 3)
- assert(storageListener.rddInfoList.size === 0) // not cached
- assert(!storageListener._rddInfoMap(0).isCached)
- assert(!storageListener._rddInfoMap(1).isCached)
- assert(!storageListener._rddInfoMap(2).isCached)
-
- // Some blocks updated
- val blockUpdateInfos = Seq(
- BlockUpdatedInfo(bm1, RDDBlockId(0, 100), memAndDisk, 400L, 0L),
- BlockUpdatedInfo(bm1, RDDBlockId(0, 101), memAndDisk, 0L, 400L),
- BlockUpdatedInfo(bm1, RDDBlockId(1, 20), memAndDisk, 0L, 240L)
- )
- postUpdateBlocks(bus, blockUpdateInfos)
- assert(storageListener._rddInfoMap(0).memSize === 400L)
- assert(storageListener._rddInfoMap(0).diskSize === 400L)
- assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
- assert(storageListener._rddInfoMap(0).isCached)
- assert(storageListener._rddInfoMap(1).memSize === 0L)
- assert(storageListener._rddInfoMap(1).diskSize === 240L)
- assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
- assert(storageListener._rddInfoMap(1).isCached)
- assert(!storageListener._rddInfoMap(2).isCached)
- assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
-
- // Drop some blocks
- val blockUpdateInfos2 = Seq(
- BlockUpdatedInfo(bm1, RDDBlockId(0, 100), none, 0L, 0L),
- BlockUpdatedInfo(bm1, RDDBlockId(1, 20), none, 0L, 0L),
- BlockUpdatedInfo(bm1, RDDBlockId(2, 40), none, 0L, 0L), // doesn't actually exist
- BlockUpdatedInfo(bm1, RDDBlockId(4, 80), none, 0L, 0L) // doesn't actually exist
- )
- postUpdateBlocks(bus, blockUpdateInfos2)
- assert(storageListener._rddInfoMap(0).memSize === 0L)
- assert(storageListener._rddInfoMap(0).diskSize === 400L)
- assert(storageListener._rddInfoMap(0).numCachedPartitions === 1)
- assert(storageListener._rddInfoMap(0).isCached)
- assert(!storageListener._rddInfoMap(1).isCached)
- assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
- assert(!storageListener._rddInfoMap(2).isCached)
- assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
- }
-
- test("verify StorageTab contains all cached rdds") {
-
- val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, Seq(4))
- val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, Seq(4))
- val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
- val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
- val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L))
- val blockUpdateInfos2 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(1, 1), memOnly, 200L, 0L))
- bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
- bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
- assert(storageListener.rddInfoList.size === 0)
- postUpdateBlocks(bus, blockUpdateInfos1)
- assert(storageListener.rddInfoList.size === 1)
- bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
- assert(storageListener.rddInfoList.size === 1)
- bus.postToAll(SparkListenerStageCompleted(stageInfo0))
- assert(storageListener.rddInfoList.size === 1)
- postUpdateBlocks(bus, blockUpdateInfos2)
- assert(storageListener.rddInfoList.size === 2)
- bus.postToAll(SparkListenerStageCompleted(stageInfo1))
- assert(storageListener.rddInfoList.size === 2)
- }
-
- test("verify StorageTab still contains a renamed RDD") {
- val rddInfo = new RDDInfo(0, "original_name", 1, memOnly, Seq(4))
- val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo), Seq.empty, "details")
- bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
- bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
- val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L))
- postUpdateBlocks(bus, blockUpdateInfos1)
- assert(storageListener.rddInfoList.size == 1)
-
- val newName = "new_name"
- val rddInfoRenamed = new RDDInfo(0, newName, 1, memOnly, Seq(4))
- val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfoRenamed), Seq.empty, "details")
- bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
- assert(storageListener.rddInfoList.size == 1)
- assert(storageListener.rddInfoList.head.name == newName)
- }
-
- private def postUpdateBlocks(
- bus: SparkListenerBus, blockUpdateInfos: Seq[BlockUpdatedInfo]): Unit = {
- blockUpdateInfos.foreach { blockUpdateInfo =>
- bus.postToAll(SparkListenerBlockUpdated(blockUpdateInfo))
- }
- }
-}
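
StorageTabSuite is deleted outright because StorageListener and StorageStatusListener themselves are removed; equivalent coverage belongs with the KVStore-backed listener, which consumes the same bus events. Below is a minimal sketch of the event shape such tests would post, assuming only the public listener API; the commented-out listener call stands in for the KVStore-backed replacement and is not constructed here:

    import org.apache.spark.scheduler.SparkListenerBlockUpdated
    import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo, StorageLevel, StreamBlockId}

    object StreamBlockEventSketch {
      def main(args: Array[String]): Unit = {
        // The same kind of update the deleted suite posted for RDD blocks,
        // here for a receiver stream block held in memory on executor "0".
        val event = SparkListenerBlockUpdated(
          BlockUpdatedInfo(
            BlockManagerId("0", "localhost", 10000),
            StreamBlockId(0, 0),
            StorageLevel.MEMORY_ONLY,
            100L, // memSize
            0L))  // diskSize
        // listener.onBlockUpdated(event)
        println(event.blockUpdatedInfo.blockId)
      }
    }
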
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4bd6c3e206e8e..4a6854666a58b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -41,6 +41,8 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.env.EnvironmentListener"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.exec.ExecutorsListener"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.storage.StorageListener"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.StorageStatusListener"),
// [SPARK-20495][SQL] Add StorageLevel to cacheTable API
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable")
)
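
The two new MissingClassProblem filters cover the listener classes this patch deletes: org.apache.spark.ui.storage.StorageListener and org.apache.spark.storage.StorageStatusListener. MiMa reports any public class that disappears between releases, and each filter string must be the fully qualified name of the removed class; with these entries in place the binary-compatibility check (runnable locally with Spark's dev/mima script) passes again.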