[STREAMING][MINOR] More contextual information in logs + minor code improvements

Please review and merge at your convenience. Thanks!

Author: Jacek Laskowski <[email protected]>

Closes apache#10595 from jaceklaskowski/streaming-minor-fixes.
jaceklaskowski authored and srowen committed Jan 7, 2016
1 parent 07b314a commit 1b2c216
Showing 14 changed files with 69 additions and 74 deletions.
@@ -747,7 +747,7 @@ class DAGScheduler(
}

/**
* Check for waiting or failed stages which are now eligible for resubmission.
* Check for waiting stages which are now eligible for resubmission.
* Ordinarily run on every iteration of the event loop.
*/
private def submitWaitingStages() {
@@ -59,7 +59,7 @@ private[spark] class BlockResult(
* Manager running on every node (driver and executors) which provides interfaces for putting and
* retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
*
* Note that #initialize() must be called before the BlockManager is usable.
* Note that [[initialize()]] must be called before the BlockManager is usable.
*/
private[spark] class BlockManager(
executorId: String,
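For context, [[initialize()]] is Scaladoc's wiki-style link syntax: it resolves to the member and becomes a hyperlink in the generated API docs, whereas the Javadoc-flavoured #initialize() form stays plain text. A toy illustration of the convention, using a hypothetical class that is not part of Spark:

/**
 * Keeps a single running counter (hypothetical example, not part of Spark).
 *
 * Note that [[reset()]] discards everything accumulated through [[increment()]].
 */
class Counter {
  private var value = 0

  /** Sets the counter back to zero. */
  def reset(): Unit = { value = 0 }

  /** Adds one and returns the new value. */
  def increment(): Int = { value += 1; value }
}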
@@ -36,7 +36,7 @@ import org.apache.spark.util.Utils
* This creates an iterator of (BlockID, InputStream) tuples so the caller can handle blocks
* in a pipelined fashion as they are received.
*
* The implementation throttles the remote fetches to they don't exceed maxBytesInFlight to avoid
* The implementation throttles the remote fetches so they don't exceed maxBytesInFlight to avoid
* using too much memory.
*
* @param context [[TaskContext]], used for metrics update
@@ -329,7 +329,7 @@ final class ShuffleBlockFetcherIterator(
}

/**
* Helper class that ensures a ManagedBuffer is release upon InputStream.close()
* Helper class that ensures a ManagedBuffer is released upon InputStream.close()
*/
private class BufferReleasingInputStream(
private val delegate: InputStream,
@@ -21,8 +21,8 @@
import java.nio.ByteBuffer;

/**
* Callback for streaming data. Stream data will be offered to the {@link onData(String, ByteBuffer)}
* method as it arrives. Once all the stream data is received, {@link onComplete(String)} will be
* Callback for streaming data. Stream data will be offered to the {@link #onData(String, ByteBuffer)}
* method as it arrives. Once all the stream data is received, {@link #onComplete(String)} will be
* called.
* <p>
* The network library guarantees that a single thread will call these methods at a time, but
@@ -288,7 +288,7 @@ public void send(ByteBuffer message) {
/**
* Removes any state associated with the given RPC.
*
* @param requestId The RPC id returned by {@link #sendRpc(byte[], RpcResponseCallback)}.
* @param requestId The RPC id returned by {@link #sendRpc(ByteBuffer, RpcResponseCallback)}.
*/
public void removeRpcRequest(long requestId) {
handler.removeRpcRequest(requestId);
@@ -57,7 +57,7 @@ public abstract void receive(

/**
* Receives an RPC message that does not expect a reply. The default implementation will
* call "{@link receive(TransportClient, byte[], RpcResponseCallback)}" and log a warning if
* call "{@link #receive(TransportClient, ByteBuffer, RpcResponseCallback)}" and log a warning if
* any of the callback methods are called.
*
* @param client A channel client which enables the handler to make requests back to the sender
@@ -226,7 +226,7 @@ class StreamingContext private[streaming] (
* Set the context to periodically checkpoint the DStream operations for driver
* fault-tolerance.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
* Note that this must be a fault-tolerant file system like HDFS for
* Note that this must be a fault-tolerant file system like HDFS.
*/
def checkpoint(directory: String) {
if (directory != null) {
@@ -274,7 +274,7 @@ class StreamingContext private[streaming] (
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*
* @deprecated As of 1.0.0", replaced by `receiverStream`.
* @deprecated As of 1.0.0 replaced by `receiverStream`.
*/
@deprecated("Use receiverStream", "1.0.0")
def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
@@ -285,7 +285,7 @@ class StreamingContext private[streaming] (

/**
* Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
@@ -549,7 +549,7 @@ class StreamingContext private[streaming] (

// Verify whether the DStream checkpoint is serializable
if (isCheckpointingEnabled) {
val checkpoint = new Checkpoint(this, Time.apply(0))
val checkpoint = new Checkpoint(this, Time(0))
try {
Checkpoint.serialize(checkpoint, conf)
} catch {
@@ -575,9 +575,9 @@
*
* Return the current state of the context. The context can be in three possible states -
*
* - StreamingContextState.INTIALIZED - The context has been created, but not been started yet.
* - StreamingContextState.INITIALIZED - The context has been created, but not started yet.
* Input DStreams, transformations and output operations can be created on the context.
* - StreamingContextState.ACTIVE - The context has been started, and been not stopped.
* - StreamingContextState.ACTIVE - The context has been started, and not stopped.
* Input DStreams, transformations and output operations cannot be created on the context.
* - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
*/
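A rough sketch of how the documented behaviour above looks from user code, assuming a local master, a placeholder HDFS checkpoint path, and a placeholder socket source; the three states are reported by getState():

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

val conf = new SparkConf().setMaster("local[2]").setAppName("StateDemo")
val ssc = new StreamingContext(conf, Seconds(1))

// The checkpoint directory must live on a fault-tolerant file system such as HDFS (placeholder path).
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")
assert(ssc.getState() == StreamingContextState.INITIALIZED)

// Input DStreams and output operations must be defined before start().
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

ssc.start()
assert(ssc.getState() == StreamingContextState.ACTIVE)

ssc.stop(stopSparkContext = true)
assert(ssc.getState() == StreamingContextState.STOPPED)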
@@ -103,7 +103,7 @@ abstract class DStream[T: ClassTag] (
// Reference to whole DStream graph
private[streaming] var graph: DStreamGraph = null

private[streaming] def isInitialized = (zeroTime != null)
private[streaming] def isInitialized = zeroTime != null

// Duration for which the DStream requires its parent DStream to remember each RDD created
private[streaming] def parentRememberDuration = rememberDuration
@@ -189,15 +189,15 @@ abstract class DStream[T: ClassTag] (
*/
private[streaming] def initialize(time: Time) {
if (zeroTime != null && zeroTime != time) {
throw new SparkException("ZeroTime is already initialized to " + zeroTime
+ ", cannot initialize it again to " + time)
throw new SparkException(s"ZeroTime is already initialized to $zeroTime"
+ s", cannot initialize it again to $time")
}
zeroTime = time

// Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
logInfo("Checkpoint interval automatically set to " + checkpointDuration)
logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
}

// Set the minimum value of the rememberDuration if not already set
@@ -234,7 +234,7 @@ abstract class DStream[T: ClassTag] (

require(
!mustCheckpoint || checkpointDuration != null,
"The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." +
s"The checkpoint interval for ${this.getClass.getSimpleName} has not been set." +
" Please use DStream.checkpoint() to set the interval."
)

@@ -245,53 +245,53 @@

require(
checkpointDuration == null || checkpointDuration >= slideDuration,
"The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " +
"Please set it to at least " + slideDuration + "."
s"The checkpoint interval for ${this.getClass.getSimpleName} has been set to " +
s"$checkpointDuration which is lower than its slide time ($slideDuration). " +
s"Please set it to at least $slideDuration."
)

require(
checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
"The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " +
checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " +
"Please set it to a multiple of " + slideDuration + "."
s"The checkpoint interval for ${this.getClass.getSimpleName} has been set to " +
s" $checkpointDuration which not a multiple of its slide time ($slideDuration). " +
s"Please set it to a multiple of $slideDuration."
)

require(
checkpointDuration == null || storageLevel != StorageLevel.NONE,
"" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " +
s"${this.getClass.getSimpleName} has been marked for checkpointing but the storage " +
"level has not been set to enable persisting. Please use DStream.persist() to set the " +
"storage level to use memory for better checkpointing performance."
)

require(
checkpointDuration == null || rememberDuration > checkpointDuration,
"The remember duration for " + this.getClass.getSimpleName + " has been set to " +
rememberDuration + " which is not more than the checkpoint interval (" +
checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
s"The remember duration for ${this.getClass.getSimpleName} has been set to " +
s" $rememberDuration which is not more than the checkpoint interval" +
s" ($checkpointDuration). Please set it to higher than $checkpointDuration."
)

dependencies.foreach(_.validateAtStart())

logInfo("Slide time = " + slideDuration)
logInfo("Storage level = " + storageLevel)
logInfo("Checkpoint interval = " + checkpointDuration)
logInfo("Remember duration = " + rememberDuration)
logInfo("Initialized and validated " + this)
logInfo(s"Slide time = $slideDuration")
logInfo(s"Storage level = ${storageLevel.description}")
logInfo(s"Checkpoint interval = $checkpointDuration")
logInfo(s"Remember duration = $rememberDuration")
logInfo(s"Initialized and validated $this")
}

private[streaming] def setContext(s: StreamingContext) {
if (ssc != null && ssc != s) {
throw new SparkException("Context is already set in " + this + ", cannot set it again")
throw new SparkException(s"Context must not be set again for $this")
}
ssc = s
logInfo("Set context for " + this)
logInfo(s"Set context for $this")
dependencies.foreach(_.setContext(ssc))
}

private[streaming] def setGraph(g: DStreamGraph) {
if (graph != null && graph != g) {
throw new SparkException("Graph is already set in " + this + ", cannot set it again")
throw new SparkException(s"Graph must not be set again for $this")
}
graph = g
dependencies.foreach(_.setGraph(graph))
@@ -300,7 +300,7 @@ abstract class DStream[T: ClassTag] (
private[streaming] def remember(duration: Duration) {
if (duration != null && (rememberDuration == null || duration > rememberDuration)) {
rememberDuration = duration
logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
logInfo(s"Duration for remembering RDDs set to $rememberDuration for $this")
}
dependencies.foreach(_.remember(parentRememberDuration))
}
@@ -310,11 +310,11 @@
if (!isInitialized) {
throw new SparkException (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
" and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
logInfo(s"Time $time is invalid as zeroTime is $zeroTime" +
s" , slideDuration is $slideDuration and difference is ${time - zeroTime}")
false
} else {
logDebug("Time " + time + " is valid")
logDebug(s"Time $time is valid")
true
}
}
@@ -452,20 +452,20 @@ abstract class DStream[T: ClassTag] (
oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
generatedRDDs --= oldRDDs.keys
if (unpersistData) {
logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
oldRDDs.values.foreach { rdd =>
rdd.unpersist(false)
// Explicitly remove blocks of BlockRDD
rdd match {
case b: BlockRDD[_] =>
logInfo("Removing blocks of RDD " + b + " of time " + time)
logInfo(s"Removing blocks of RDD $b of time $time")
b.removeBlocks()
case _ =>
}
}
}
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
logDebug(s"Cleared ${oldRDDs.size} RDDs that were older than " +
s"${time - rememberDuration}: ${oldRDDs.keys.mkString(", ")}")
dependencies.foreach(_.clearMetadata(time))
}

@@ -477,10 +477,10 @@
* this method to save custom checkpoint data.
*/
private[streaming] def updateCheckpointData(currentTime: Time) {
logDebug("Updating checkpoint data for time " + currentTime)
logDebug(s"Updating checkpoint data for time $currentTime")
checkpointData.update(currentTime)
dependencies.foreach(_.updateCheckpointData(currentTime))
logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
}

private[streaming] def clearCheckpointData(time: Time) {
@@ -509,13 +509,13 @@ abstract class DStream[T: ClassTag] (

@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".writeObject used")
logDebug(s"${this.getClass().getSimpleName}.writeObject used")
if (graph != null) {
graph.synchronized {
if (graph.checkpointInProgress) {
oos.defaultWriteObject()
} else {
val msg = "Object of " + this.getClass.getName + " is being serialized " +
val msg = s"Object of ${this.getClass.getName} is being serialized " +
" possibly as a part of closure of an RDD operation. This is because " +
" the DStream object is being referred to from within the closure. " +
" Please rewrite the RDD operation inside this DStream to avoid this. " +
@@ -532,7 +532,7 @@

@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
logDebug(s"${this.getClass().getSimpleName}.readObject used")
ois.defaultReadObject()
generatedRDDs = new HashMap[Time, RDD[T]] ()
}
@@ -756,7 +756,7 @@ abstract class DStream[T: ClassTag] (
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-------------------------------------------")
println("Time: " + time)
println(s"Time: $time")
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
@@ -903,21 +903,19 @@ abstract class DStream[T: ClassTag] (
val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) {
toTime
} else {
logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
toTime.floor(slideDuration, zeroTime)
logWarning(s"toTime ($toTime) is not a multiple of slideDuration ($slideDuration)")
toTime.floor(slideDuration, zeroTime)
}

val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) {
fromTime
} else {
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
logWarning(s"fromTime ($fromTime) is not a multiple of slideDuration ($slideDuration)")
fromTime.floor(slideDuration, zeroTime)
}

logInfo("Slicing from " + fromTime + " to " + toTime +
" (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
logInfo(s"Slicing from $fromTime to $toTime" +
s" (aligned to $alignedFromTime and $alignedToTime)")

alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
if (time >= zeroTime) getOrCompute(time) else None
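Most of the edits in this file replace string concatenation with Scala's s interpolator so the contextual values sit inline in the log message. A tiny stand-alone comparison of the two styles (the values below are made up):

val slideDuration = "2000 ms"
val checkpointDuration = "10000 ms"

// Before: values spliced in with +, with the surrounding spaces easy to get wrong.
val oldStyle = "The checkpoint interval has been set to " + checkpointDuration +
  " which is lower than its slide time (" + slideDuration + ")."

// After: s"..." embeds simple values as $name and arbitrary expressions as ${...}.
val newStyle = s"The checkpoint interval has been set to $checkpointDuration " +
  s"which is lower than its slide time ($slideDuration)."

assert(oldStyle == newStyle)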
@@ -28,7 +28,8 @@ import org.apache.spark.util.Utils

/**
* This is the abstract base class for all input streams. This class provides methods
* start() and stop() which is called by Spark Streaming system to start and stop receiving data.
* start() and stop() which are called by Spark Streaming system to start and stop
* receiving data, respectively.
* Input streams that can generate RDDs from new data by running a service/thread only on
* the driver node (that is, without running a receiver on worker nodes), can be
* implemented by directly inheriting this InputDStream. For example,
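As a rough illustration of the driver-only pattern described above, a subclass only has to provide start(), stop() and compute(); no receiver runs on the workers. The class below is hypothetical and not part of the commit:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Emits the same small RDD for every batch, produced entirely on the driver.
class ConstantNumbersInputDStream(ssc_ : StreamingContext, numbers: Seq[Int])
  extends InputDStream[Int](ssc_) {

  override def start(): Unit = { }  // nothing to set up on the driver
  override def stop(): Unit = { }   // nothing to tear down

  override def compute(validTime: Time): Option[RDD[Int]] =
    Some(ssc_.sparkContext.parallelize(numbers))
}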
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
* that has to start a receiver on worker nodes to receive external data.
* Specific implementations of ReceiverInputDStream must
* define `the getReceiver()` function that gets the receiver object of type
* define [[getReceiver]] function that gets the receiver object of type
* [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
* to the workers to receive data.
* @param ssc_ Streaming context that will execute this input stream
@@ -121,7 +121,7 @@ abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
}
if (validBlockIds.size != blockIds.size) {
logWarning("Some blocks could not be recovered as they were not found in memory. " +
"To prevent such data loss, enabled Write Ahead Log (see programming guide " +
"To prevent such data loss, enable Write Ahead Log (see programming guide " +
"for more details.")
}
new BlockRDD[T](ssc.sc, validBlockIds)
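The warning above concerns the receiver write-ahead log. A minimal sketch of turning it on, assuming the standard spark.streaming.receiver.writeAheadLog.enable key and a placeholder checkpoint path (the WAL files are written under the checkpoint directory):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("WALDemo")
  .setMaster("local[2]")
  // Write received blocks to a write-ahead log before they are acknowledged.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")  // gives the WAL a fault-tolerant home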
@@ -69,7 +69,7 @@ private[streaming] class BlockManagerBasedBlockHandler(

def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

var numRecords = None: Option[Long]
var numRecords: Option[Long] = None

val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
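The numRecords change above is purely stylistic: a declared type on the variable instead of a type ascription on None. Both forms are equivalent; a quick stand-alone comparison:

// Type ascription on the initial value (the old form).
var recordsBefore = None: Option[Long]

// Declared type on the variable (the new, more conventional form).
var recordsAfter: Option[Long] = None

recordsBefore = Some(10L)
recordsAfter = Some(10L)
assert(recordsBefore == recordsAfter)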
@@ -103,7 +103,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable

/**
* This method is called by the system when the receiver is stopped. All resources
* (threads, buffers, etc.) setup in `onStart()` must be cleaned up in this method.
* (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
*/
def onStop()

@@ -273,7 +273,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
/** Get the attached supervisor. */
private[streaming] def supervisor: ReceiverSupervisor = {
assert(_supervisor != null,
"A ReceiverSupervisor have not been attached to the receiver yet. Maybe you are starting " +
"A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
"some computation in the receiver before the Receiver.onStart() has been called.")
_supervisor
}
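To make the onStart()/onStop() contract above concrete, here is a rough custom receiver sketch of the kind attached with receiverStream; the class name, host and port are placeholders, not something from the commit:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  @volatile private var socket: Socket = null

  def onStart(): Unit = {
    // onStart() must not block, so read on a separate thread.
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Release everything set up in onStart().
    if (socket != null) {
      socket.close()
      socket = null
    }
  }

  private def receive(): Unit = {
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)               // hand each record to Spark for storage
        line = reader.readLine()
      }
      restart("Trying to reconnect")
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    }
  }
}

It would then be wired in with something like ssc.receiverStream(new SocketLineReceiver("localhost", 9999)).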