
Commit 074c00b: Review comments

Andrew Or committed May 15, 2015
1 parent d25a324 commit 074c00b

Showing 17 changed files with 30 additions and 31 deletions.

@@ -43,8 +43,8 @@ import org.apache.spark.{Logging, SparkContext}
 @JsonPropertyOrder(Array("id", "name", "parent"))
 private[spark] class RDDOperationScope(
     val name: String,
-    val id: String = RDDOperationScope.nextScopeId().toString,
-    val parent: Option[RDDOperationScope] = None) {
+    val parent: Option[RDDOperationScope] = None,
+    val id: String = RDDOperationScope.nextScopeId().toString) {
 
   def toJson: String = {
     RDDOperationScope.jsonMapper.writeValueAsString(this)

@@ -96,7 +96,7 @@ private[spark] object RDDOperationScope extends Logging {
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
     val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
-    val ourMethodName = stackTrace(1).getMethodName
+    val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
     // Climb upwards to find the first method that's called something different
     val callerMethodName = stackTrace
       .find(_.getMethodName != ourMethodName)

@@ -139,7 +139,7 @@ private[spark] object RDDOperationScope extends Logging {
       sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
     } else if (sc.getLocalProperty(noOverrideKey) == null) {
       // Otherwise, set the scope only if the higher level caller allows us to do so
-      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, parent = oldScope).toJson)
+      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
     }
     // Optionally disallow the child body to override our scope
     if (!allowNesting) {
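
The hunks above reorder the RDDOperationScope constructor so that parent comes before the defaulted id. The practical effect, visible in the call sites updated later in this commit, is that a parent scope can now be passed positionally, while a custom id has to be passed by name. A minimal standalone sketch of that calling pattern (using a simplified Scope class as a stand-in for RDDOperationScope; all names and values below are illustrative only):

// Stand-in for RDDOperationScope; nextId() mimics RDDOperationScope.nextScopeId().
object ScopeOrderingSketch {
  private var counter = 0
  private def nextId(): String = { counter += 1; counter.toString }

  class Scope(
      val name: String,
      val parent: Option[Scope] = None,   // second parameter: can be passed positionally
      val id: String = nextId()) {        // last parameter: must be named when customized
    override def toString: String = s"Scope($name, id=$id, parent=${parent.map(_.name)})"
  }

  def main(args: Array[String]): Unit = {
    val scope1 = new Scope("scope1")
    val scope2 = new Scope("scope2", Some(scope1))            // as in the updated test suite below
    val batchScope = new Scope("map @ 10:00", id = "3_1000")  // as in DStream.makeScope below
    println(Seq(scope1, scope2, batchScope).mkString("\n"))
  }
}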

@@ -27,8 +27,8 @@ import org.apache.spark.{TaskContext, Partition, SparkContext}
 class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
   private var sc: SparkContext = null
   private val scope1 = new RDDOperationScope("scope1")
-  private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
-  private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
+  private val scope2 = new RDDOperationScope("scope2", Some(scope1))
+  private val scope3 = new RDDOperationScope("scope3", Some(scope2))
 
   before {
     sc = new SparkContext("local", "test")

@@ -50,7 +50,7 @@ class FlumeInputDStream[T: ClassTag](
     enableDecompression: Boolean
   ) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"input stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"input stream [$id]")
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel, enableDecompression)
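
This file and the remaining input stream classes below all make the same one-line change: each override gains the [streaming] qualifier to match the widened base member on DStream later in this diff. This is needed because Scala rejects an override with weaker access than the member it overrides, so once DStream.customScopeName becomes protected[streaming], a plain protected override in a subclass no longer compiles. A minimal sketch of the rule, with hypothetical package and class names:

// Hypothetical packages for illustration; only the access-modifier pattern matters.
package demo.streaming {

  abstract class Base {
    // Visible to subclasses and to any code inside package demo.streaming.
    protected[streaming] val customScopeName: Option[String] = None
  }

  package flume {
    class Child extends demo.streaming.Base {
      // Must be at least as accessible as the overridden member; a plain
      // `protected override val ...` here fails with "weaker access privileges".
      protected[streaming] override val customScopeName: Option[String] = Some("child stream")
    }
  }

  object Inspector {
    // Not a subclass, but in the same package, so the qualified modifier grants access.
    def scopeNameOf(b: Base): Option[String] = b.customScopeName
  }
}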

@@ -53,7 +53,7 @@ private[streaming] class FlumePollingInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
 
-  protected override val customScopeName: Option[String] = Some(s"flume polling stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"flume polling stream [$id]")
 
   override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)

@@ -65,7 +65,7 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
 
-  protected override val customScopeName: Option[String] = Some(s"kafka direct stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"kafka direct stream [$id]")
 
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData

@@ -55,7 +55,7 @@ class KafkaInputDStream[
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
-  protected override val customScopeName: Option[String] = Some(s"kafka stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"kafka stream [$id]")
 
   def getReceiver(): Receiver[(K, V)] = {
     if (!useReliableReceiver) {

@@ -57,7 +57,7 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"MQTT stream [$id]")
 
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)

@@ -45,7 +45,7 @@ class TwitterInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[Status](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"twitter stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"twitter stream [$id]")
 
   private def createOAuthAuthorization(): Authorization = {
     new OAuthAuthorization(new ConfigurationBuilder().build())

@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
   extends InputDStream[T](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"constant stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"constant stream [$id]")
 
   override def start() {}
 

@@ -129,7 +129,7 @@ abstract class DStream[T: ClassTag] (
    * An optional custom name for all scopes generated by this DStream.
    * If None, the name of the operation that created this DStream will be used.
    */
-  protected val customScopeName: Option[String] = None
+  protected[streaming] val customScopeName: Option[String] = None
 
   /**
    * Make a scope that groups RDDs created in the same DStream operation in the same batch.
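
Per the doc comment above, customScopeName lets a concrete DStream substitute a human-friendly label for the generic operation name in its scopes, and widening it to protected[streaming] also makes it reachable from streaming-package code that is not a subclass (likely the motivation for this change). A minimal sketch of an input stream overriding it, assuming the Spark Streaming API as of this commit and a hypothetical package and class name:

// Hypothetical package, kept under org.apache.spark.streaming so that
// the protected[streaming] modifier is legal here.
package org.apache.spark.streaming.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Illustrative input stream whose RDD scopes would be labeled "my custom stream [<id>]".
class MyCustomInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {

  protected[streaming] override val customScopeName: Option[String] =
    Some(s"my custom stream [$id]")

  override def start(): Unit = { }
  override def stop(): Unit = { }
  override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sparkContext.emptyRDD[Int])
}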

@@ -152,7 +152,7 @@ abstract class DStream[T: ClassTag] (
           s"$baseName @ $formattedBatchTime"
         }
       val scopeId = s"${bscope.id}_${time.milliseconds}"
-      new RDDOperationScope(scopeName, scopeId)
+      new RDDOperationScope(scopeName, id = scopeId)
     }
   }
 

@@ -347,22 +347,22 @@ abstract class DStream[T: ClassTag] (
     // Compute the RDD if time is valid (e.g. correct time in a sliding window)
     // of RDD generation, else generate nothing.
     if (isTimeValid(time)) {
-      val newRDD = doCompute(time)
+      val rddOption = doCompute(time)
 
       // Register the generated RDD for caching and checkpointing
-      newRDD.foreach { case rdd =>
+      rddOption.foreach { case newRDD =>
         if (storageLevel != StorageLevel.NONE) {
-          rdd.persist(storageLevel)
-          logDebug(s"Persisting RDD ${rdd.id} for time $time to $storageLevel")
+          newRDD.persist(storageLevel)
+          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
         }
         if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
-          rdd.checkpoint()
-          logInfo(s"Marking RDD ${rdd.id} for time $time for checkpointing")
+          newRDD.checkpoint()
+          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
         }
-        generatedRDDs.put(time, rdd)
+        generatedRDDs.put(time, newRDD)
       }
 
-      newRDD
+      rddOption
     } else {
       None
     }
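
The renames above are purely for readability: doCompute returns an Option[RDD[T]], so the option is now called rddOption and the RDD bound inside the foreach is the newRDD, replacing the old newRDD/rdd pair whose names were inverted relative to their types. A small standalone sketch of the same pattern outside DStream (the object name and values are made up):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object RegisterRddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "register-rdd-sketch")
    try {
      // Stand-in for doCompute(time): a batch may or may not produce an RDD.
      val rddOption: Option[RDD[Int]] = Some(sc.parallelize(1 to 10))

      // The Option is rddOption; the RDD inside it is newRDD.
      rddOption.foreach { newRDD =>
        newRDD.persist(StorageLevel.MEMORY_ONLY)
        println(s"Persisting RDD ${newRDD.id}: ${newRDD.count()} elements")
      }
    } finally {
      sc.stop()
    }
  }
}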

@@ -380,7 +380,6 @@ abstract class DStream[T: ClassTag] (
     // thread-local properties in our SparkContext. Since this method may be called from another
     // DStream, we need to temporarily store any old scope and creation site information to
     // restore them later after setting our own.
-    // TODO: this won't work if multiple StreamingContexts share the same SparkContext
     val prevCallSite = ssc.sparkContext.getCallSite()
     val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
     val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
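
The comment in this hunk describes the discipline the surrounding block follows: stash the caller's call site and scope properties, set this DStream's own values, run the body, then restore whatever was saved (this commit simply drops the TODO note on that block). A generic sketch of that save/set/restore pattern around a SparkContext thread-local property, with a made-up key name:

import org.apache.spark.SparkContext

object LocalPropertySketch {
  // Run `body` with a thread-local property temporarily set, restoring the
  // previous value (possibly null) afterwards.
  def withLocalProperty[T](sc: SparkContext, key: String, value: String)(body: => T): T = {
    val previous = sc.getLocalProperty(key)
    sc.setLocalProperty(key, value)
    try {
      body
    } finally {
      sc.setLocalProperty(key, previous)
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "local-property-sketch")
    try {
      withLocalProperty(sc, "demo.scope.key", "scope-1") {
        println(sc.getLocalProperty("demo.scope.key"))   // scope-1
      }
      println(sc.getLocalProperty("demo.scope.key"))     // the previous value (null here)
    } finally {
      sc.stop()
    }
  }
}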

@@ -126,7 +126,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
 
-  protected override val customScopeName: Option[String] = Some(s"file stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"file stream [$id]")
 
   override def start() { }
 

@@ -50,7 +50,7 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
 
   /** Human-friendly scope name to use in place of generic operation names (e.g. createStream). */
-  protected override val customScopeName: Option[String] = Some(s"input stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"input stream [$id]")
 
   /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.

@@ -32,7 +32,7 @@ class QueueInputDStream[T: ClassTag](
     defaultRDD: RDD[T]
   ) extends InputDStream[T](ssc) {
 
-  protected override val customScopeName: Option[String] = Some(s"queue stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"queue stream [$id]")
 
   override def start() { }
 

@@ -45,7 +45,7 @@ class RawInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
 
-  protected override val customScopeName: Option[String] = Some(s"raw stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"raw stream [$id]")
 
   def getReceiver(): Receiver[T] = {
     new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]

@@ -40,7 +40,7 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
 abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"receiver stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"receiver stream [$id]")
 
   /**
    * Gets the receiver object that will be sent to the worker nodes

@@ -37,7 +37,7 @@ class SocketInputDStream[T: ClassTag](
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[T](ssc_) {
 
-  protected override val customScopeName: Option[String] = Some(s"socket stream [$id]")
+  protected[streaming] override val customScopeName: Option[String] = Some(s"socket stream [$id]")
 
   def getReceiver(): Receiver[T] = {
     new SocketReceiver(host, port, bytesToObjects, storageLevel)

@@ -189,7 +189,7 @@ private class DummyInputDStream(
     customName: Option[String] = None)
   extends InputDStream[Int](ssc) {
 
-  protected override val customScopeName: Option[String] = customName
+  protected[streaming] override val customScopeName: Option[String] = customName
   override def start(): Unit = { }
   override def stop(): Unit = { }
   override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])

0 comments on commit 074c00b
