Skip to content

Commit

Permalink
Added last batch processing time to StreamingUI.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Apr 1, 2014
1 parent 4d86e98 commit db27bad
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, AddBlocks, DeregisterReceiver, RegisterReceiver}
import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
import org.apache.spark.util.Utils

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
Expand Down Expand Up @@ -206,7 +207,9 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
val timeout = 5.seconds

override def preStart() {
val future = tracker.ask(RegisterReceiver(streamId, self))(timeout)
val msg = RegisterReceiver(
streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
val future = tracker.ask(msg)(timeout)
Await.result(future, timeout)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
import org.apache.spark.util.AkkaUtils

/** Information about block received by the network receiver */
/**
 * Describes a receiver: the id of the input stream it serves, the receiver's type name
 * and the location (host) it reported.
 */
case class ReceiverInfo(streamId: Int, typ: String, location: String) {
  /** Short display label of the form `<typ>-<streamId>`. */
  override def toString: String = typ + "-" + streamId
}

/** Information about blocks received by the network receiver */
case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
Expand All @@ -41,8 +46,12 @@ case class ReceivedBlockInfo(
* with each other.
*/
private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
extends NetworkInputTrackerMessage
/**
 * Message a receiver sends to the tracker to register itself.
 *
 * @param streamId id of the input stream the receiver belongs to
 * @param typ simple class name of the receiver
 * @param host host name of the machine the receiver is running on
 * @param receiverActor actor reference of the registering receiver
 */
private[streaming] case class RegisterReceiver(
streamId: Int,
typ: String,
host: String,
receiverActor: ActorRef
) extends NetworkInputTrackerMessage
private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo)
extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
Expand Down Expand Up @@ -108,11 +117,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
/** Actor to receive messages from the receivers. */
private class NetworkInputTrackerActor extends Actor {
def receive = {
case RegisterReceiver(streamId, receiverActor) => {
case RegisterReceiver(streamId, typ, host, receiverActor) => {
if (!networkInputStreamMap.contains(streamId)) {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
ReceiverInfo(streamId, typ, host)
))
logInfo("Registered receiver for network stream " + streamId + " from "
+ sender.path.address)
sender ! true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ sealed trait StreamingListenerEvent
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
/** Event posted to the listener bus when a receiver has registered with the input tracker. */
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent

/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
Expand All @@ -36,6 +38,9 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen
*/
trait StreamingListener {

/**
 * Called when a receiver has been started. The default implementation does nothing;
 * override it to react to the event.
 */
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

/** Called when a batch of jobs has been submitted for processing. */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ private[spark] class StreamingListenerBus() extends Logging {
while (true) {
val event = eventQueue.take
event match {
case receiverStarted: StreamingListenerReceiverStarted =>
listeners.foreach(_.onReceiverStarted(receiverStarted))
case batchSubmitted: StreamingListenerBatchSubmitted =>
listeners.foreach(_.onBatchSubmitted(batchSubmitted))
case batchStarted: StreamingListenerBatchStarted =>
listeners.foreach(_.onBatchStarted(batchStarted))
case batchCompleted: StreamingListenerBatchCompleted =>
Expand Down
185 changes: 118 additions & 67 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,17 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
private val runningBatchInfos = new HashMap[Time, BatchInfo]
private val completedaBatchInfos = new Queue[BatchInfo]
// Maximum number of completed batches retained for the UI statistics (default 100).
// The key was originally misspelled as "spark.steaming.ui.maxBatches"; the correctly
// spelled key now takes precedence, and the misspelled one is still honored as a
// fallback so that existing configurations keep working.
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.maxBatches",
  ssc.conf.getInt("spark.steaming.ui.maxBatches", 100))
private var totalBatchesCompleted = 0L
private var totalCompletedBatches = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]

val batchDuration = ssc.graph.batchDuration.milliseconds

/** Remembers the info of a newly started receiver, keyed by its stream id. */
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
  val info = receiverStarted.receiverInfo
  synchronized {
    receiverInfos(info.streamId) = info
  }
}

/** Records a newly submitted batch, keyed by its batch time, as waiting to be processed. */
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
  // A submitted batch has not started processing yet, so it is tracked with the waiting
  // batches rather than the running ones (it was previously stored in runningBatchInfos,
  // which left waitingBatchInfos unpopulated by this handler).
  // NOTE(review): assumes onBatchStarted moves the batch from waitingBatchInfos to
  // runningBatchInfos — confirm against that handler.
  waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
}
Expand All @@ -55,15 +62,19 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
totalBatchesCompleted += 1L
totalCompletedBatches += 1L
}

def numTotalBatchesCompleted: Long = synchronized {
totalBatchesCompleted
def numNetworkReceivers = synchronized {
ssc.graph.getNetworkInputStreams().size
}

def numNetworkReceivers: Int = synchronized {
completedaBatchInfos.headOption.map(_.receivedBlockInfo.size).getOrElse(0)
def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
}

/** Number of batches that are either waiting or currently running. */
def numUnprocessedBatches: Long = synchronized {
  val waiting = waitingBatchInfos.size
  val running = runningBatchInfos.size
  waiting + running
}

def waitingBatches: Seq[BatchInfo] = synchronized {
Expand Down Expand Up @@ -91,9 +102,7 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
}

def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
val allBatcheInfos = waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedaBatchInfos
val latestBatchInfos = allBatcheInfos.sortBy(_.batchTime)(Time.ordering).reverse.take(batchInfoLimit)
val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
(0 until numNetworkReceivers).map { receiverId =>
val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty))
Expand All @@ -103,6 +112,34 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
}.toMap
}

/**
 * Number of records each receiver contributed to the most recently received batch.
 *
 * @return a map from receiver id (0 until `numNetworkReceivers`) to its record count;
 *         every count is 0 when no batch has been received yet
 */
def lastReceivedBatchRecords: Map[Int, Long] = {
  val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
  lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
    (0 until numNetworkReceivers).map { receiverId =>
      // A receiver may have no entry in this batch; look it up safely and count it as 0
      // instead of letting the map's apply throw NoSuchElementException.
      val numRecords = lastReceivedBlockInfo.get(receiverId)
        .map(blocks => blocks.map(_.numRecords).sum)
        .getOrElse(0L)
      (receiverId, numRecords)
    }.toMap
  }.getOrElse {
    (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
  }
}

/**
 * Looks up the info of the receiver with the given id, if one has been recorded.
 *
 * Takes the listener's lock because `receiverInfos` is a mutable map that
 * `onReceiverStarted` updates while holding the same lock.
 */
def receiverInfo(receiverId: Int): Option[ReceiverInfo] = synchronized {
  receiverInfos.get(receiverId)
}

/**
 * The retained completed batch with the latest batch time, or None when no completed
 * batch is retained.
 *
 * Takes the listener's lock, as `allBatches` does, because `completedaBatchInfos` is a
 * mutable queue that is modified as batches complete.
 */
def lastCompletedBatch: Option[BatchInfo] = synchronized {
  completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
}

/** The batch with the latest batch time among all tracked batches, if any. */
def lastReceivedBatch: Option[BatchInfo] = {
  val batchesByTime = allBatches
  batchesByTime.lastOption
}

/** All waiting, running and retained completed batches, sorted by ascending batch time. */
private def allBatches: Seq[BatchInfo] = synchronized {
  val unsorted =
    waitingBatchInfos.values.toSeq ++ runningBatchInfos.values.toSeq ++ completedaBatchInfos
  unsorted.sortBy(_.batchTime)(Time.ordering)
}

/**
 * Builds a distribution of one metric over the retained completed batches.
 *
 * @param getMetric extracts the metric from a batch; batches for which it returns None
 *                  are left out
 */
private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
  val metricValues = completedaBatchInfos.flatMap(info => getMetric(info)).map(_.toDouble)
  Distribution(metricValues)
}
Expand All @@ -114,13 +151,13 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
private val listener = parent.listener
private val calendar = Calendar.getInstance()
private val startTime = calendar.getTime()

private val emptyCellTest = "-"

def render(request: HttpServletRequest): Seq[Node] = {

val content =
generateBasicStats() ++
<h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
<br></br><h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
generateNetworkStatsTable() ++
generateBatchStatsTable()
UIUtils.headerStreamingPage(content, "", parent.appName, "Spark Streaming Overview")
Expand All @@ -136,28 +173,76 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
<li>
<strong>Time since start: </strong>{msDurationToString(timeSinceStart)}
</li>
<li>
<strong>Network receivers: </strong>{listener.numNetworkReceivers}
</li>
<li>
<strong>Batch interval: </strong>{msDurationToString(listener.batchDuration)}
</li>
<li>
<strong>Processed batches: </strong>{listener.numTotalBatchesCompleted}
<strong>Processed batches: </strong>{listener.numTotalCompletedBatches}
</li>
<li>
<strong>Waiting batches: </strong>{listener.numUnprocessedBatches}
</li>
<li></li>
</ul>
}

/**
 * Renders the "Network Input Statistics" section: a table with one row per network
 * receiver (name, location, records in the last batch and the quantiles of its receiving
 * rate), or a placeholder message when no rate distributions are available.
 */
private def generateNetworkStatsTable(): Seq[Node] = {
  val rateDistributions = listener.receivedRecordsDistributions
  val lastBatchRecords = listener.lastReceivedBatchRecords

  // Cells describing a single receiver, in the same order as the header row below.
  def rowFor(receiverId: Int): Seq[String] = {
    val info = listener.receiverInfo(receiverId)
    val name = info.map(_.toString).getOrElse(s"Receiver-$receiverId")
    val location = info.map(_.location).getOrElse(emptyCellTest)
    val lastBatchCount = numberToString(lastBatchRecords(receiverId))
    val rateQuantiles = rateDistributions(receiverId).map { d =>
      d.getQuantiles().map(r => numberToString(r.toLong))
    }.getOrElse {
      // No rate data for this receiver: fill the five quantile columns with placeholders.
      Seq.fill(5)(emptyCellTest)
    }
    Seq(name, location, lastBatchCount) ++ rateQuantiles
  }

  val table =
    if (rateDistributions.size > 0) {
      val headerRow = Seq(
        "Receiver",
        "Location",
        "Records in last batch",
        "Minimum rate [records/sec]",
        "25th percentile rate [records/sec]",
        "Median rate [records/sec]",
        "75th percentile rate [records/sec]",
        "Maximum rate [records/sec]"
      )
      val dataRows = (0 until listener.numNetworkReceivers).map(rowFor)
      Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true))
    } else {
      None
    }

  val section =
    <h5>Network Input Statistics</h5> ++
    <div>{table.getOrElse("No network receivers")}</div>

  section
}

