Skip to content

Commit

Permalink
Added Spark and Streaming UI unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Apr 11, 2014
1 parent caa5e05 commit f8e1053
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 110 deletions.
3 changes: 0 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
pages += page
}

/** Initialize this tab and attach all relevant pages. */
def initialize()

/** Get a list of header tabs from the parent UI. */
def headerTabs: Seq[WebUITab] = parent.getTabs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,8 @@ private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "envi
val basePath = parent.basePath
val listener = new EnvironmentListener

initialize()

def initialize() {
attachPage(new IndexPage(this))
parent.registerListener(listener)
}
attachPage(new IndexPage(this))
parent.registerListener(listener)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
val basePath = parent.basePath
val listener = new ExecutorsListener(parent.storageStatusListener)

initialize()

def initialize() {
attachPage(new IndexPage(this))
parent.registerListener(listener)
}
attachPage(new IndexPage(this))
parent.registerListener(listener)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@ private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stag
val conf = if (live) sc.conf else new SparkConf
val listener = new JobProgressListener(conf)

initialize()

def initialize() {
attachPage(new IndexPage(this))
attachPage(new StagePage(this))
attachPage(new PoolPage(this))
parent.registerListener(listener)
}
attachPage(new IndexPage(this))
attachPage(new StagePage(this))
attachPage(new PoolPage(this))
parent.registerListener(listener)

def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,9 @@ private[ui] class BlockManagerTab(parent: SparkUI) extends WebUITab(parent, "sto
val basePath = parent.basePath
val listener = new BlockManagerListener(parent.storageStatusListener)

initialize()

def initialize() {
attachPage(new IndexPage(this))
attachPage(new RddPage(this))
parent.registerListener(listener)
}
attachPage(new IndexPage(this))
attachPage(new RddPage(this))
parent.registerListener(listener)
}

/**
Expand Down
67 changes: 66 additions & 1 deletion core/src/test/scala/org/apache/spark/ui/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,81 @@
package org.apache.spark.ui

import java.net.ServerSocket
import javax.servlet.http.HttpServletRequest

import scala.io.Source
import scala.util.{Failure, Success, Try}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkConf
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.LocalSparkContext._
import scala.xml.Node

class UISuite extends FunSuite {

test("basic ui visibility") {
withSpark(new SparkContext("local", "test")) { sc =>
// test if the ui is visible, and all the expected tabs are visible
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(sc.ui.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
assert(html.toLowerCase.contains("stages"))
assert(html.toLowerCase.contains("storage"))
assert(html.toLowerCase.contains("environment"))
assert(html.toLowerCase.contains("executors"))
}
}
}

test("visibility at localhost:4040") {
withSpark(new SparkContext("local", "test")) { sc =>
// test if visible from http://localhost:4040
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL("http://localhost:4040").mkString
assert(html.toLowerCase.contains("stages"))
}
}
}

test("attaching a new tab") {
withSpark(new SparkContext("local", "test")) { sc =>
val sparkUI = sc.ui

val newTab = new WebUITab(sparkUI, "foo") {
attachPage(new WebUIPage("") {
override def render(request: HttpServletRequest): Seq[Node] = {
<b>"html magic"</b>
}
})
}
sparkUI.attachTab(newTab)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(sc.ui.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))

// check whether new page exists
assert(html.toLowerCase.contains("foo"))

// check whether other pages still exist
assert(html.toLowerCase.contains("stages"))
assert(html.toLowerCase.contains("storage"))
assert(html.toLowerCase.contains("environment"))
assert(html.toLowerCase.contains("executors"))
}

eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
// check whether new page exists
assert(html.contains("magic"))
}
}
}

test("jetty port increases under contention") {
val startPort = 4040
val server = new Server(startPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.streaming.ui

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.Logging
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.ui.WebUITab
Expand All @@ -32,11 +30,7 @@ private[spark] class StreamingTab(ssc: StreamingContext)
val basePath = parent.basePath
val listener = new StreamingJobProgressListener(ssc)

initialize()

def initialize() {
ssc.addStreamingListener(listener)
attachPage(new StreamingPage(this))
parent.attachTab(this)
}
ssc.addStreamingListener(listener)
attachPage(new StreamingPage(this))
parent.attachTab(this)
}
84 changes: 14 additions & 70 deletions streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,86 +17,30 @@

package org.apache.spark.streaming

import scala.reflect.ClassTag
import scala.util.Random
import scala.io.Source

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.scalatest.matchers.ShouldMatchers

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream

class UISuite extends FunSuite with ShouldMatchers with BeforeAndAfterAll with BeforeAndAfter {
var sc: SparkContext = null
var ssc: StreamingContext = null

override def beforeAll() {
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
conf.set("spark.cleaner.ttl", "1800")
sc = new SparkContext(conf)
}

override def afterAll() {
if (sc != null) sc.stop()
}

before {
ssc = new StreamingContext(sc, Seconds(1))
}

after {
if (ssc != null) {
ssc.stop()
ssc = null
}
}
class UISuite extends FunSuite {

test("streaming tab in spark UI") {
val ssc = new StreamingContext(sc, Seconds(1))
val ssc = new StreamingContext("local", "test", Seconds(1))
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val uiData = Source.fromURL(
ssc.sparkContext.ui.appUIAddress).mkString
assert(!uiData.contains("random data that should not be present"))
assert(uiData.contains("streaming"))
val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
// test if streaming tab exist
assert(html.toLowerCase.contains("streaming"))
// test if other Spark tabs still exist
assert(html.toLowerCase.contains("stages"))
}
}

ignore("Testing") {
runStreaming(1000000)
}

def runStreaming(duration: Long) {
val ssc1 = new StreamingContext(sc, Seconds(1))
val servers1 = (1 to 3).map { i => new TestServer(10000 + i) }

val inputStream1 = ssc1.union(servers1.map(server => ssc1.socketTextStream("localhost", server.port)))
inputStream1.count.print

ssc1.start()
servers1.foreach(_.start())

val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < duration) {
servers1.map(_.send(Random.nextString(10) + "\n"))
//Thread.sleep(1)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(
ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
assert(html.toLowerCase.contains("batch"))
assert(html.toLowerCase.contains("network"))
}
ssc1.stop()
servers1.foreach(_.stop())
}
}

class FunctionBasedInputDStream[T: ClassTag](
ssc_ : StreamingContext,
function: (StreamingContext, Time) => Option[RDD[T]]
) extends InputDStream[T](ssc_) {

def start(): Unit = {}

def stop(): Unit = {}

def compute(validTime: Time): Option[RDD[T]] = function(ssc, validTime)
}

0 comments on commit f8e1053

Please sign in to comment.