Skip to content

Commit

Permalink
[SPARK-12001] Allow partially-stopped StreamingContext to be complete…
Browse files Browse the repository at this point in the history
…ly stopped

If `StreamingContext.stop()` is interrupted midway through the call, the context will be marked as stopped but certain state will have not been cleaned up. Because `state = STOPPED` will be set, subsequent `stop()` calls will be unable to finish stopping the context, preventing any new StreamingContexts from being created.

This patch addresses this issue by only marking the context as `STOPPED` once the `stop()` has successfully completed which allows `stop()` to be called a second time in order to finish stopping the context in case the original `stop()` call was interrupted.

I discovered this issue by examining logs from a failed Jenkins run in which this race condition occurred in `FailureSuite`, leaking an unstoppable context and causing all subsequent tests to fail.

Author: Josh Rosen <[email protected]>

Closes #9982 from JoshRosen/SPARK-12001.
  • Loading branch information
JoshRosen committed Dec 2, 2015
1 parent a1542ce commit 452690b
Showing 1 changed file with 27 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -699,28 +699,33 @@ class StreamingContext private[streaming] (
" AsynchronousListenerBus")
}
synchronized {
try {
state match {
case INITIALIZED =>
logWarning("StreamingContext has not been started yet")
case STOPPED =>
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
// Removing the streamingSource to de-register the metrics on stop()
env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
if (shutdownHookRef != null) {
shutdownHookRefToRemove = shutdownHookRef
shutdownHookRef = null
}
logInfo("StreamingContext stopped successfully")
}
} finally {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state = STOPPED
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state match {
case INITIALIZED =>
logWarning("StreamingContext has not been started yet")
state = STOPPED
case STOPPED =>
logWarning("StreamingContext has already been stopped")
state = STOPPED
case ACTIVE =>
// It's important that we don't set state = STOPPED until the very end of this case,
// since we need to ensure that we're still able to call `stop()` to recover from
// a partially-stopped StreamingContext which resulted from this `stop()` call being
// interrupted. See SPARK-12001 for more details. Because the body of this case can be
// executed twice in the case of a partial stop, all methods called here need to be
// idempotent.
scheduler.stop(stopGracefully)
// Removing the streamingSource to de-register the metrics on stop()
env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
if (shutdownHookRef != null) {
shutdownHookRefToRemove = shutdownHookRef
shutdownHookRef = null
}
logInfo("StreamingContext stopped successfully")
state = STOPPED
}
}
if (shutdownHookRefToRemove != null) {
Expand Down

0 comments on commit 452690b

Please sign in to comment.