private def generateBatchStatsTable(): Seq[Node] = {
val numBatches = listener.completedBatches.size
val lastCompletedBatch = listener.lastCompletedBatch
val table = if (numBatches > 0) {
val processingDelayQuantilesRow =
"Processing Times" +: getQuantiles(listener.processingDelayDistribution)
Seq("Processing Time", msDurationToString(lastCompletedBatch.flatMap(_.processingDelay))) ++
getQuantiles(listener.processingDelayDistribution)
val schedulingDelayQuantilesRow =
"Scheduling Delay:" +: getQuantiles(listener.schedulingDelayDistribution)
Seq("Scheduling Delay", msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay))) ++
getQuantiles(listener.schedulingDelayDistribution)
val totalDelayQuantilesRow =
"End-to-end Delay:" +: getQuantiles(listener.totalDelayDistribution)
Seq("Total Delay", msDurationToString(lastCompletedBatch.flatMap(_.totalDelay))) ++
getQuantiles(listener.totalDelayDistribution)

val headerRow = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
"Median", "75th percentile", "Maximum")
val dataRows: Seq[Seq[String]] = Seq(
processingDelayQuantilesRow,
schedulingDelayQuantilesRow,
Expand All @@ -168,57 +253,19 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
None
}

val batchCounts =
<ul class="unstyled">
<li>
# batches being processed: {listener.runningBatches.size}
</li>
<li>
# scheduled batches: {listener.waitingBatches.size}
</li>
</ul>

