Skip to content

Commit

Permalink
Added basic stats to the StreamingUI and refactored the UI to a Page …
Browse files Browse the repository at this point in the history
…to make it easier to transition to using SparkUI later.
  • Loading branch information
tdas committed Apr 1, 2014
1 parent 93f1c69 commit 4d86e98
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 50 deletions.
186 changes: 138 additions & 48 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ import org.apache.spark.streaming.scheduler._
import org.apache.spark.ui.{ServerInfo, SparkUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{Distribution, Utils}
import java.util.Locale
import java.util.{Calendar, Locale}

private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListener {
private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingListener {

private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]
private val completedaBatchInfos = new Queue[BatchInfo]
private val batchInfoLimit = conf.getInt("spark.steaming.ui.maxBatches", 100)
private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100)
private var totalBatchesCompleted = 0L

val batchDuration = ssc.graph.batchDuration.milliseconds

override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
Expand All @@ -52,6 +55,11 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe
runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
totalBatchesCompleted += 1L
}

def numTotalBatchesCompleted: Long = synchronized {
totalBatchesCompleted
}

def numNetworkReceivers: Int = synchronized {
Expand Down Expand Up @@ -89,7 +97,8 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
(0 until numNetworkReceivers).map { receiverId =>
val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty))
val distributionOption = Distribution(blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble))
val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble * 1000 / batchDuration)
val distributionOption = Distribution(recordsOfParticularReceiver)
(receiverId, distributionOption)
}.toMap
}
Expand All @@ -99,44 +108,42 @@ private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListe
}
}

private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {

private val sc = ssc.sparkContext
private val conf = sc.conf
private val appName = sc.appName
private val bindHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT)
private val securityManager = sc.env.securityManager
private val listener = new StreamingUIListener(conf)
private val handlers: Seq[ServletContextHandler] = {
Seq(
createServletHandler("/",
(request: HttpServletRequest) => render(request), securityManager),
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")
)
}
private[ui] class StreamingPage(parent: StreamingUI) extends Logging {

private var serverInfo: Option[ServerInfo] = None
private val listener = parent.listener
private val calendar = Calendar.getInstance()
private val startTime = calendar.getTime()

ssc.addStreamingListener(listener)

def bind() {
try {
serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Spark JettyUtils", e)
System.exit(1)
}
}
def render(request: HttpServletRequest): Seq[Node] = {

def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
val content =
generateBasicStats() ++
<h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
generateNetworkStatsTable() ++
generateBatchStatsTable()
UIUtils.headerStreamingPage(content, "", parent.appName, "Spark Streaming Overview")
}

private def render(request: HttpServletRequest): Seq[Node] = {
val content = generateBatchStatsTable() ++ generateNetworkStatsTable()
UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview")
/**
 * Renders the "basic stats" bullet list shown at the top of the streaming page:
 * start time, time elapsed since start, batch interval, and total processed batches.
 * All values are read from the page's `startTime` and the shared `listener`.
 */
private def generateBasicStats(): Seq[Node] = {

// Wall-clock milliseconds since `startTime` (captured when this page object was
// constructed — presumably a proxy for StreamingContext start; TODO confirm).
val timeSinceStart = System.currentTimeMillis() - startTime.getTime
<ul class ="unstyled">
<li>
<strong>Started at: </strong> {startTime.toString}
</li>
<li>
<strong>Time since start: </strong>{msDurationToString(timeSinceStart)}
</li>
<li>
<strong>Batch interval: </strong>{msDurationToString(listener.batchDuration)}
</li>
<li>
<strong>Processed batches: </strong>{listener.numTotalBatchesCompleted}
</li>
<li></li>
</ul>
}

private def generateBatchStatsTable(): Seq[Node] = {
Expand Down Expand Up @@ -173,18 +180,12 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {

val batchStats =
<ul class="unstyled">
<li>
<h5>Statistics over last {numBatches} processed batches</h5>
</li>
<li>
{table.getOrElse("No statistics have been generated yet.")}
</li>
{table.getOrElse("No statistics have been generated yet.")}
</ul>

val content =
<h4>Batch Processing Statistics</h4> ++
<div>{batchCounts}</div> ++
<div>{batchStats}</div>
<h5>Batch Processing Statistics</h5> ++
<div>{batchStats}</div>

content
}
Expand All @@ -198,7 +199,7 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
val dataRows = (0 until numNetworkReceivers).map { receiverId =>
val receiverName = s"Receiver-$receiverId"
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
d.getQuantiles().map(r => numberToString(r.toLong) + " records/batch")
d.getQuantiles().map(r => numberToString(r.toLong) + " records/second")
}.getOrElse {
Seq("-", "-", "-", "-", "-")
}
Expand All @@ -210,8 +211,8 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
}

val content =
<h4>Network Input Statistics</h4> ++
<div>{table.getOrElse("No network receivers")}</div>
<h5>Network Input Statistics</h5> ++
<div>{table.getOrElse("No network receivers")}</div>

content
}
Expand Down Expand Up @@ -241,6 +242,95 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {
}
"%.1f%s".formatLocal(Locale.US, value, unit)
}

