Skip to content

Commit

Permalink
Added graceful shutdown to Spark Streaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Mar 27, 2014
1 parent 345825d commit c43b8ae
Show file tree
Hide file tree
Showing 14 changed files with 522 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,19 +194,19 @@ class CheckpointWriter(
}
}

def stop() {
synchronized {
if (stopped) {
return
}
stopped = true
}
def stop(): Unit = synchronized {
if (stopped) return

executor.shutdown()
val startTime = System.currentTimeMillis()
val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
if (!terminated) {
executor.shutdownNow()
}
val endTime = System.currentTimeMillis()
logInfo("CheckpointWriter executor terminated ? " + terminated +
", waited for " + (endTime - startTime) + " ms.")
stopped = true
}

private def fs = synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ class StreamingContext private[streaming] (

private[streaming] val waiter = new ContextWaiter

/** Enumeration to identify current state of the StreamingContext */
private[streaming] object ContextState extends Enumeration {
type CheckpointState = Value
val Initialized, Started, Stopped = Value
}

import ContextState._
private[streaming] var state = Initialized

/**
* Return the associated Spark context
*/
Expand Down Expand Up @@ -405,9 +414,18 @@ class StreamingContext private[streaming] (
/**
* Start the execution of the streams.
*/
def start() = synchronized {
def start(): Unit = synchronized {
// Throw exception if the context has already been started once
// or if a stopped context is being started again
if (state == Started) {
throw new SparkException("StreamingContext has already been started")
}
if (state == Stopped) {
throw new SparkException("StreamingContext has already been stopped")
}
validate()
scheduler.start()
state = Started
}

/**
Expand All @@ -430,12 +448,27 @@ class StreamingContext private[streaming] (
/**
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
* @param stopGracefully Stop gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean = true) = synchronized {
scheduler.stop()
def stop(
stopSparkContext: Boolean = true,
stopGracefully: Boolean = false
): Unit = synchronized {
// Silently warn if context is stopped twice, or context is stopped before starting
if (state == Initialized) {
logWarning("StreamingContext has not been started yet")
return
}
if (state == Stopped) {
logWarning("StreamingContext has already been stopped")
return
} // no need to throw an exception as its okay to stop twice
scheduler.stop(stopGracefully)
logInfo("StreamingContext stopped successfully")
waiter.notifyStop()
if (stopSparkContext) sc.stop()
state = Stopped
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,16 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param stopSparkContext Stop the associated SparkContext or not
*/
def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)

/**
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
* @param stopGracefully Stop gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
ssc.stop(stopSparkContext, stopGracefully)
}
}

/**
Expand Down
Loading

0 comments on commit c43b8ae

Please sign in to comment.