[SPARK-4012] stop SparkContext when the exception is thrown from an infinite loop #5004

Closed · wants to merge 5 commits
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
+  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

-    listenerBus.start()
+    listenerBus.start(this)
}

/** Post the application start event */
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -93,7 +93,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def getRunner(operateFun: () => Unit): Runnable = {
new Runnable() {
-      override def run() = Utils.logUncaughtExceptions {
+      override def run() = Utils.tryOrExit {
Contributor:
I am not sure what the FsHistoryProvider is; what should the failure semantics be around it?

Contributor (author):
FsHistoryProvider is the module used by the history server process to render application logs. This getRunner is called to periodically check for updated logs and to clean old logs. The point here is that if anything uncaught is thrown, like an OOM, the HistoryServer process should be restarted, since it is no longer functional. (A minimal sketch of this process-level policy follows this file's diff.)

operateFun()
}
}
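To make the distinction in this thread concrete, here is a minimal sketch of the process-level policy (simplified stand-in names, not the actual Utils code): a tryOrExit-style wrapper forwards a fatal error to the JVM's default uncaught exception handler so that the process dies and an external supervisor can restart it, which is the behavior the history server wants here.

import scala.util.control.ControlThrowable

// Sketch only: forward any uncaught throwable to the default handler so the
// JVM exits and a supervisor (e.g. the daemon scripts) can restart it.
object FailurePolicySketch {
  def tryOrExit(block: => Unit): Unit = {
    try {
      block
    } catch {
      case e: ControlThrowable => throw e // never swallow Scala control flow
      case t: Throwable =>
        Option(Thread.getDefaultUncaughtExceptionHandler) match {
          case Some(handler) => handler.uncaughtException(Thread.currentThread(), t)
          case None => throw t
        }
    }
  }
}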
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -145,7 +145,7 @@ private[spark] class TaskSchedulerImpl(
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
-      Utils.tryOrExit { checkSpeculatableTasks() }
+      Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
Contributor:
Thanks for updating this one as well!

}
}
}
core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -21,6 +21,7 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean

import com.google.common.annotations.VisibleForTesting
+import org.apache.spark.SparkContext

/**
* Asynchronously passes events to registered listeners.
@@ -38,6 +39,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri

self =>

+  private var sparkContext: SparkContext = null

/* Cap the capacity of the event queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
private val EVENT_QUEUE_CAPACITY = 10000
@@ -57,7 +60,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri

private val listenerThread = new Thread(name) {
setDaemon(true)
-    override def run(): Unit = Utils.logUncaughtExceptions {
+    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
while (true) {
eventLock.acquire()
self.synchronized {
@@ -89,9 +92,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
* This first sends out all buffered events posted before this listener bus has started, then
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
+   *
+   * @param sc Used to stop the SparkContext in case the listener thread dies.
*/
-  def start() {
+  def start(sc: SparkContext) {
Contributor:
Add an "@param sc Used to stop the SparkContext in case the listener thread dies.", or something to that effect.

    if (started.compareAndSet(false, true)) {
+      sparkContext = sc
listenerThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
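For reference, the call-site change this implies on the driver; a hypothetical wiring sketch, where sc is assumed to be a live SparkContext and LiveListenerBus is the concrete bus used in the tests below:

// Hypothetical driver-side wiring after this change: the bus receives the
// SparkContext up front so its daemon thread can stop it on a fatal error.
val listenerBus = new LiveListenerBus
listenerBus.start(sc) // was: listenerBus.start()
// Events are posted as before; if the listener thread dies with an uncaught
// exception, sc.stop() is called instead of the bus silently going dead.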
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1145,6 +1145,8 @@ private[spark] object Utils extends Logging {
/**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
+   *
+   * NOTE: This method is to be called by the spark-started JVM process.
*/
def tryOrExit(block: => Unit) {
try {
Expand All @@ -1155,6 +1157,32 @@ private[spark] object Utils extends Logging {
}
}

+  /**
+   * Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
+   * exception
+   *
+   * NOTE: This method is to be called by the driver-side components to avoid stopping the
+   * user-started JVM process completely; in contrast, tryOrExit is to be called in the
+   * spark-started JVM process.
+   */
+  def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
+    try {
+      block
+    } catch {
+      case e: ControlThrowable => throw e
+      case t: Throwable =>
+        val currentThreadName = Thread.currentThread().getName
+        if (sc != null) {
+          logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
+          sc.stop()
+        }
Member:
How about throwing t again here? That way the user can use an UncaughtExceptionHandler to monitor the uncaught exception. If not, the user won't be aware that sc has shut down until calling runJob the next time.

Contributor (author):
Hi @zsxwing, thanks for the comments.

I personally prefer a more conservative approach here (the current one). The Throwable thrown from here can vary in type, and I'm concerned that a Throwable thrown here, like an OOM, would be mixed up with instances of the same type generated by other components in the user's program; on the other hand, our goal is just to let the user know the SparkContext has stopped.

So I prefer letting the user call SparkContext.runJob and get an IllegalStateException("SparkContext has been shutdown"), which (hopefully) will be handled correctly.

@srowen @aarondav, your thoughts?

Contributor:
What if we catch NonFatal(e) and re-throw other Throwables? Basically saying that fatal errors should be re-thrown, but lesser ones can just stop here; they should be only application-level exceptions, which are our code's concern. (A runnable sketch of these semantics follows this file's diff.)

+        if (!NonFatal(t)) {
+          logError(s"throw uncaught fatal error in thread $currentThreadName", t)
+          throw t
+        }
+    }
+  }

/**
* Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught
* exceptions as IOException. This is used when implementing Externalizable and Serializable's
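To make the agreed-upon semantics concrete, a small self-contained sketch (a hypothetical harness, not the Spark code itself): non-fatal exceptions stop the context and are absorbed, while fatal errors such as OutOfMemoryError are rethrown so an UncaughtExceptionHandler can still observe them.

import scala.util.control.{ControlThrowable, NonFatal}

// Hypothetical stand-in for SparkContext, only to make the sketch runnable.
class StoppableContext { def stop(): Unit = println("context stopped") }

def tryOrStopContext(ctx: StoppableContext)(block: => Unit): Unit = {
  try {
    block
  } catch {
    case e: ControlThrowable => throw e // never swallow Scala control flow
    case t: Throwable =>
      if (ctx != null) ctx.stop()       // always release driver-side resources
      if (!NonFatal(t)) throw t         // rethrow fatal errors (OOM, etc.)
  }
}

// A RuntimeException is non-fatal: the context is stopped and nothing propagates.
tryOrStopContext(new StoppableContext) { throw new RuntimeException("boom") }
// An Error would be fatal: the context is stopped, then the Error is rethrown
// to the thread's UncaughtExceptionHandler.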
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -25,9 +25,9 @@ import scala.io.Source

import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{FunSuiteLike, BeforeAndAfter, FunSuite}

-import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
+import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io._
import org.apache.spark.util.{JsonProtocol, Utils}
@@ -39,7 +39,8 @@ import org.apache.spark.util.{JsonProtocol, Utils}
* logging events, whether the parsing of the file names is correct, and whether the logged events
* can be read and deserialized into actual SparkListenerEvents.
*/
-class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Logging {
+class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter
+  with Logging {
import EventLoggingListenerSuite._

private val fileSystem = Utils.getHadoopFileSystem("/",
@@ -144,7 +145,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin

// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
eventLogger.start()
-    listenerBus.start()
+    listenerBus.start(sc)
listenerBus.addListener(eventLogger)
listenerBus.postToAll(applicationStart)
listenerBus.postToAll(applicationEnd)
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -46,7 +46,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(counter.count === 0)

// Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(sc)
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(counter.count === 5)

@@ -58,8 +58,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
// Listener bus must not be started twice
intercept[IllegalStateException] {
val bus = new LiveListenerBus
-      bus.start()
-      bus.start()
+      bus.start(sc)
+      bus.start(sc)
}

// ... or stopped before starting
@@ -96,7 +96,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
val blockingListener = new BlockingListener

bus.addListener(blockingListener)
-    bus.start()
+    bus.start(sc)
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))

listenerStarted.acquire()
@@ -347,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
bus.addListener(badListener)
bus.addListener(jobCounter1)
bus.addListener(jobCounter2)
-    bus.start()
+    bus.start(sc)

// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -61,7 +61,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
}), "JobScheduler")

-    listenerBus.start()
+    listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()
jobGenerator.start()