/**
 * Returns a human-readable string representing a duration such as "5 seconds 35 ms".
 *
 * At most the three largest relevant units are shown (e.g. hours/minutes/seconds
 * for durations under a day). Returns "" and logs the error if formatting fails.
 */
private def msDurationToString(ms: Long): String = {
  try {
    // Unit sizes expressed in milliseconds.
    val second = 1000L
    val minute = 60 * second
    val hour = 60 * minute
    val day = 24 * hour
    val week = 7 * day
    val year = 365 * day

    // Formats one component, pluralizing the unit name; "" when the count is zero.
    def toString(num: Long, unit: String): String = {
      if (num == 0) {
        ""
      } else if (num == 1) {
        s"$num $unit"
      } else {
        s"$num ${unit}s"
      }
    }

    val millisecondsString = if (ms % second == 0) "" else s"${ms % second} ms"
    val secondString = toString((ms % minute) / second, "second")
    val minuteString = toString((ms % hour) / minute, "minute")
    val hourString = toString((ms % day) / hour, "hour")
    val dayString = toString((ms % week) / day, "day")
    val weekString = toString((ms % year) / week, "week")
    val yearString = toString(ms / year, "year")

    // Pick the first candidate whose unit limit the duration falls under.
    // collectFirst replaces the earlier foreach + non-local `return` and the
    // catch-all case that existed only to suppress a MatchError.
    Seq(
      second -> millisecondsString,
      minute -> s"$secondString $millisecondsString",
      hour -> s"$minuteString $secondString",
      day -> s"$hourString $minuteString $secondString",
      week -> s"$dayString $hourString $minuteString",
      year -> s"$weekString $dayString $hourString"
    ).collectFirst {
      case (durationLimit, durationString) if ms < durationLimit => durationString
    }.getOrElse(s"$yearString $weekString $dayString")
  } catch {
    case e: Exception =>
      // Formatting is best-effort for display only; never propagate.
      logError("Error converting time to string", e)
      ""
  }
}
}


/**
 * Standalone web UI for a StreamingContext. Instantiating this class registers a
 * [[StreamingUIListener]] on `ssc` immediately; call `bind()` to start the Jetty
 * server that serves the overview page and static resources.
 */
private[spark] class StreamingUI(val ssc: StreamingContext) extends Logging {

val sc = ssc.sparkContext
val conf = sc.conf
val appName = sc.appName
// Shared listener that pages read their stats from.
val listener = new StreamingUIListener(ssc)
// Must be created after `listener` — StreamingPage reads `parent.listener` at construction.
val overviewPage = new StreamingPage(this)

private val bindHost = Utils.localHostName()
// Public hostname for log messages; falls back to the bind host when SPARK_PUBLIC_DNS is unset.
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT)
private val securityManager = sc.env.securityManager
// "/" serves the overview page; "/static" serves Spark's bundled static assets.
private val handlers: Seq[ServletContextHandler] = {
Seq(
createServletHandler("/",
(request: HttpServletRequest) => overviewPage.render(request), securityManager),
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")
)
}

// Set once bind() succeeds; None until then (boundPort reports -1 in that state).
private var serverInfo: Option[ServerInfo] = None

// Side effect at construction time: start receiving batch events right away,
// so stats accumulate even before the server is bound.
ssc.addStreamingListener(listener)

/** Starts the Jetty server. Exits the JVM if the server cannot be created. */
def bind() {
try {
serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
// NOTE(review): hard-exits the whole application on bind failure — intentional? confirm.
logError("Failed to create Spark JettyUtils", e)
System.exit(1)
}
}

// Actual port Jetty bound to (may differ from `port` if 0/auto); -1 before bind().
private def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
}

object StreamingUI {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@ private[spark] object UIUtils {
type="text/css" />
<script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{appName} - {title}</title>
<script type="text/JavaScript">
<!--
function timedRefresh(timeoutPeriod) {
setTimeout("location.reload(true);",timeoutPeriod);
}
// -->
</script>
</head>
<body>
<body onload="JavaScript:timedRefresh(1000);">
<div class="navbar navbar-static-top">
<div class="navbar-inner">
<a href={prependBaseUri(basePath, "/")} class="brand">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class UISuite extends FunSuite with BeforeAndAfterAll {
val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < duration) {
servers.map(_.send(Random.nextString(10) + "\n"))
Thread.sleep(1)
//Thread.sleep(1)
}
ssc.stop()
servers.foreach(_.stop())
Expand Down

0 comments on commit 4d86e98

Please sign in to comment.