val batchStats =
<ul class="unstyled">
{table.getOrElse("No statistics have been generated yet.")}
</ul>

val content =
<h5>Batch Processing Statistics</h5> ++
<div>{batchStats}</div>

content
}

private def generateNetworkStatsTable(): Seq[Node] = {
val receivedRecordDistributions = listener.receivedRecordsDistributions
val numNetworkReceivers = receivedRecordDistributions.size
val table = if (receivedRecordDistributions.size > 0) {
val headerRow = Seq("Receiver", "Min", "25th percentile",
"Median", "75th percentile", "Max")
val dataRows = (0 until numNetworkReceivers).map { receiverId =>
val receiverName = s"Receiver-$receiverId"
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
d.getQuantiles().map(r => numberToString(r.toLong) + " records/second")
}.getOrElse {
Seq("-", "-", "-", "-", "-")
}
receiverName +: receivedRecordStats
}
Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true))
} else {
None
}

val content =
<h5>Network Input Statistics</h5> ++
<div>{table.getOrElse("No network receivers")}</div>
<div>
<ul class="unstyled">
{table.getOrElse("No statistics have been generated yet.")}
</ul>
</div>

content
}

private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) }
timeDistributionOption.get.getQuantiles().map { ms => msDurationToString(ms.toLong) }
}

private def numberToString(records: Double): String = {
Expand All @@ -229,13 +276,13 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {

val (value, unit) = {
if (records >= 2*trillion) {
(records / trillion, "T")
(records / trillion, " T")
} else if (records >= 2*billion) {
(records / billion, "B")
(records / billion, " B")
} else if (records >= 2*million) {
(records / million, "M")
(records / million, " M")
} else if (records >= 2*thousand) {
(records / thousand, "K")
(records / thousand, " K")
} else {
(records, "")
}
Expand Down Expand Up @@ -265,7 +312,7 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
}
}

val millisecondsString = if (ms % second == 0) "" else s"${ms % second} ms"
val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms"
val secondString = toString((ms % minute) / second, "second")
val minuteString = toString((ms % hour) / minute, "minute")
val hourString = toString((ms % day) / hour, "hour")
Expand All @@ -292,6 +339,10 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
return ""
}
}

/** Formats an optional duration in milliseconds, using the empty-cell text when absent. */
private def msDurationToString(msOption: Option[Long]): String = msOption match {
  case Some(ms) => msDurationToString(ms)
  case None => emptyCellTest
}
}


Expand Down

0 comments on commit db27bad

Please sign in to comment.