Skip to content

Commit

Permalink
First cut implementation of Streaming UI.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Mar 28, 2014
1 parent 6f986f0 commit 56cc7fb
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.ui.StreamingUI

/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
Expand Down Expand Up @@ -158,6 +159,9 @@ class StreamingContext private[streaming] (

private[streaming] val waiter = new ContextWaiter

private[streaming] val ui = new StreamingUI(this)
ui.bind()

/**
* Return the associated Spark context
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.ui

import scala.collection.mutable.SynchronizedQueue
import scala.xml.Node

import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.Logging
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListener, StreamingListenerBatchCompleted}
import org.apache.spark.ui.{ServerInfo, SparkUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{Distribution, Utils}

private[spark] class StreamingUIListener() extends StreamingListener {

private val batchInfos = new SynchronizedQueue[BatchInfo]
private val maxBatchInfos = 100

override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) {
batchInfos.enqueue(batchStarted.batchInfo)
if (batchInfos.size > maxBatchInfos) batchInfos.dequeue()
}

def processingDelayDistribution = extractDistribution(_.processingDelay)

def schedulingDelayDistribution = extractDistribution(_.schedulingDelay)

def totalDelay = extractDistribution(_.totalDelay)

def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble))
}

def numBatchInfos = batchInfos.size
}

private[spark] class StreamingUI(ssc: StreamingContext) extends Logging {

private val sc = ssc.sparkContext
private val conf = sc.conf
private val appName = sc.appName
private val bindHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT)
private val securityManager = sc.env.securityManager
private val listener = new StreamingUIListener()
private val handlers: Seq[ServletContextHandler] = {
Seq(
createServletHandler("/",
(request: HttpServletRequest) => render(request), securityManager),
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")
)
}

private var serverInfo: Option[ServerInfo] = None

ssc.addStreamingListener(listener)

def bind() {
try {
serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf))
logInfo("Started Spark Streaming Web UI at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Spark JettyUtils", e)
System.exit(1)
}
}

def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)

private def render(request: HttpServletRequest): Seq[Node] = {
val batchStatsTable = generateBatchStatsTable()
val content = batchStatsTable
UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview")
}

private def generateBatchStatsTable(): Seq[Node] = {
def getQuantiles(timeDistributionOption: Option[Distribution]) = {
timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) }
}
val numBatches = listener.numBatchInfos
val table = if (numBatches > 0) {
val processingDelayQuantilesRow =
"Processing Times" +: getQuantiles(listener.processingDelayDistribution)
val schedulingDelayQuantilesRow =
"Scheduling Delay:" +: getQuantiles(listener.processingDelayDistribution)
val totalDelayQuantilesRow =
"End-to-end Delay:" +: getQuantiles(listener.totalDelay)

val headerRow = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
val dataRows: Seq[Seq[String]] = Seq(
processingDelayQuantilesRow,
schedulingDelayQuantilesRow,
totalDelayQuantilesRow
)
Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true))
} else {
None
}

val content =
<h4>Batch Processing Statistics</h4> ++
<div>{table.getOrElse("No statistics have been generated yet.")}</div>
content
}
}

object StreamingUI {
val DEFAULT_PORT = 6060
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.apache.spark.streaming.ui

import scala.xml.Node
import org.apache.spark.ui.Page

private[spark] object UIUtils {

import org.apache.spark.ui.UIUtils.prependBaseUri

def headerStreamingPage(
content: => Seq[Node],
basePath: String,
appName: String,
title: String): Seq[Node] = {
val overview = {
<li><a href={prependBaseUri(basePath)}>Overview</a></li>
}

<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
<link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
type="text/css" />
<link rel="stylesheet" href={prependBaseUri("/static/webui.css")}
type="text/css" />
<script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{appName} - {title}</title>
</head>
<body>
<div class="navbar navbar-static-top">
<div class="navbar-inner">
<a href={prependBaseUri(basePath, "/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
</a>
<ul class="nav">
{overview}
</ul>
<p class="navbar-text pull-right"><strong>{appName}</strong> application UI</p>
</div>
</div>

<div class="container-fluid">
<div class="row-fluid">
<div class="span12">
<h3 style="vertical-align: bottom; display: inline-block;">
{title}
</h3>
</div>
</div>
{content}
</div>
</body>
</html>
}

def listingTable[T](
headers: Seq[String],
makeRow: T => Seq[Node],
rows: Seq[T],
fixedWidth: Boolean = false): Seq[Node] = {
org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth)
}

def listingTable[T](
headers: Seq[String],
rows: Seq[Seq[String]],
fixedWidth: Boolean = false
): Seq[Node] = {
def makeRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
org.apache.spark.ui.UIUtils.listingTable(headers, makeRow, rows, fixedWidth)
}
}

0 comments on commit 56cc7fb

Please sign in to comment.