Refactor DStream scope names again
The scope name is now read from the class name instead of us having to
specify it wherever we instantiate the stream. This removes some more
duplicate code.
Andrew Or committed May 16, 2015
1 parent af4ba8d commit 1881802
Showing 11 changed files with 123 additions and 92 deletions.
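
To make the commit message concrete, here is a small standalone sketch (not part of the commit) of the class-name-based naming that the new `InputDStream.name` further down in this diff performs. `derivedStreamName` is a hypothetical helper that mirrors that method, but takes a plain class-name string instead of using `Utils.getFormattedClassName`:

// Hypothetical helper mirroring the new InputDStream.name logic shown below.
def derivedStreamName(className: String, id: Int): String = {
  val readable = className
    .replaceAll("InputDStream", "Stream") // "FlumePollingInputDStream" -> "FlumePollingStream"
    .split("(?=[A-Z])")                   // split before every capital letter
    .filter(_.nonEmpty)
    .mkString(" ")                        // "Flume Polling Stream"
    .toLowerCase                          // "flume polling stream"
    .capitalize                           // "Flume polling stream"
  s"$readable [$id]"
}

derivedStreamName("FlumePollingInputDStream", 2) // "Flume polling stream [2]"
derivedStreamName("SocketInputDStream", 0)       // "Socket stream [0]"
derivedStreamName("DirectKafkaInputDStream", 1)  // "Direct kafka stream [1]" -- the word order is off,
                                                 // which is why DirectKafkaInputDStream and
                                                 // MQTTInputDStream still override name explicitly.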
@@ -59,7 +59,7 @@ object FlumeUtils {
       port: Int,
       storageLevel: StorageLevel,
       enableDecompression: Boolean
-    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withNamedScope("flume stream") {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression)
   }

@@ -159,7 +159,7 @@ object FlumeUtils {
       storageLevel: StorageLevel,
       maxBatchSize: Int,
       parallelism: Int
-    ): ReceiverInputDStream[SparkFlumeEvent] = ssc.withNamedScope("flume polling stream") {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
@@ -65,6 +65,9 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)

+  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+  private[streaming] override def name: String = s"Kafka direct stream [$id]"
+
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData

@@ -80,7 +80,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       topics: Map[String, Int],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[(K, V)] = ssc.withNamedScope("kafka stream") {
+    ): ReceiverInputDStream[(K, V)] = {
     val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   }

@@ -348,7 +348,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       fromOffsets: Map[TopicAndPartition, Long],
       messageHandler: MessageAndMetadata[K, V] => R
-    ): InputDStream[R] = ssc.withNamedScope("kafka direct stream") {
+    ): InputDStream[R] = {
     val cleanedHandler = ssc.sc.clean(messageHandler)
     new DirectKafkaInputDStream[K, V, KD, VD, R](
       ssc, kafkaParams, fromOffsets, cleanedHandler)

@@ -394,7 +394,7 @@ object KafkaUtils {
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String]
-    ): InputDStream[(K, V)] = ssc.withNamedScope("kafka direct stream") {
+    ): InputDStream[(K, V)] = {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
     val kc = new KafkaCluster(kafkaParams)
     val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
@@ -35,7 +35,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage
 import org.eclipse.paho.client.mqttv3.MqttTopic
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

-import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._

@@ -57,6 +56,8 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {

+  private[streaming] override def name: String = s"MQTT stream [$id]"
+
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
   }
@@ -37,7 +37,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[String] = ssc.withNamedScope("mqtt stream") {
+    ): ReceiverInputDStream[String] = {
     new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
   }

@@ -40,7 +40,7 @@ object TwitterUtils {
       twitterAuth: Option[Authorization],
       filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[Status] = ssc.withNamedScope("twitter stream") {
+    ): ReceiverInputDStream[Status] = {
     new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
   }

@@ -335,7 +335,7 @@ class StreamingContext private[streaming] (
       port: Int,
       converter: (InputStream) => Iterator[T],
       storageLevel: StorageLevel
-    ): ReceiverInputDStream[T] = withNamedScope("socket stream") {
+    ): ReceiverInputDStream[T] = {
     new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
   }

@@ -372,7 +372,7 @@ class StreamingContext private[streaming] (
       K: ClassTag,
       V: ClassTag,
       F <: NewInputFormat[K, V]: ClassTag
-  ] (directory: String): InputDStream[(K, V)] = withNamedScope("file stream") {
+  ] (directory: String): InputDStream[(K, V)] = {
     new FileInputDStream[K, V, F](this, directory)
   }

@@ -393,9 +393,7 @@ class StreamingContext private[streaming] (
       V: ClassTag,
       F <: NewInputFormat[K, V]: ClassTag
   ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
-    withNamedScope("file stream") {
-      new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
-    }
+    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
   }

   /**
@@ -418,7 +416,7 @@
   ] (directory: String,
       filter: Path => Boolean,
       newFilesOnly: Boolean,
-      conf: Configuration): InputDStream[(K, V)] = withNamedScope("file stream") {
+      conf: Configuration): InputDStream[(K, V)] = {
     new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
   }

@@ -475,7 +473,7 @@ class StreamingContext private[streaming] (
   def queueStream[T: ClassTag](
       queue: Queue[RDD[T]],
       oneAtATime: Boolean = true
-    ): InputDStream[T] = withNamedScope("queue stream") {
+    ): InputDStream[T] = {
     queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
   }

@@ -492,7 +490,7 @@ class StreamingContext private[streaming] (
       queue: Queue[RDD[T]],
       oneAtATime: Boolean,
       defaultRDD: RDD[T]
-    ): InputDStream[T] = withNamedScope("queue stream") {
+    ): InputDStream[T] = {
     new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
   }
@@ -121,16 +121,10 @@ abstract class DStream[T: ClassTag] (
    *
    * This is not defined if the DStream is created outside of one of the public DStream operations.
    */
-  private[streaming] val baseScope: Option[String] = {
+  protected[streaming] val baseScope: Option[String] = {
     Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
   }

-  /**
-   * Make a scope name based on the given one.
-   * Subclasses may optionally override this to provide custom scope names.
-   */
-  protected[streaming] def makeScopeName(baseName: String): String = baseName
-
   /**
    * Make a scope that groups RDDs created in the same DStream operation in the same batch.
    *
@@ -142,16 +136,16 @@
     baseScope.map { bsJson =>
       val formattedBatchTime =
         UIUtils.formatBatchTime(time.milliseconds, ssc.graph.batchDuration.milliseconds)
-      val bscope = RDDOperationScope.fromJson(bsJson)
-      val baseName = makeScopeName(bscope.name) // e.g. countByWindow, "kafka stream [0]"
+      val bs = RDDOperationScope.fromJson(bsJson)
+      val baseName = bs.name // e.g. countByWindow, "kafka stream [0]"
       val scopeName =
         if (baseName.length > 10) {
           // If the operation name is too long, wrap the line
           s"$baseName\n@ $formattedBatchTime"
         } else {
           s"$baseName @ $formattedBatchTime"
         }
-      val scopeId = s"${bscope.id}_${time.milliseconds}"
+      val scopeId = s"${bs.id}_${time.milliseconds}"
       new RDDOperationScope(scopeName, id = scopeId)
     }
   }
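
For illustration only (assumed example values, not part of the commit), this is how the makeScope logic above composes the display name shown in the UI from the base scope name and the formatted batch time:

val baseName = "kafka direct stream [0]" // e.g. a base scope name coming from an InputDStream
val formattedBatchTime = "16:45:30"      // hypothetical result of UIUtils.formatBatchTime
val scopeName =
  if (baseName.length > 10) {
    s"$baseName\n@ $formattedBatchTime"  // long names wrap onto a second line
  } else {
    s"$baseName @ $formattedBatchTime"
  }
// scopeName == "kafka direct stream [0]\n@ 16:45:30"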
@@ -19,7 +19,10 @@ package org.apache.spark.streaming.dstream

 import scala.reflect.ClassTag

+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+import org.apache.spark.util.Utils

 /**
  * This is the abstract base class for all input streams. This class provides methods
@@ -44,13 +47,31 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   /** This is an unique identifier for the input stream. */
   val id = ssc.getNewInputStreamId()

+  /** A human-readable name of this InputDStream */
+  private[streaming] def name: String = {
+    // e.g. FlumePollingDStream -> "Flume polling stream"
+    val newName = Utils.getFormattedClassName(this)
+      .replaceAll("InputDStream", "Stream")
+      .split("(?=[A-Z])")
+      .filter(_.nonEmpty)
+      .mkString(" ")
+      .toLowerCase
+      .capitalize
+    s"$newName [$id]"
+  }
+
   /**
-   * The name of this InputDStream. By default, it's the class name with its id.
+   * The base scope associated with the operation that created this DStream.
+   *
+   * For InputDStreams, we use the name of this DStream as the scope name.
+   * If an outer scope is given, we assume that it includes an alternative name for this stream.
    */
-  private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
-
-  /** Make a scope name based on the given one. This includes the ID of this stream. */
-  protected[streaming] override def makeScopeName(baseName: String): String = s"$baseName [$id]"
+  protected[streaming] override val baseScope: Option[String] = {
+    val scopeName = Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
+      .map { json => RDDOperationScope.fromJson(json).name + s" [$id]" }
+      .getOrElse(name.toLowerCase)
+    Some(new RDDOperationScope(scopeName).toJson)
+  }

   /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
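
A minimal sketch (assumed helper and example values, not Spark API) of the two cases handled by the new baseScope override above: if an outer RDD scope is set, its name plus the stream id is used; otherwise it falls back to the lower-cased class-derived name:

// baseScopeNameFor is a hypothetical stand-in for the override's scope-name choice.
def baseScopeNameFor(outerScopeName: Option[String], derivedName: String, id: Int): String =
  outerScopeName
    .map(outer => s"$outer [$id]")       // an enclosing scope, e.g. set via ssc.withNamedScope(...)
    .getOrElse(derivedName.toLowerCase)  // no outer scope: use the class-derived name

baseScopeNameFor(Some("socket text stream"), derivedName = "Socket stream [0]", id = 0)
// "socket text stream [0]"
baseScopeNameFor(None, derivedName = "Socket stream [0]", id = 0)
// "socket stream [0]"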