Group RDDs by DStream operations and batches
This commit does two main things:

(1) We now display the batch information in each cluster on the UI
(2) Scopes are marked by the DStream operation and the batch

For (2), scopes previously took into account only the batch within a particular
DStream. This meant scopes were never shared across multiple DStreams, so we
ended up with many more boxes per batch (one for each DStream) than needed.
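
As a rough illustration of the new grouping key (hypothetical names, not Spark API): each RDD scope is now keyed by the DStream operation's base scope plus the batch time, so DStreams created in the same operation share one scope per batch, while different batches of that operation get different scopes. A minimal sketch, assuming a made-up ScopeKeySketch object and an already-formatted batch time string:

object ScopeKeySketch {
  // Compose a per-batch scope id and display name from a DStream operation's
  // base scope. The id is unique per (operation, batch); the name is what the
  // DAG visualization shows on each cluster box.
  def perBatchScope(
      baseScopeId: String,
      baseName: String,
      batchTimeMs: Long,
      formattedBatchTime: String): (String, String) = {
    val scopeId = s"${baseScopeId}_$batchTimeMs"
    val scopeName =
      if (baseName.length > 10) {
        s"$baseName\n@ $formattedBatchTime" // wrap long operation names onto a second line
      } else {
        s"$baseName @ $formattedBatchTime"
      }
    (scopeId, scopeName)
  }

  def main(args: Array[String]): Unit = {
    println(perBatchScope("3", "map", 1431543465000L, "18:57:45"))
    println(perBatchScope("3", "countByWindow", 1431543465000L, "18:57:45"))
  }
}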
Andrew Or committed May 14, 2015
1 parent bf0ab6e commit 7c4513d
Showing 6 changed files with 71 additions and 30 deletions.

Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala (26 changes: 18 additions & 8 deletions)
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.SparkContext
import org.apache.spark.{Logging, SparkContext}

/**
* A general, named code block representing an operation that instantiates RDDs.
@@ -43,9 +43,8 @@ import org.apache.spark.SparkContext
@JsonPropertyOrder(Array("id", "name", "parent"))
private[spark] class RDDOperationScope(
val name: String,
val parent: Option[RDDOperationScope] = None) extends Serializable {

val id: Int = RDDOperationScope.nextScopeId()
val id: String = RDDOperationScope.nextScopeId().toString,
val parent: Option[RDDOperationScope] = None) {

def toJson: String = {
RDDOperationScope.jsonMapper.writeValueAsString(this)
@@ -75,7 +74,7 @@ private[spark] class RDDOperationScope(
* A collection of utility methods to construct a hierarchical representation of RDD scopes.
* An RDD scope tracks the series of operations that created a given RDD.
*/
private[spark] object RDDOperationScope {
private[spark] object RDDOperationScope extends Logging {
private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
private val scopeCounter = new AtomicInteger(0)

@@ -88,14 +87,25 @@ private[spark] object RDDOperationScope {

/**
* Execute the given body such that all RDDs created in this body will have the same scope.
* The name of the scope will be the name of the method that immediately encloses this one.
* The name of the scope will be the first method name in the stack trace that is not the
* same as this method's.
*
* Note: Return statements are NOT allowed in body.
*/
private[spark] def withScope[T](
sc: SparkContext,
allowNesting: Boolean = false)(body: => T): T = {
val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
val ourMethodName = stackTrace(1).getMethodName
// Climb upwards to find the first method that's called something different
val callerMethodName = stackTrace
.find(_.getMethodName != ourMethodName)
.map(_.getMethodName)
.getOrElse {
// Log a warning just in case, but this should almost certainly never happen
logWarning("No valid method name for this RDD operation scope!")
"N/A"
}
withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
}

@@ -129,7 +139,7 @@ private[spark] object RDDOperationScope {
sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
} else if (sc.getLocalProperty(noOverrideKey) == null) {
// Otherwise, set the scope only if the higher level caller allows us to do so
sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
sc.setLocalProperty(scopeKey, new RDDOperationScope(name, parent = oldScope).toJson)
}
// Optionally disallow the child body to override our scope
if (!allowNesting) {
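The withScope change above walks the stack trace to find the first method whose name differs from the scoping method's own, and uses that as the scope name. Below is a simplified standalone sketch of the same climbing idea; the names traceCaller, map, and filter are made up for illustration, and the frame indexing is simpler than Spark's two-layer withScope overloads:

object CallerNameSketch {
  // Return the name of the first method on the stack whose name differs from
  // this method's own name.
  def traceCaller(): String = {
    val stackTrace = Thread.currentThread.getStackTrace.tail // drop Thread#getStackTrace
    val ourMethodName = stackTrace.head.getMethodName        // "traceCaller" itself
    stackTrace
      .find(_.getMethodName != ourMethodName)                // climb past frames with our name
      .map(_.getMethodName)
      .getOrElse("N/A")
  }

  // Two "operations" that both delegate to traceCaller and get their own names back.
  def map(): String = traceCaller()
  def filter(): String = traceCaller()

  def main(args: Array[String]): Unit = {
    println(map())    // prints "map"
    println(filter()) // prints "filter"
  }
}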
@@ -110,8 +110,8 @@ private[ui] object RDDOperationGraph extends Logging {
// which may be nested inside of other clusters
val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty)
val rddClusters = rddScopes.map { scope =>
val clusterId = scope.name + "_" + scope.id
val clusterName = scope.name
val clusterId = scope.id
val clusterName = scope.name.replaceAll("\\n", "\\\\n")
clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
}
// Build the cluster hierarchy for this RDD
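The replaceAll above converts any real newline embedded in a scope name (introduced by the batch-time wrapping in DStream.makeScope) into the literal two-character sequence \n before the name reaches the visualization layer. A tiny standalone sketch of the effect, using a made-up cluster name:

object NewlineEscapeSketch {
  def main(args: Array[String]): Unit = {
    // A scope name wrapped by DStream.makeScope contains a real newline.
    val clusterName = "countByWindow\n@ 18:57:45"
    // The regex "\\n" matches the newline character, and the replacement
    // "\\\\n" emits a literal backslash followed by 'n'.
    val escaped = clusterName.replaceAll("\\n", "\\\\n")
    println(escaped) // countByWindow\n@ 18:57:45  (one line, with a visible \n)
  }
}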
@@ -31,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.streaming.ui.UIUtils
import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}

/**
@@ -112,15 +113,40 @@ abstract class DStream[T: ClassTag] (
private[streaming] val creationSite = DStream.getCreationSite()

/**
* The scope associated with the operation that created this DStream.
* The base scope associated with the operation that created this DStream.
*
* This is the medium through which we pass the DStream operation name (e.g. updatedStateByKey)
* to the RDDs created by this DStream. Note that we never directly use this scope in RDDs.
* Instead, every time we call `compute` we instantiate a new scope using the same name as this
* one. Otherwise, all RDDs ever created by this DStream will be in the same scope.
* to the RDDs created by this DStream. Note that we never use this scope directly in RDDs.
* Instead, we instantiate a new scope during each call to `compute` based on this one.
*
* This is not defined if the DStream is created outside of one of the public DStream operations.
*/
private val scope: Option[RDDOperationScope] = {
Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
private val baseScope: Option[String] = {
Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
}

/**
* Make a scope that groups RDDs created in the same DStream operation in the same batch.
*
* Each DStream produces many scopes and each scope may be shared by other DStreams created
* in the same operation. Separate calls to the same DStream operation create separate scopes.
* For instance, `dstream.map(...).map(...)` creates two separate scopes per batch.
*/
protected def makeScope(time: Time): Option[RDDOperationScope] = {
baseScope.map { bsJson =>
val formattedBatchTime = UIUtils.formatBatchTime(time.milliseconds)
val bscope = RDDOperationScope.fromJson(bsJson)
val baseName = bscope.name // e.g. countByWindow
val scopeName =
if (baseName.length > 10) {
// If the operation name is too long, wrap the line
s"$baseName\n@ $formattedBatchTime"
} else {
s"$baseName @ $formattedBatchTime"
}
val scopeId = s"${bscope.id}_${time.milliseconds}"
new RDDOperationScope(scopeName, scopeId)
}
}

/** Persist the RDDs of this DStream with the given storage level */
@@ -343,23 +369,23 @@ abstract class DStream[T: ClassTag] (
private def doCompute(time: Time): Option[RDD[T]] = {
val scopeKey = SparkContext.RDD_SCOPE_KEY
val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
// Set the thread-local property for call sites to this DStream's creation site
// such that RDDs generated by compute gets that as their creation site.
// Note that this `doCompute` may get called from another DStream which may have
// set its own call site. So we store its call site in a temporary variable,
// set this DStream's creation site, generate RDDs and then restore the previous call site.
// Pass this DStream's operation scope and creation site information to RDDs through
// thread-local properties in our SparkContext. Since this method may be called from another
// DStream, we need to temporarily store any old scope and creation site information to
// restore them later after setting our own.
// TODO: this won't work if multiple StreamingContexts share the same SparkContext
val prevCallSite = ssc.sparkContext.getCallSite()
val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)

try {
ssc.sparkContext.setCallSite(creationSite)
// Use the DStream's scope for this RDD so we preserve the name of the operation that
// created the DStream. Note that this is equivalent to {{RDDOperationScope.ssc.withScope}}
// with `allowNesting = false` and `ignoreParent = true`.
// Use the DStream's base scope for this RDD so we can (1) preserve the higher level
// DStream operation name, and (2) share this scope with other DStreams created in the
// same operation. Disallow nesting so that low-level Spark primitives do not show up.
// TODO: merge callsites with scopes so we can just reuse the code there
scope.foreach { s =>
ssc.sparkContext.setLocalProperty(scopeKey, new RDDOperationScope(s.name).toJson)
makeScope(time).foreach { s =>
ssc.sparkContext.setLocalProperty(scopeKey, s.toJson)
ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
}

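doCompute above saves the caller's call site and scope properties, installs this DStream's own, and restores the originals afterwards. Below is a generic sketch of that save/set/restore pattern, using a plain mutable map as a hypothetical stand-in for SparkContext's thread-local properties; none of these names are Spark API:

import scala.collection.mutable

object SaveRestoreSketch {
  // A plain map standing in for the thread-local property store.
  private val props = mutable.Map[String, String]()

  // Remember the previous value, install ours for the duration of body, restore after.
  def withTemporaryProperty[T](key: String, value: String)(body: => T): T = {
    val previous = props.get(key)
    props(key) = value
    try {
      body
    } finally {
      previous match {
        case Some(old) => props(key) = old
        case None      => props -= key
      }
    }
  }

  def main(args: Array[String]): Unit = {
    props("rdd.scope") = "outer scope"
    withTemporaryProperty("rdd.scope", "map @ 18:57:45") {
      println(props("rdd.scope")) // map @ 18:57:45
    }
    println(props("rdd.scope"))   // outer scope (restored)
  }
}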
@@ -199,10 +199,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
* @param times all time values that will be used in the graphs.
*/
private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
val dateFormat = new SimpleDateFormat("HH:mm:ss")
val js = "var timeFormat = {};\n" + times.map { time =>
val formattedTime = dateFormat.format(new Date(time))
s"timeFormat[$time] = '$formattedTime';"
s"timeFormat[$time] = '${UIUtils.formatBatchTime(time)}';"
}.mkString("\n")

<script>{Unparsed(js)}</script>
@@ -17,10 +17,17 @@

package org.apache.spark.streaming.ui

import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import java.util.Date

private[streaming] object UIUtils {

private val batchDateFormat = new SimpleDateFormat("HH:mm:ss")

/** Format the given batch time in a human readable format. */
def formatBatchTime(time: Long): String = batchDateFormat.format(new Date(time))

/**
* Return the short string for a `TimeUnit`.
*/
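A small standalone sketch of what the new formatBatchTime helper produces for a batch time given in epoch milliseconds; the timestamp below is only an example, and the output depends on the JVM's default time zone:

import java.text.SimpleDateFormat
import java.util.Date

object BatchTimeFormatSketch {
  def main(args: Array[String]): Unit = {
    // Same "HH:mm:ss" pattern that UIUtils.batchDateFormat uses.
    val fmt = new SimpleDateFormat("HH:mm:ss")
    println(fmt.format(new Date(1431543465000L))) // e.g. "18:57:45" when the default zone is UTC
  }
}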
