Wrap many more methods in withScope
This covers all the places where we instantiate DStreams.
Andrew Or committed May 13, 2015
1 parent c121047 commit 05c2676
Showing 8 changed files with 151 additions and 147 deletions.
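The same mechanical change repeats in every file below: each method that instantiates a DStream (or a Kafka RDD) now evaluates its body inside ssc.withScope, jssc.ssc.withScope, or sc.withScope, so the streams and RDDs created there can be grouped under that method's name in the web UI's DAG visualization. As an illustration only (not code from this commit), here is a minimal sketch of the pattern using a hypothetical ExampleUtils helper; it assumes a package under org.apache.spark.streaming so that the non-public StreamingContext.withScope is visible, just as it is to the *Utils objects changed here.

package org.apache.spark.streaming.example

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream

// Hypothetical helper mirroring the shape of FlumeUtils, MQTTUtils and friends.
object ExampleUtils {

  // Before this commit the body would run outside any operation scope:
  //   def createStream(...): ReceiverInputDStream[String] = {
  //     ssc.socketTextStream(hostname, port, storageLevel)
  //   }

  // After: the whole body is evaluated inside ssc.withScope, so the DStream
  // built here is tagged with this method's scope.
  def createStream(
      ssc: StreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = ssc.withScope {
    ssc.socketTextStream(hostname, port, storageLevel)
  }
}

Callers are unaffected: signatures and return types are unchanged, only the scope bookkeeping around the construction differs.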
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -670,7 +670,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Note: Return statements are NOT allowed in the given body.
*/
private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)

// Methods for creating RDDs

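The "Return statements are NOT allowed in the given body" note above reflects a general Scala property of by-name blocks rather than anything specific to this commit: a return inside the passed-in body compiles to a non-local return, implemented by throwing a control-flow exception that unwinds through the wrapper before being caught at the enclosing method's boundary. A self-contained sketch of that mechanism in plain Scala (no Spark code involved):

object NonLocalReturnDemo {

  // Stand-in for a withScope-style helper: runs a by-name body between
  // two pieces of bookkeeping.
  private def withScope[U](body: => U): U = {
    println("scope start")
    try body finally println("scope end")
  }

  // The `return` below does not simply exit the block: it becomes a
  // scala.runtime.NonLocalReturnControl exception that unwinds through
  // withScope and is only caught at the boundary of firstEven.
  private def firstEven(xs: Seq[Int]): Int = withScope {
    xs.foreach { x =>
      if (x % 2 == 0) return x
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    println(firstEven(Seq(1, 3, 4, 5)))  // prints "scope start", "scope end", then 4
  }
}

Because the return escapes via exception-based control flow, wrappers that manage state around a by-name body, such as the withScope methods used throughout this diff, typically forbid it outright.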
FlumeUtils.scala
@@ -41,7 +41,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent] = {
): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
createStream(ssc, hostname, port, storageLevel, false)
}

@@ -59,11 +59,8 @@ object FlumeUtils {
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](
ssc, hostname, port, storageLevel, enableDecompression)

inputStream
): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel, enableDecompression)
}

/**
@@ -76,7 +73,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
): JavaReceiverInputDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
createStream(jssc.ssc, hostname, port)
}

@@ -91,7 +88,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
): JavaReceiverInputDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
createStream(jssc.ssc, hostname, port, storageLevel, false)
}

@@ -108,7 +105,7 @@ object FlumeUtils {
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
): JavaReceiverInputDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
}

@@ -125,7 +122,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent] = {
): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
}

@@ -140,7 +137,7 @@ object FlumeUtils {
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
storageLevel: StorageLevel
): ReceiverInputDStream[SparkFlumeEvent] = {
): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
createPollingStream(ssc, addresses, storageLevel,
DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
}
@@ -162,7 +159,7 @@ object FlumeUtils {
storageLevel: StorageLevel,
maxBatchSize: Int,
parallelism: Int
): ReceiverInputDStream[SparkFlumeEvent] = {
): ReceiverInputDStream[SparkFlumeEvent] = ssc.withScope {
new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
parallelism, storageLevel)
}
@@ -178,7 +175,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
): JavaReceiverInputDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
}

@@ -195,7 +192,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
): JavaReceiverInputDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
}

@@ -210,7 +207,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
addresses: Array[InetSocketAddress],
storageLevel: StorageLevel
): JavaReceiverInputDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
createPollingStream(jssc, addresses, storageLevel,
DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
}
@@ -232,7 +229,7 @@ object FlumeUtils {
storageLevel: StorageLevel,
maxBatchSize: Int,
parallelism: Int
): JavaReceiverInputDStream[SparkFlumeEvent] = {
): JavaReceiverInputDStream[SparkFlumeEvent] = jssc.ssc.withScope {
createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
}
}
KafkaUtils.scala
@@ -58,7 +58,7 @@ object KafkaUtils {
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[(String, String)] = {
): ReceiverInputDStream[(String, String)] = ssc.withScope {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
@@ -80,7 +80,7 @@ object KafkaUtils {
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
): ReceiverInputDStream[(K, V)] = ssc.withScope {
val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}
@@ -99,7 +99,7 @@ object KafkaUtils {
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
): JavaPairReceiverInputDStream[String, String] = {
): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope {
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}

@@ -118,7 +118,7 @@ object KafkaUtils {
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[String, String] = {
): JavaPairReceiverInputDStream[String, String] = jssc.ssc.withScope {
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}
@@ -145,7 +145,7 @@ object KafkaUtils {
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[K, V] = {
): JavaPairReceiverInputDStream[K, V] = jssc.ssc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)

@@ -189,7 +189,7 @@ object KafkaUtils {
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange]
): RDD[(K, V)] = {
): RDD[(K, V)] = sc.withScope {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val leaders = leadersForRanges(kafkaParams, offsetRanges)
new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
@@ -224,7 +224,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange],
leaders: Map[TopicAndPartition, Broker],
messageHandler: MessageAndMetadata[K, V] => R
): RDD[R] = {
): RDD[R] = sc.withScope {
val leaderMap = if (leaders.isEmpty) {
leadersForRanges(kafkaParams, offsetRanges)
} else {
@@ -256,7 +256,7 @@ object KafkaUtils {
valueDecoderClass: Class[VD],
kafkaParams: JMap[String, String],
offsetRanges: Array[OffsetRange]
): JavaPairRDD[K, V] = {
): JavaPairRDD[K, V] = jsc.sc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -294,7 +294,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange],
leaders: JMap[TopicAndPartition, Broker],
messageHandler: JFunction[MessageAndMetadata[K, V], R]
): JavaRDD[R] = {
): JavaRDD[R] = jsc.sc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -347,7 +347,7 @@ object KafkaUtils {
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
): InputDStream[R] = ssc.withScope {
new DirectKafkaInputDStream[K, V, KD, VD, R](
ssc, kafkaParams, fromOffsets, messageHandler)
}
@@ -392,7 +392,7 @@ object KafkaUtils {
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)] = {
): InputDStream[(K, V)] = ssc.withScope {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val kc = new KafkaCluster(kafkaParams)
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
@@ -463,7 +463,7 @@ object KafkaUtils {
kafkaParams: JMap[String, String],
fromOffsets: JMap[TopicAndPartition, JLong],
messageHandler: JFunction[MessageAndMetadata[K, V], R]
): JavaInputDStream[R] = {
): JavaInputDStream[R] = jssc.ssc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -521,7 +521,7 @@ object KafkaUtils {
valueDecoderClass: Class[VD],
kafkaParams: JMap[String, String],
topics: JSet[String]
): JavaPairInputDStream[K, V] = {
): JavaPairInputDStream[K, V] = jssc.ssc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
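For callers of the public KafkaUtils API nothing changes; the wrapping is transparent. As an illustration only (not part of this commit), a minimal direct-stream consumer against the Spark 1.4-era Kafka API, assuming a broker at localhost:9092 and a topic named "events":

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    // local[2] only to make the sketch runnable locally; use spark-submit in practice.
    val conf = new SparkConf().setAppName("direct-kafka-scope-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // Same call as before this commit; the returned stream is now constructed
    // inside an operation scope, so its RDDs are grouped in the web UI's
    // DAG visualization.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}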
MQTTUtils.scala
@@ -21,8 +21,8 @@ import scala.reflect.ClassTag

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object MQTTUtils {
/**
@@ -37,7 +37,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
): ReceiverInputDStream[String] = ssc.withScope {
new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
}

@@ -52,7 +52,7 @@ object MQTTUtils {
jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
): JavaReceiverInputDStream[String] = {
): JavaReceiverInputDStream[String] = jssc.ssc.withScope {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic)
}
@@ -69,7 +69,7 @@ object MQTTUtils {
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
): JavaReceiverInputDStream[String] = {
): JavaReceiverInputDStream[String] = jssc.ssc.withScope {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
TwitterUtils.scala
@@ -21,8 +21,8 @@ import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object TwitterUtils {
/**
@@ -40,7 +40,7 @@ object TwitterUtils {
twitterAuth: Option[Authorization],
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[Status] = {
): ReceiverInputDStream[Status] = ssc.withScope {
new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
}

@@ -53,7 +53,9 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
*/
def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None)
jssc.ssc.withScope {
createStream(jssc.ssc, None)
}
}

/**
@@ -66,7 +68,7 @@
* @param filters Set of filter strings to get only those tweets that match them
*/
def createStream(jssc: JavaStreamingContext, filters: Array[String]
): JavaReceiverInputDStream[Status] = {
): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
createStream(jssc.ssc, None, filters)
}

@@ -83,7 +85,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
filters: Array[String],
storageLevel: StorageLevel
): JavaReceiverInputDStream[Status] = {
): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
createStream(jssc.ssc, None, filters, storageLevel)
}

@@ -94,7 +96,7 @@ object TwitterUtils {
* @param twitterAuth Twitter4J Authorization
*/
def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
): JavaReceiverInputDStream[Status] = {
): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
createStream(jssc.ssc, Some(twitterAuth))
}

@@ -109,7 +111,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
): JavaReceiverInputDStream[Status] = {
): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
createStream(jssc.ssc, Some(twitterAuth), filters)
}

@@ -125,7 +127,7 @@ object TwitterUtils {
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
): JavaReceiverInputDStream[Status] = {
): JavaReceiverInputDStream[Status] = jssc.ssc.withScope {
createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
(The remaining changed files are not shown here.)
