diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
index 4e1286213f85e..369702977650d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -43,8 +43,8 @@ import org.apache.spark.{Logging, SparkContext}
 @JsonPropertyOrder(Array("id", "name", "parent"))
 private[spark] class RDDOperationScope(
     val name: String,
-    val id: String = RDDOperationScope.nextScopeId().toString,
-    val parent: Option[RDDOperationScope] = None) {
+    val parent: Option[RDDOperationScope] = None,
+    val id: String = RDDOperationScope.nextScopeId().toString) {
 
   def toJson: String = {
     RDDOperationScope.jsonMapper.writeValueAsString(this)
@@ -96,7 +96,7 @@ private[spark] object RDDOperationScope extends Logging {
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
     val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
-    val ourMethodName = stackTrace(1).getMethodName
+    val ourMethodName = stackTrace(1).getMethodName //
     // Climb upwards to find the first method that's called something different
     val callerMethodName = stackTrace
       .find(_.getMethodName != ourMethodName)
@@ -139,7 +139,7 @@ private[spark] object RDDOperationScope extends Logging {
       sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
     } else if (sc.getLocalProperty(noOverrideKey) == null) {
       // Otherwise, set the scope only if the higher level caller allows us to do so
-      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, parent = oldScope).toJson)
+      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
     }
     // Optionally disallow the child body to override our scope
     if (!allowNesting) {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
index aec7b3219ee8d..4434ed858c60c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -27,8 +27,8 @@ import org.apache.spark.{TaskContext, Partition, SparkContext}
 class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
   private var sc: SparkContext = null
   private val scope1 = new RDDOperationScope("scope1")
-  private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
-  private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
+  private val scope2 = new RDDOperationScope("scope2", Some(scope1))
+  private val scope3 = new RDDOperationScope("scope3", Some(scope2))
 
   before {
     sc = new SparkContext("local", "test")
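Reviewer note: because `parent` and `id` swap positions above, call sites that want a custom `id` now have to name it, while `parent` can be passed positionally. A minimal sketch of the new calling convention, using a stripped-down stand-in class (`Scope`, `ScopeIds`, and the demo values are hypothetical, not Spark code):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for RDDOperationScope.nextScopeId(): a monotonically increasing id.
object ScopeIds {
  private val counter = new AtomicInteger(0)
  def next(): Int = counter.getAndIncrement()
}

// Same parameter order as the patched RDDOperationScope: name, then parent, then id.
class Scope(
    val name: String,
    val parent: Option[Scope] = None,
    val id: String = ScopeIds.next().toString)

object ScopeDemo extends App {
  val scope1 = new Scope("scope1")
  // parent is now the second parameter, so it can be passed positionally (as in the test suite).
  val scope2 = new Scope("scope2", Some(scope1))
  // When only a custom id is supplied, it must be passed by name (as DStream.makeScope does).
  val batchScope = new Scope("scope2 @ 12:00:00", id = s"${scope2.id}_1000")
  println(Seq(scope1, scope2, batchScope).map(s => s"${s.id} -> ${s.name}").mkString("\n"))
}
```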
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 8b86bb4e7505d..7fec636d749f8 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -50,7 +50,7 @@ class FlumeInputDStream[T: ClassTag](
   enableDecompression: Boolean
 ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"input stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"input stream [$id]")
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel, enableDecompression)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 59db0cdc9bb82..f12995a357275 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -53,7 +53,7 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
 
-  protected override val customScopeName: Option[String] = Some(s"flume polling stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"flume polling stream [$id]")
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 6b44eea998ec2..b386a237e2e4c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -65,7 +65,7 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
 
-  protected override val customScopeName: Option[String] = Some(s"kafka direct stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"kafka direct stream [$id]")
 
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 14f5a5f06d024..65fa789c5cc4c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -55,7 +55,7 @@ class KafkaInputDStream[
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
-  protected override val customScopeName: Option[String] = Some(s"kafka stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"kafka stream [$id]")
 
   def getReceiver(): Receiver[(K, V)] = {
     if (!useReliableReceiver) {
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index e26a16503859b..d47ff268271a9 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -57,7 +57,7 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")
 
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 2c2d5296b32e2..9d742de802b00 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -45,7 +45,7 @@ class TwitterInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[Status](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"twitter stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"twitter stream [$id]")
 
   private def createOAuthAuthorization(): Authorization = {
     new OAuthAuthorization(new ConfigurationBuilder().build())
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index 9a65efa8b425c..f659b9b1e608b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
   extends InputDStream[T](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"constant stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"constant stream [$id]")
 
   override def start() {}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index c76e821ed9253..af195190130fc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -129,7 +129,7 @@ abstract class DStream[T: ClassTag] (
    * An optional custom name for all scopes generated by this DStream.
    * If None, the name of the operation that created this DStream will be used.
    */
-  protected val customScopeName: Option[String] = None
+  protected[streaming] val customScopeName: Option[String] = None
 
   /**
    * Make a scope that groups RDDs created in the same DStream operation in the same batch.
@@ -152,7 +152,7 @@ abstract class DStream[T: ClassTag] (
         s"$baseName @ $formattedBatchTime"
       }
       val scopeId = s"${bscope.id}_${time.milliseconds}"
-      new RDDOperationScope(scopeName, scopeId)
+      new RDDOperationScope(scopeName, id = scopeId)
     }
   }
@@ -347,22 +347,22 @@ abstract class DStream[T: ClassTag] (
       // Compute the RDD if time is valid (e.g. correct time in a sliding window)
       // of RDD generation, else generate nothing.
       if (isTimeValid(time)) {
-        val newRDD = doCompute(time)
+        val rddOption = doCompute(time)
 
         // Register the generated RDD for caching and checkpointing
-        newRDD.foreach { case rdd =>
+        rddOption.foreach { case newRDD =>
           if (storageLevel != StorageLevel.NONE) {
-            rdd.persist(storageLevel)
-            logDebug(s"Persisting RDD ${rdd.id} for time $time to $storageLevel")
+            newRDD.persist(storageLevel)
+            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
           }
           if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
-            rdd.checkpoint()
-            logInfo(s"Marking RDD ${rdd.id} for time $time for checkpointing")
+            newRDD.checkpoint()
+            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
           }
-          generatedRDDs.put(time, rdd)
+          generatedRDDs.put(time, newRDD)
         }
-        newRDD
+        rddOption
       } else {
         None
       }
@@ -380,7 +380,6 @@ abstract class DStream[T: ClassTag] (
     // thread-local properties in our SparkContext. Since this method may be called from another
    // DStream, we need to temporarily store any old scope and creation site information to
    // restore them later after setting our own.
-    // TODO: this won't work if multiple StreamingContexts share the same SparkContext
     val prevCallSite = ssc.sparkContext.getCallSite()
     val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
     val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 2123585ffd9d8..4bd08de76158c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -126,7 +126,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
 
-  protected override val customScopeName: Option[String] = Some(s"file stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"file stream [$id]")
 
   override def start() { }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 0fb0cc20b14f9..ed89a5b44268b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -50,7 +50,7 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
 
   /** Human-friendly scope name to use in place of generic operation names (e.g. createStream). */
-  protected override val customScopeName: Option[String] = Some(s"input stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"input stream [$id]")
 
   /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
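Reviewer note: every input stream in this patch repeats the same override, now with the `protected[streaming]` qualifier. A minimal sketch of what a new source would look like against the patched tree (the class name `CounterInputDStream` and its behavior are hypothetical; it lives under `org.apache.spark.streaming` so the qualifier resolves, and the override may not weaken the member's visibility):

```scala
package org.apache.spark.streaming.dstream

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}

// Hypothetical input source following the same pattern as the streams in this patch.
private[streaming] class CounterInputDStream(ssc_ : StreamingContext)
  extends InputDStream[Long](ssc_) {

  // Keeps the protected[streaming] visibility of the overridden member.
  protected[streaming] override val customScopeName: Option[String] = Some(s"counter stream [$id]")

  // Driver-side counter bumped once per batch.
  private var count = 0L

  override def start(): Unit = { }
  override def stop(): Unit = { }

  override def compute(validTime: Time): Option[RDD[Long]] = {
    count += 1
    Some(ssc_.sparkContext.parallelize(Seq(count)))
  }
}
```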
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index dccaacf889cdd..ae40c4b8f2fca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -32,7 +32,7 @@ class QueueInputDStream[T: ClassTag](
     defaultRDD: RDD[T]
   ) extends InputDStream[T](ssc) {
 
-  protected override val customScopeName: Option[String] = Some(s"queue stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"queue stream [$id]")
 
   override def start() { }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index b386144b0833a..cccfb8368a5d6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -45,7 +45,7 @@ class RawInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
 
-  protected override val customScopeName: Option[String] = Some(s"raw stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"raw stream [$id]")
 
   def getReceiver(): Receiver[T] = {
     new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index e3c1afeffee84..435c9623485cd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -40,7 +40,7 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
 abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"receiver stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"receiver stream [$id]")
 
   /**
    * Gets the receiver object that will be sent to the worker nodes
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 5f789f9f027a9..4d79a3f116534 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -37,7 +37,7 @@ class SocketInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"socket stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"socket stream [$id]")
 
   def getReceiver(): Receiver[T] = {
     new SocketReceiver(host, port, bytesToObjects, storageLevel)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index 01b9fc91ead9d..c202254314973 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -189,7 +189,7 @@ private class DummyInputDStream(
     customName: Option[String] = None)
   extends InputDStream[Int](ssc) {
-  protected override val customScopeName: Option[String] = customName
+  protected[streaming] override val customScopeName: Option[String] = customName
 
   override def start(): Unit = { }
   override def stop(): Unit = { }
   override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])
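Reviewer note: the recurring `protected` to `protected[streaming]` change is purely a visibility widening. A self-contained sketch of the difference, using hypothetical `com.example` packages rather than Spark's: plain `protected` is visible only to subclasses, while the qualified form is also visible to any code under the named package, such as a test suite.

```scala
// Both package blocks can live in a single source file.
package com.example.streaming.dstream {
  abstract class Stream {
    // Visible to subclasses AND to everything under com.example.streaming.
    protected[streaming] val customScopeName: Option[String] = None
  }
}

package com.example.streaming {
  // Not a subclass of Stream, but it sits in the package named by the qualifier.
  object ScopeInspector {
    def nameOf(s: dstream.Stream): Option[String] = s.customScopeName // compiles
    // With plain `protected` on the member, this access would be rejected.
  }
}
```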