[SPARK-7501] [STREAMING] DAG visualization: show DStream operations
This is similar to #5999, but for streaming. Roughly 200 lines are tests.

One thing to note here is that we already do a similar kind of scoping for call sites, so this patch adds the new RDD operation scoping logic in the same place. This patch also adds a `try finally` block so the relevant scope variables are restored safely even if the body throws.
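For readers unfamiliar with the scoping machinery, the pattern amounts to saving the active scope, installing a new one for the duration of the body, and restoring the old value in a `finally` block. The following is a simplified, self-contained sketch of that pattern only; it is not the actual `RDDOperationScope.withScope` implementation, and the thread-local used here is invented for illustration.

```scala
// Minimal sketch of the "install scope, run body, restore in finally" pattern.
// Illustrative only: Spark's real logic lives in RDDOperationScope and stores
// the scope in SparkContext local properties rather than a ThreadLocal.
object ScopeSketch {
  private val currentScope = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  def withScope[T](name: String)(body: => T): T = {
    val previous = currentScope.get()      // remember whatever was active before
    currentScope.set(Some(name))           // install the new scope
    try {
      body                                 // anything created here sees `name`
    } finally {
      currentScope.set(previous)           // always restore, even if body throws
    }
  }

  def main(args: Array[String]): Unit = {
    withScope("map") {
      println(s"inside: ${currentScope.get()}")   // Some(map)
    }
    println(s"outside: ${currentScope.get()}")    // None
  }
}
```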

tdas zsxwing

------------------------
**Before**
<img src="https://cloud.githubusercontent.com/assets/2133137/7625996/d88211b8-f9b4-11e4-90b9-e11baa52d6d7.png" width="450px"/>

--------------------------
**After**
<img src="https://cloud.githubusercontent.com/assets/2133137/7625997/e0878f8c-f9b4-11e4-8df3-7dd611b13c87.png" width="650px"/>

Author: Andrew Or <[email protected]>

Closes #6034 from andrewor14/dag-viz-streaming and squashes the following commits:

932a64a [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
e685df9 [Andrew Or] Rename createRDDWith
84d0656 [Andrew Or] Review feedback
697c086 [Andrew Or] Fix tests
53b9936 [Andrew Or] Set scopes for foreachRDD properly
1881802 [Andrew Or] Refactor DStream scope names again
af4ba8d [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
fd07d22 [Andrew Or] Make MQTT lower case
f6de871 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
0ca1801 [Andrew Or] Remove a few unnecessary withScopes on aliases
fa4e5fb [Andrew Or] Pass in input stream name rather than defining it from within
1af0b0e [Andrew Or] Fix style
074c00b [Andrew Or] Review comments
d25a324 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
e4a93ac [Andrew Or] Fix tests?
25416dc [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
9113183 [Andrew Or] Add tests for DStream scopes
b3806ab [Andrew Or] Fix test
bb80bbb [Andrew Or] Fix MIMA?
5c30360 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
5703939 [Andrew Or] Rename operations that create InputDStreams
7c4513d [Andrew Or] Group RDDs by DStream operations and batches
bf0ab6e [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
05c2676 [Andrew Or] Wrap many more methods in withScope
c121047 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
65ef3e9 [Andrew Or] Fix NPE
a0d3263 [Andrew Or] Scope streaming operations instead of RDD operations

(cherry picked from commit b93c97d)
Signed-off-by: Andrew Or <[email protected]>
Andrew Or committed May 18, 2015
1 parent ba502ab commit a475cbc
Showing 14 changed files with 484 additions and 145 deletions.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Note: Return statements are NOT allowed in the given body.
*/
private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)

// Methods for creating RDDs

24 changes: 17 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.SparkContext
import org.apache.spark.{Logging, SparkContext}

/**
* A general, named code block representing an operation that instantiates RDDs.
@@ -43,9 +43,8 @@ import org.apache.spark.SparkContext
@JsonPropertyOrder(Array("id", "name", "parent"))
private[spark] class RDDOperationScope(
val name: String,
val parent: Option[RDDOperationScope] = None) {

val id: Int = RDDOperationScope.nextScopeId()
val parent: Option[RDDOperationScope] = None,
val id: String = RDDOperationScope.nextScopeId().toString) {

def toJson: String = {
RDDOperationScope.jsonMapper.writeValueAsString(this)
@@ -75,7 +74,7 @@ private[spark] class RDDOperationScope(
* A collection of utility methods to construct a hierarchical representation of RDD scopes.
* An RDD scope tracks the series of operations that created a given RDD.
*/
private[spark] object RDDOperationScope {
private[spark] object RDDOperationScope extends Logging {
private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
private val scopeCounter = new AtomicInteger(0)

@@ -88,14 +87,25 @@ private[spark] object RDDOperationScope {

/**
* Execute the given body such that all RDDs created in this body will have the same scope.
* The name of the scope will be the name of the method that immediately encloses this one.
* The name of the scope will be the first method name in the stack trace that is not the
* same as this method's.
*
* Note: Return statements are NOT allowed in body.
*/
private[spark] def withScope[T](
sc: SparkContext,
allowNesting: Boolean = false)(body: => T): T = {
val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
// Climb upwards to find the first method that's called something different
val callerMethodName = stackTrace
.find(_.getMethodName != ourMethodName)
.map(_.getMethodName)
.getOrElse {
// Log a warning just in case, but this should almost certainly never happen
logWarning("No valid method name for this RDD operation scope!")
"N/A"
}
withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
}
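The caller-name heuristic in the hunk above can be exercised on its own. Below is a hedged, standalone sketch of the same idea (skip stack frames that share the current method's name and take the first one that differs); it is not the Spark code itself, and the object and method names are invented for illustration.

```scala
// Standalone illustration of the stack-climbing heuristic used to pick a scope
// name: drop the Thread#getStackTrace frame, skip frames whose method name
// matches the current method (e.g. chained withScope overloads), and take the
// first different name.
object CallerNameSketch {
  def scopeName(): String = {
    val stack = Thread.currentThread.getStackTrace.tail    // drop "getStackTrace"
    val ourMethodName = stack.head.getMethodName            // i.e. "scopeName"
    stack.find(_.getMethodName != ourMethodName)
      .map(_.getMethodName)
      .getOrElse("N/A")                                     // should not normally happen
  }

  // Pretend public API method; its name becomes the scope name.
  def textFileStream(): String = scopeName()

  def main(args: Array[String]): Unit = {
    println(textFileStream())   // prints "textFileStream"
  }
}
```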

@@ -116,8 +116,8 @@ private[ui] object RDDOperationGraph extends Logging {
// which may be nested inside of other clusters
val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty)
val rddClusters = rddScopes.map { scope =>
val clusterId = scope.name + "_" + scope.id
val clusterName = scope.name
val clusterId = scope.id
val clusterName = scope.name.replaceAll("\\n", "\\\\n")
clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
}
// Build the cluster hierarchy for this RDD
@@ -177,7 +177,7 @@ private[ui] object RDDOperationGraph extends Logging {

/** Return the dot representation of a node in an RDDOperationGraph. */
private def makeDotNode(node: RDDOperationNode): String = {
s"""${node.id} [label="${node.name} (${node.id})"]"""
s"""${node.id} [label="${node.name} [${node.id}]"]"""
}

/** Return the dot representation of a subgraph in an RDDOperationGraph. */
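To make the `makeDotNode` change concrete, the sketch below prints the DOT fragment the new label format would produce for a hypothetical node, along with the newline escaping now applied to cluster names (all values are invented):

```scala
// Hypothetical values, mirroring the string templates shown in the diff above.
object DotLabelSketch {
  def main(args: Array[String]): Unit = {
    val nodeId = 3
    val nodeName = "map"
    // New makeDotNode format: name followed by [id] instead of (id).
    println(s"""$nodeId [label="$nodeName [$nodeId]"]""")   // 3 [label="map [3]"]

    // Cluster names may contain newlines; they must be escaped for DOT labels.
    val scopeName = "socket text stream\nmap"
    println(scopeName.replaceAll("\\n", "\\\\n"))           // socket text stream\nmap
  }
}
```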
@@ -22,13 +22,13 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.{TaskContext, Partition, SparkContext}

/**
*
* Tests whether scopes are passed from the RDD operation to the RDDs correctly.
*/
class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
private var sc: SparkContext = null
private val scope1 = new RDDOperationScope("scope1")
private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
private val scope2 = new RDDOperationScope("scope2", Some(scope1))
private val scope3 = new RDDOperationScope("scope3", Some(scope2))

before {
sc = new SparkContext("local", "test")
@@ -48,9 +48,9 @@ class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
val scope1Json = scope1.toJson
val scope2Json = scope2.toJson
val scope3Json = scope3.toJson
assert(scope1Json === s"""{"id":${scope1.id},"name":"scope1"}""")
assert(scope2Json === s"""{"id":${scope2.id},"name":"scope2","parent":$scope1Json}""")
assert(scope3Json === s"""{"id":${scope3.id},"name":"scope3","parent":$scope2Json}""")
assert(scope1Json === s"""{"id":"${scope1.id}","name":"scope1"}""")
assert(scope2Json === s"""{"id":"${scope2.id}","name":"scope2","parent":$scope1Json}""")
assert(scope3Json === s"""{"id":"${scope3.id}","name":"scope3","parent":$scope2Json}""")
assert(RDDOperationScope.fromJson(scope1Json) === scope1)
assert(RDDOperationScope.fromJson(scope2Json) === scope2)
assert(RDDOperationScope.fromJson(scope3Json) === scope3)
@@ -65,6 +65,9 @@ class DirectKafkaInputDStream[
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)

// Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
private[streaming] override def name: String = s"Kafka direct stream [$id]"

protected[streaming] override val checkpointData =
new DirectKafkaInputDStreamCheckpointData

@@ -189,7 +189,7 @@ object KafkaUtils {
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange]
): RDD[(K, V)] = {
): RDD[(K, V)] = sc.withScope {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val leaders = leadersForRanges(kafkaParams, offsetRanges)
new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
@@ -224,7 +224,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange],
leaders: Map[TopicAndPartition, Broker],
messageHandler: MessageAndMetadata[K, V] => R
): RDD[R] = {
): RDD[R] = sc.withScope {
val leaderMap = if (leaders.isEmpty) {
leadersForRanges(kafkaParams, offsetRanges)
} else {
@@ -233,7 +233,8 @@ object KafkaUtils {
case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
}.toMap
}
new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
val cleanedHandler = sc.clean(messageHandler)
new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
}

/**
@@ -256,7 +257,7 @@ object KafkaUtils {
valueDecoderClass: Class[VD],
kafkaParams: JMap[String, String],
offsetRanges: Array[OffsetRange]
): JavaPairRDD[K, V] = {
): JavaPairRDD[K, V] = jsc.sc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -294,7 +295,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange],
leaders: JMap[TopicAndPartition, Broker],
messageHandler: JFunction[MessageAndMetadata[K, V], R]
): JavaRDD[R] = {
): JavaRDD[R] = jsc.sc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -348,8 +349,9 @@ object KafkaUtils {
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
val cleanedHandler = ssc.sc.clean(messageHandler)
new DirectKafkaInputDStream[K, V, KD, VD, R](
ssc, kafkaParams, fromOffsets, messageHandler)
ssc, kafkaParams, fromOffsets, cleanedHandler)
}

/**
@@ -469,11 +471,12 @@ object KafkaUtils {
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
createDirectStream[K, V, KD, VD, R](
jssc.ssc,
Map(kafkaParams.toSeq: _*),
Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
messageHandler.call _
cleanedHandler
)
}
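The new `clean(...)` calls above run Spark's closure cleaner over the message handler before it is captured by the RDD/DStream, which (among other things) surfaces serialization problems when the stream is defined rather than when the first batch runs. The snippet below is a rough, hypothetical sketch of that serializability check alone; it is not `ClosureCleaner`, and `assertSerializable` is an invented helper.

```scala
// Rough sketch: eagerly serializing a handler closure surfaces
// NotSerializableException at definition time instead of inside a running job.
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object HandlerSerializabilityCheck {
  def assertSerializable(f: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(f)   // throws if f captures non-serializable state
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val handler = (key: String, value: String) => s"$key -> $value"
    assertSerializable(handler)   // plain Scala function literals are serializable
    println("handler is serializable")
  }
}
```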

@@ -35,7 +35,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
@@ -57,6 +56,8 @@ class MQTTInputDStream(
storageLevel: StorageLevel
) extends ReceiverInputDStream[String](ssc_) {

private[streaming] override def name: String = s"MQTT stream [$id]"

def getReceiver(): Receiver[String] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
}
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
@@ -241,25 +245,45 @@ class StreamingContext private[streaming] (

private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()

/**
* Execute a block of code in a scope such that all new DStreams created in this body will
* be part of the same scope. For more detail, see the comments in `doCompute`.
*
* Note: Return statements are NOT allowed in the given body.
*/
private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body)

/**
* Execute a block of code in a scope such that all new DStreams created in this body will
* be part of the same scope. For more detail, see the comments in `doCompute`.
*
* Note: Return statements are NOT allowed in the given body.
*/
private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
}

/**
* Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
@deprecated("Use receiverStream", "1.0.0")
def networkStream[T: ClassTag](
receiver: Receiver[T]): ReceiverInputDStream[T] = {
receiverStream(receiver)
def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
withNamedScope("network stream") {
receiverStream(receiver)
}
}

/**
* Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
def receiverStream[T: ClassTag](
receiver: Receiver[T]): ReceiverInputDStream[T] = {
new PluggableInputDStream[T](this, receiver)
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
withNamedScope("receiver stream") {
new PluggableInputDStream[T](this, receiver)
}
}

/**
Expand All @@ -279,7 +299,7 @@ class StreamingContext private[streaming] (
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T] = {
): ReceiverInputDStream[T] = withNamedScope("actor stream") {
receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}

@@ -296,7 +316,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

@@ -334,7 +354,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T] = {
): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
new RawInputDStream[T](this, hostname, port, storageLevel)
}

@@ -408,7 +428,7 @@ class StreamingContext private[streaming] (
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = {
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}

@@ -430,7 +450,7 @@ class StreamingContext private[streaming] (
@Experimental
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = {
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
val conf = sc_.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
@@ -477,7 +497,7 @@ class StreamingContext private[streaming] (
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope {
new UnionDStream[T](streams.toArray)
}

@@ -488,7 +508,7 @@ class StreamingContext private[streaming] (
def transform[T: ClassTag](
dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = {
): DStream[T] = withScope {
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
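To see the effect end to end, a minimal job like the hedged sketch below would, after this patch, show its DStream operations ("socket text stream", "flatMap", "map", "reduceByKey") as named scopes grouping the RDDs of each batch in the DAG visualization (host and port are placeholders):

```scala
// Minimal word-count streaming job; with this patch the UI groups the RDDs it
// generates by the DStream operation that produced them.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ScopedWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ScopedWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999)   // "socket text stream" scope
    val counts = lines
      .flatMap(_.split(" "))                              // "flatMap" scope
      .map(word => (word, 1))                             // "map" scope
      .reduceByKey(_ + _)                                 // "reduceByKey" scope
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```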

