SHS-NG M4.3: Port StorageTab to the new backend.
This required adding StreamBlockId information to the UI store; that information is not yet available via the API, so an internal type was added until there's a need to expose it through the API.

The UI only lists RDDs that have cached partitions, and that information wasn't being captured correctly in UIListener, so that's also fixed, along with some minor (internal) API adjustments so that the UI can get the correct data.
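
As a rough, test-style sketch of the new flow (illustrative only, not code from this commit; it assumes spark-package access to the private AppStateListener/AppStateStore pair, here bound to `listener` and `store` over the same KVStore):

    import org.apache.spark.scheduler.SparkListenerBlockUpdated
    import org.apache.spark.storage._

    // Build a block update for a StreamBlockId and feed it to the listener.
    // `listener` and `store` are assumed to share one KVStore instance.
    val bmId = BlockManagerId("1", "host.example.com", 7077)
    val blockId = StreamBlockId(streamId = 0, uniqueId = 100)
    val update = SparkListenerBlockUpdated(
      BlockUpdatedInfo(bmId, blockId, StorageLevel.MEMORY_ONLY, memSize = 1024L, diskSize = 0L))

    listener.onBlockUpdated(update)    // writes a StreamBlockData row to the store

    // The storage page can now read the block back without touching live listener state.
    assert(store.streamBlocksList().exists { b =>
      b.name == blockId.name && b.executorId == "1"
    })

    // A later update with an invalid storage level (block dropped) deletes the row.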
Marcelo Vanzin committed May 1, 2017
1 parent 5505c83 commit 6d8e1b2
Showing 17 changed files with 248 additions and 948 deletions.
52 changes: 39 additions & 13 deletions core/src/main/scala/org/apache/spark/status/AppStateListener.scala
@@ -609,7 +609,8 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
     event.blockUpdatedInfo.blockId match {
       case block: RDDBlockId => updateRDDBlock(event, block)
-      case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too.
+      case stream: StreamBlockId => updateStreamBlock(event, stream)
+      case _ =>
     }
   }

@@ -694,7 +695,11 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
         newRDDDataDistribution(
           None,
           address = address,
-          memoryRemaining = executorInfo.info.maxMemory)
+          memoryRemaining = executorInfo.info.maxMemory,
+          onHeapMemoryUsed = memMetrics.map { _ => 0L },
+          offHeapMemoryUsed = memMetrics.map { _ => 0L },
+          onHeapMemoryRemaining = memMetrics.map(_.totalOnHeapStorageMemory),
+          offHeapMemoryRemaining = memMetrics.map(_.totalOffHeapStorageMemory))
       }
       (_old, others)
     }
@@ -703,19 +708,18 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
       val newDistMem = newValue(oldDist.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult)
       val newDistDisk = newValue(oldDist.diskUsed, event.blockUpdatedInfo.diskSize * diskMult)
       val newDists = if (newDistMem > 0 || newDistDisk > 0) {
-        val newOffHeap = if (storageLevel.useOffHeap) Some(newDistMem) else None
-        val newOnHeap = if (!storageLevel.useOffHeap) Some(newDistMem) else None
-        val remainingOffHeap = if (storageLevel.useOffHeap) {
-          remainingMemory(memMetrics.map(_.totalOffHeapStorageMemory), oldDist.offHeapMemoryUsed,
-            newOffHeap)
+        val _newMem = Some(newDistMem)
+        val (newOffHeap, remainingOffHeap) = if (storageLevel.useOffHeap) {
+          (_newMem, remainingMemory(oldDist.offHeapMemoryRemaining, oldDist.offHeapMemoryUsed,
+            _newMem))
         } else {
-          None
+          (oldDist.offHeapMemoryUsed, oldDist.offHeapMemoryRemaining)
         }
-        val remainingOnHeap = if (!storageLevel.useOffHeap) {
-          remainingMemory(memMetrics.map(_.totalOnHeapStorageMemory), oldDist.onHeapMemoryUsed,
-            newOnHeap)
+        val (newOnHeap, remainingOnHeap) = if (!storageLevel.useOffHeap) {
+          (_newMem, remainingMemory(oldDist.onHeapMemoryRemaining, oldDist.onHeapMemoryUsed,
+            _newMem))
         } else {
-          None
+          (oldDist.onHeapMemoryUsed, oldDist.onHeapMemoryRemaining)
         }

         val newDist = newRDDDataDistribution(
@@ -733,13 +737,15 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
         Nil
       }

+      val allBlocks = others ++ newBlocks
       val newRDD = newRDDStorageInfo(
         rdd.info,
+        numCachedPartitions = allBlocks.size,
         storageLevel = updatedStorageLevel,
         memoryUsed = newValue(rdd.info.memoryUsed, event.blockUpdatedInfo.memSize * memoryMult),
         diskUsed = newValue(rdd.info.diskUsed, event.blockUpdatedInfo.diskSize * diskMult),
         dataDistribution = Some(otherDists ++ newDists),
-        partitions = Some(others ++ newBlocks))
+        partitions = Some(allBlocks))
       new RDDStorageInfoWrapper(newRDD)
     }

@@ -771,6 +777,26 @@ private class AppStateListener(override protected val kvstore: KVStore) extends
     kvstore.write(new ExecutorSummaryWrapper(newExecSummary))
   }

+  private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
+    val storageLevel = event.blockUpdatedInfo.storageLevel
+    if (storageLevel.isValid) {
+      val data = new StreamBlockData(
+        stream.name,
+        event.blockUpdatedInfo.blockManagerId.executorId,
+        event.blockUpdatedInfo.blockManagerId.hostPort,
+        storageLevel.description,
+        storageLevel.useMemory,
+        storageLevel.useDisk,
+        storageLevel.deserialized,
+        event.blockUpdatedInfo.memSize,
+        event.blockUpdatedInfo.diskSize)
+      kvstore.write(data)
+    } else {
+      kvstore.delete(classOf[StreamBlockData],
+        Array(stream.name, event.blockUpdatedInfo.blockManagerId.executorId))
+    }
+  }
+
   private def updateJobData(id: Int)(fn: JobDataWrapper => JobDataWrapper): Unit = {
     update[JobDataWrapper](id) { old =>
       val job = old.getOrElse((newJobDataWrapper(None, newJobData(None))))
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/status/AppStateStore.scala
@@ -226,8 +226,10 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
     }
   }

-  def rddList(): Seq[v1.RDDStorageInfo] = {
-    store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).toSeq
+  def rddList(cachedOnly: Boolean = true): Seq[v1.RDDStorageInfo] = {
+    store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).filter { rdd =>
+      !cachedOnly || rdd.numCachedPartitions > 0
+    }.toSeq
   }

   def rdd(rddId: Int): v1.RDDStorageInfo = {
@@ -238,6 +240,10 @@ private[spark] class AppStateStore private (store: KVStore, tempStorePath: Optio
     store.view(classOf[ExecutorEventData]).asScala.map(_.event).toSeq
   }

+  def streamBlocksList(): Seq[StreamBlockData] = {
+    store.view(classOf[StreamBlockData]).asScala.toSeq
+  }
+
   def close(): Unit = {
     store.close()
     tempStorePath.foreach(Utils.deleteRecursively)
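
A minimal sketch of the consuming side, assuming `store` is the AppStateStore the ported StorageTab gets from SparkUI (illustrative, not code from this commit):

    // The storage page's two tables come straight from the store now.
    val cachedRdds = store.rddList()            // cachedOnly defaults to true
    val streamBlocks = store.streamBlocksList()

    cachedRdds.foreach { rdd =>
      println(s"RDD ${rdd.id} '${rdd.name}': ${rdd.numCachedPartitions} cached partitions, " +
        s"${rdd.memoryUsed} bytes in memory, ${rdd.diskUsed} bytes on disk")
    }

    // Passing cachedOnly = false returns every tracked RDD, cached or not.
    val allRdds = store.rddList(cachedOnly = false)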
81 changes: 1 addition & 80 deletions core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -21,90 +21,11 @@ import javax.ws.rs.core.MediaType

-import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.storage.StorageListener

 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class AllRDDResource(ui: SparkUI) {

   @GET
-  def rddList(): Seq[RDDStorageInfo] = {
-    val storageStatusList = ui.storageListener.activeStorageStatusList
-    val rddInfos = ui.storageListener.rddInfoList
-    rddInfos.map{rddInfo =>
-      AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
-        includeDetails = false)
-    }
-  }
+  def rddList(): Seq[RDDStorageInfo] = ui.store.rddList()

 }

-private[spark] object AllRDDResource {
-
-  def getRDDStorageInfo(
-      rddId: Int,
-      listener: StorageListener,
-      includeDetails: Boolean): Option[RDDStorageInfo] = {
-    val storageStatusList = listener.activeStorageStatusList
-    listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
-      getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
-    }
-  }
-
-  def getRDDStorageInfo(
-      rddId: Int,
-      rddInfo: RDDInfo,
-      storageStatusList: Seq[StorageStatus],
-      includeDetails: Boolean): RDDStorageInfo = {
-    val workers = storageStatusList.map { (rddId, _) }
-    val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
-    val blocks = storageStatusList
-      .flatMap { _.rddBlocksById(rddId) }
-      .sortWith { _._1.name < _._1.name }
-      .map { case (blockId, status) =>
-        (blockId, status, blockLocations.getOrElse(blockId, Seq[String]("Unknown")))
-      }
-
-    val dataDistribution = if (includeDetails) {
-      Some(storageStatusList.map { status =>
-        new RDDDataDistribution(
-          address = status.blockManagerId.hostPort,
-          memoryUsed = status.memUsedByRdd(rddId),
-          memoryRemaining = status.memRemaining,
-          diskUsed = status.diskUsedByRdd(rddId),
-          onHeapMemoryUsed = Some(
-            if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
-          offHeapMemoryUsed = Some(
-            if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
-          onHeapMemoryRemaining = status.onHeapMemRemaining,
-          offHeapMemoryRemaining = status.offHeapMemRemaining
-        ) } )
-    } else {
-      None
-    }
-    val partitions = if (includeDetails) {
-      Some(blocks.map { case (id, block, locations) =>
-        new RDDPartitionInfo(
-          blockName = id.name,
-          storageLevel = block.storageLevel.description,
-          memoryUsed = block.memSize,
-          diskUsed = block.diskSize,
-          executors = locations
-        )
-      } )
-    } else {
-      None
-    }
-
-    new RDDStorageInfo(
-      id = rddId,
-      name = rddInfo.name,
-      numPartitions = rddInfo.numPartitions,
-      numCachedPartitions = rddInfo.numCachedPartitions,
-      storageLevel = rddInfo.storageLevel.description,
-      memoryUsed = rddInfo.memSize,
-      diskUsed = rddInfo.diskSize,
-      dataDistribution = dataDistribution,
-      partitions = partitions
-    )
-  }
-}
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.status.api.v1

+import java.util.NoSuchElementException
 import javax.ws.rs.{GET, PathParam, Produces}
 import javax.ws.rs.core.MediaType

@@ -26,9 +27,12 @@ private[v1] class OneRDDResource(ui: SparkUI) {

   @GET
   def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
-    AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
-      throw new NotFoundException(s"no rdd found w/ id $rddId")
-    )
+    try {
+      ui.store.rdd(rddId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new NotFoundException(s"no rdd found w/ id $rddId")
+    }
   }

 }
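
A note on the error-handling change here: the old path returned an Option built from live listener state, while AppStateStore.rdd reads a single key from the KVStore and throws NoSuchElementException on a miss; mapping that exception to NotFoundException preserves the endpoint's 404 behavior.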
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -142,3 +142,19 @@ private[spark] class ExecutorStageSummaryWrapper(
 private[spark] class ExecutorEventData(
     @KVIndexParam val id: Long,
     val event: SparkListenerEvent)
+
+private[spark] class StreamBlockData(
+    val name: String,
+    val executorId: String,
+    val hostPort: String,
+    val storageLevel: String,
+    val useMemory: Boolean,
+    val useDisk: Boolean,
+    val deserialized: Boolean,
+    val memSize: Long,
+    val diskSize: Long) {
+
+  @JsonIgnore @KVIndex
+  def key: Array[String] = Array(name, executorId)
+
+}
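
`key` is the type's natural key in the KVStore, so each (block name, executor ID) pair maps to exactly one row; that is what lets updateStreamBlock overwrite or delete a single entry. A hedged sketch of a direct lookup, assuming a `kvstore: KVStore` handle like the listener's (the key values are made up):

    // Read and delete a stream block row by its composite natural key.
    val key = Array("input-0-100", "1")    // (StreamBlockId name, executor ID)
    val row = kvstore.read(classOf[StreamBlockData], key)    // throws NoSuchElementException if absent
    kvstore.delete(classOf[StreamBlockData], key)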
100 changes: 0 additions & 100 deletions core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala

This file was deleted.

