[SPARK-7501] [Streaming] DAG visualization: show DStream operations #6034

Closed (27 commits)

Commits:
a0d3263 Scope streaming operations instead of RDD operations (May 9, 2015)
65ef3e9 Fix NPE (May 9, 2015)
c121047 Merge branch 'master' of github.com:apache/spark into dag-viz-streaming (May 13, 2015)
05c2676 Wrap many more methods in withScope (May 13, 2015)
bf0ab6e Merge branch 'master' of github.com:apache/spark into dag-viz-streaming (May 14, 2015)
7c4513d Group RDDs by DStream operations and batches (May 14, 2015)
5703939 Rename operations that create InputDStreams (May 14, 2015)
5c30360 Merge branch 'master' of github.com:apache/spark into dag-viz-streaming (May 14, 2015)
bb80bbb Fix MIMA? (May 14, 2015)
b3806ab Fix test (May 14, 2015)
9113183 Add tests for DStream scopes (May 14, 2015)
25416dc Merge branch 'master' of github.com:apache/spark into dag-viz-streaming (May 15, 2015)
e4a93ac Fix tests? (May 15, 2015)
d25a324 Merge branch 'master' of github.com:apache/spark into dag-viz-streaming (May 15, 2015)
074c00b Review comments (May 15, 2015)
1af0b0e Fix style (May 15, 2015)
fa4e5fb Pass in input stream name rather than defining it from within (May 15, 2015)
0ca1801 Remove a few unnecessary withScopes on aliases (May 15, 2015)
f6de871 Merge branch 'master' of github.com:apache/spark into dag-viz-streaming (May 15, 2015)
fd07d22 Make MQTT lower case (May 15, 2015)
af4ba8d Merge branch 'master' of github.com:apache/spark into dag-viz-streaming (May 16, 2015)
1881802 Refactor DStream scope names again (May 16, 2015)
53b9936 Set scopes for foreachRDD properly (May 16, 2015)
697c086 Fix tests (May 16, 2015)
84d0656 Review feedback (May 17, 2015)
e685df9 Rename createRDDWith (May 18, 2015)
932a64a Merge branch 'master' of github.com:apache/spark into dag-viz-streaming (May 18, 2015)
Files changed:

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Note: Return statements are NOT allowed in the given body.
*/
private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)

// Methods for creating RDDs

24 changes: 17 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.SparkContext
import org.apache.spark.{Logging, SparkContext}

/**
* A general, named code block representing an operation that instantiates RDDs.
@@ -43,9 +43,8 @@ import org.apache.spark.SparkContext
@JsonPropertyOrder(Array("id", "name", "parent"))
private[spark] class RDDOperationScope(
val name: String,
val parent: Option[RDDOperationScope] = None) {

val id: Int = RDDOperationScope.nextScopeId()
val parent: Option[RDDOperationScope] = None,
val id: String = RDDOperationScope.nextScopeId().toString) {

def toJson: String = {
RDDOperationScope.jsonMapper.writeValueAsString(this)
@@ -75,7 +74,7 @@ private[spark] class RDDOperationScope(
* A collection of utility methods to construct a hierarchical representation of RDD scopes.
* An RDD scope tracks the series of operations that created a given RDD.
*/
private[spark] object RDDOperationScope {
private[spark] object RDDOperationScope extends Logging {
private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
private val scopeCounter = new AtomicInteger(0)

@@ -88,14 +87,25 @@ private[spark] object RDDOperationScope {

/**
* Execute the given body such that all RDDs created in this body will have the same scope.
* The name of the scope will be the name of the method that immediately encloses this one.
* The name of the scope will be the first method name in the stack trace that is not the
* same as this method's.
[Review comment from a Contributor]: What do you mean by "this method's"? The name is not withScope?
*
* Note: Return statements are NOT allowed in body.
*/
private[spark] def withScope[T](
sc: SparkContext,
allowNesting: Boolean = false)(body: => T): T = {
val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
// Climb upwards to find the first method that's called something different
val callerMethodName = stackTrace
.find(_.getMethodName != ourMethodName)
.map(_.getMethodName)
.getOrElse {
// Log a warning just in case, but this should almost certainly never happen
logWarning("No valid method name for this RDD operation scope!")
"N/A"
}
withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
}
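
For context, a minimal standalone sketch of the stack-walking idea used above to derive the scope name (the object and method names here are made up for illustration; the real code additionally has to skip over the intermediate SparkContext.withScope frame, which shares the name "withScope"):

object ScopeNameDemo {
  // Return the name of the nearest calling method whose name differs from
  // the method doing the walking, falling back to "N/A" if none is found.
  def callerMethodName(): String = {
    val stackTrace = Thread.currentThread.getStackTrace.drop(1) // drop Thread#getStackTrace
    val ourMethodName = stackTrace.head.getMethodName           // i.e. callerMethodName
    stackTrace
      .find(_.getMethodName != ourMethodName)
      .map(_.getMethodName)
      .getOrElse("N/A")
  }

  def textFileStream(): String = callerMethodName()

  def main(args: Array[String]): Unit = {
    println(textFileStream()) // prints "textFileStream"
  }
}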

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -116,8 +116,8 @@ private[ui] object RDDOperationGraph extends Logging {
// which may be nested inside of other clusters
val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty)
val rddClusters = rddScopes.map { scope =>
val clusterId = scope.name + "_" + scope.id
val clusterName = scope.name
val clusterId = scope.id
val clusterName = scope.name.replaceAll("\\n", "\\\\n")
clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
}
// Build the cluster hierarchy for this RDD
@@ -177,7 +177,7 @@ private[ui] object RDDOperationGraph extends Logging {

/** Return the dot representation of a node in an RDDOperationGraph. */
private def makeDotNode(node: RDDOperationNode): String = {
s"""${node.id} [label="${node.name} (${node.id})"]"""
s"""${node.id} [label="${node.name} [${node.id}]"]"""
}
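
To make the two changes in this file concrete, a small sketch (Node is a hypothetical stand-in for RDDOperationNode; the newline escaping matters because a multi-line scope name would otherwise break the generated dot source):

object DotLabelDemo {
  case class Node(id: Int, name: String)

  // Same label shape as the change above: Node(3, "map") => 3 [label="map [3]"]
  def makeDotNode(node: Node): String =
    s"""${node.id} [label="${node.name} [${node.id}]"]"""

  // Escape embedded newlines so a multi-line scope name stays on one dot source line
  def escapeClusterName(name: String): String =
    name.replaceAll("\\n", "\\\\n")

  def main(args: Array[String]): Unit = {
    println(makeDotNode(Node(3, "map")))        // 3 [label="map [3]"]
    println(escapeClusterName("first\nsecond")) // first\nsecond
  }
}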

/** Return the dot representation of a subgraph in an RDDOperationGraph. */
core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -22,13 +22,13 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.{TaskContext, Partition, SparkContext}

/**
*
* Tests whether scopes are passed from the RDD operation to the RDDs correctly.
*/
class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
private var sc: SparkContext = null
private val scope1 = new RDDOperationScope("scope1")
private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
private val scope2 = new RDDOperationScope("scope2", Some(scope1))
private val scope3 = new RDDOperationScope("scope3", Some(scope2))

before {
sc = new SparkContext("local", "test")
@@ -48,9 +48,9 @@ class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
val scope1Json = scope1.toJson
val scope2Json = scope2.toJson
val scope3Json = scope3.toJson
assert(scope1Json === s"""{"id":${scope1.id},"name":"scope1"}""")
assert(scope2Json === s"""{"id":${scope2.id},"name":"scope2","parent":$scope1Json}""")
assert(scope3Json === s"""{"id":${scope3.id},"name":"scope3","parent":$scope2Json}""")
assert(scope1Json === s"""{"id":"${scope1.id}","name":"scope1"}""")
assert(scope2Json === s"""{"id":"${scope2.id}","name":"scope2","parent":$scope1Json}""")
assert(scope3Json === s"""{"id":"${scope3.id}","name":"scope3","parent":$scope2Json}""")
assert(RDDOperationScope.fromJson(scope1Json) === scope1)
assert(RDDOperationScope.fromJson(scope2Json) === scope2)
assert(RDDOperationScope.fromJson(scope3Json) === scope3)
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -65,6 +65,9 @@ class DirectKafkaInputDStream[
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)

// Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
private[streaming] override def name: String = s"Kafka direct stream [$id]"

protected[streaming] override val checkpointData =
new DirectKafkaInputDStreamCheckpointData
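
The comment above points at a loose naming convention for input streams: a lower-case description followed by the stream id in square brackets. A sketch of how a third-party receiver stream might follow it (MyInputDStream and MyReceiver are hypothetical, and the package is chosen only so that the private[streaming] override compiles):

package org.apache.spark.streaming.demo

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
  def onStart(): Unit = { }
  def onStop(): Unit = { }
}

class MyInputDStream(ssc_ : StreamingContext)
  extends ReceiverInputDStream[String](ssc_) {

  // Same shape as "Kafka direct stream [0]" or "MQTT stream [1]"
  private[streaming] override def name: String = s"my custom stream [$id]"

  def getReceiver(): Receiver[String] = new MyReceiver(StorageLevel.MEMORY_ONLY)
}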

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -189,7 +189,7 @@ object KafkaUtils {
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange]
): RDD[(K, V)] = {
): RDD[(K, V)] = sc.withScope {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val leaders = leadersForRanges(kafkaParams, offsetRanges)
new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
@@ -224,7 +224,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange],
leaders: Map[TopicAndPartition, Broker],
messageHandler: MessageAndMetadata[K, V] => R
): RDD[R] = {
): RDD[R] = sc.withScope {
val leaderMap = if (leaders.isEmpty) {
leadersForRanges(kafkaParams, offsetRanges)
} else {
@@ -233,7 +233,8 @@
case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
}.toMap
}
new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
val cleanedHandler = sc.clean(messageHandler)
new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
}

/**
@@ -256,7 +257,7 @@ object KafkaUtils {
valueDecoderClass: Class[VD],
kafkaParams: JMap[String, String],
offsetRanges: Array[OffsetRange]
): JavaPairRDD[K, V] = {
): JavaPairRDD[K, V] = jsc.sc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -294,7 +295,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange],
leaders: JMap[TopicAndPartition, Broker],
messageHandler: JFunction[MessageAndMetadata[K, V], R]
): JavaRDD[R] = {
): JavaRDD[R] = jsc.sc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -348,8 +349,9 @@ object KafkaUtils {
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
val cleanedHandler = ssc.sc.clean(messageHandler)
new DirectKafkaInputDStream[K, V, KD, VD, R](
ssc, kafkaParams, fromOffsets, messageHandler)
ssc, kafkaParams, fromOffsets, cleanedHandler)
}

/**
@@ -469,11 +471,12 @@ object KafkaUtils {
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
createDirectStream[K, V, KD, VD, R](
jssc.ssc,
Map(kafkaParams.toSeq: _*),
Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
messageHandler.call _
cleanedHandler
)
}
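
For context, a typical call into one of these wrapped entry points (standard Spark 1.x direct-stream usage; the broker address and topic are placeholders). After this patch the per-batch KafkaRDDs created by such a stream are grouped under the calling operation's scope in the DAG visualization, and the user-supplied handler closures are cleaned before being shipped:

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaScopeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-scope-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("events")

    // Each batch produces a KafkaRDD; those RDDs are grouped under this call's scope.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(10000)
    ssc.stop()
  }
}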

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -35,7 +35,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
@@ -57,6 +56,8 @@ class MQTTInputDStream(
storageLevel: StorageLevel
) extends ReceiverInputDStream[String](ssc_) {

private[streaming] override def name: String = s"MQTT stream [$id]"

def getReceiver(): Receiver[String] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
}
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
@@ -241,25 +241,45 @@ class StreamingContext private[streaming] (

private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()

/**
* Execute a block of code in a scope such that all new DStreams created in this body will
* be part of the same scope. For more detail, see the comments in `doCompute`.
*
* Note: Return statements are NOT allowed in the given body.
*/
private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body)

/**
* Execute a block of code in a scope such that all new DStreams created in this body will
* be part of the same scope. For more detail, see the comments in `doCompute`.
*
* Note: Return statements are NOT allowed in the given body.
*/
private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
}
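
To make the difference between the two helpers concrete: withScope names the scope after the method that calls it (via the stack-trace lookup in RDDOperationScope), while withNamedScope takes an explicit, human-readable name. A hypothetical factory inside StreamingContext could use either; fancyStream and fancyStream2 below are illustrative only, not part of this patch:

// Scope name would be "fancyStream", taken from the enclosing method's name.
private[streaming] def fancyStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] =
  withScope {
    new PluggableInputDStream[T](this, receiver)
  }

// Scope name is the explicit string "fancy stream", as with "socket text stream" below.
private[streaming] def fancyStream2[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] =
  withNamedScope("fancy stream") {
    new PluggableInputDStream[T](this, receiver)
  }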

/**
* Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
@deprecated("Use receiverStream", "1.0.0")
def networkStream[T: ClassTag](
receiver: Receiver[T]): ReceiverInputDStream[T] = {
receiverStream(receiver)
def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
withNamedScope("network stream") {
receiverStream(receiver)
}
}

/**
* Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
def receiverStream[T: ClassTag](
receiver: Receiver[T]): ReceiverInputDStream[T] = {
new PluggableInputDStream[T](this, receiver)
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
withNamedScope("receiver stream") {
new PluggableInputDStream[T](this, receiver)
}
}

/**
@@ -279,7 +299,7 @@ class StreamingContext private[streaming] (
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T] = {
): ReceiverInputDStream[T] = withNamedScope("actor stream") {
receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}

@@ -296,7 +316,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

@@ -334,7 +354,7 @@ class StreamingContext private[streaming] (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T] = {
): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
new RawInputDStream[T](this, hostname, port, storageLevel)
}

@@ -408,7 +428,7 @@ class StreamingContext private[streaming] (
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = {
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}

@@ -430,7 +450,7 @@
@Experimental
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = {
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
val conf = sc_.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
@@ -477,7 +497,7 @@ class StreamingContext private[streaming] (
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope {
new UnionDStream[T](streams.toArray)
}

@@ -488,7 +508,7 @@
def transform[T: ClassTag](
dstreams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = {
): DStream[T] = withScope {
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
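
Putting it together, a throwaway word count against a local socket (hostname and port are placeholders) now shows its per-batch RDDs in the DAG visualization grouped by the DStream operation that produced them, roughly one cluster per operation in the pipeline below:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DagVizDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dag-viz-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999) // scope "socket text stream"
    val counts = lines
      .flatMap(_.split(" "))                            // scope "flatMap"
      .map(word => (word, 1))                           // scope "map"
      .reduceByKey(_ + _)                               // scope "reduceByKey"
    counts.print()                                      // output op, backed by foreachRDD

    ssc.start()
    ssc.awaitTerminationOrTimeout(30000)
    ssc.stop()
  }
}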
