From 63229590d4645e0da8cb7e4217931a0443f7900d Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Thu, 12 Mar 2015 13:39:39 -0400
Subject: [PATCH 1/5] exit JVM process when the exception is thrown from an
 infinite loop

---
 core/src/main/scala/org/apache/spark/ContextCleaner.scala     | 2 +-
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala   | 2 +-
 .../scala/org/apache/spark/util/AsynchronousListenerBus.scala | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 0c59a61e81393..329e6d275ea21 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
+  private def keepCleaning(): Unit = Utils.tryOrExit {
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 16d88c17d1a76..7fde02040927d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -93,7 +93,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def getRunner(operateFun: () => Unit): Runnable = {
     new Runnable() {
-      override def run() = Utils.logUncaughtExceptions {
+      override def run() = Utils.tryOrExit {
         operateFun()
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 18c627e8c7a15..b282019d2277d 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -57,7 +57,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 
   private val listenerThread = new Thread(name) {
     setDaemon(true)
-    override def run(): Unit = Utils.logUncaughtExceptions {
+    override def run(): Unit = Utils.tryOrExit {
       while (true) {
         eventLock.acquire()
         self.synchronized {
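
Utils.tryOrExit, which this patch switches the three background tasks to, already existed in
Utils: it runs a block and hands any uncaught, non-control-flow Throwable to an
uncaught-exception handler, so the JVM terminates visibly instead of a daemon thread dying
silently. A minimal sketch of the pattern (illustrative only; it falls back to the thread's
default handler, whereas the real helper routes the error to Spark's own handler):

    import scala.util.control.ControlThrowable

    object ExitOnUncaught {
      // Run `block`; let Scala's control-flow throwables (break, non-local
      // return) propagate, and route real errors to an uncaught-exception
      // handler so the process can shut down instead of silently losing a
      // daemon thread.
      def tryOrExit(block: => Unit): Unit = {
        try {
          block
        } catch {
          case e: ControlThrowable => throw e
          case t: Throwable =>
            val handler = Thread.getDefaultUncaughtExceptionHandler
            if (handler != null) handler.uncaughtException(Thread.currentThread(), t)
            else throw t
        }
      }
    }
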
From 6ad3eb002870e55afb208c80c0a6f21b649bf38d Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 14 Mar 2015 16:04:51 -0400
Subject: [PATCH 2/5] stop SparkContext instead of quit the JVM process

---
 .../scala/org/apache/spark/ContextCleaner.scala       |  2 +-
 .../main/scala/org/apache/spark/SparkContext.scala    |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala    |  2 +-
 .../apache/spark/util/AsynchronousListenerBus.scala   |  8 ++++++--
 .../main/scala/org/apache/spark/util/Utils.scala      | 13 +++++++++++++
 .../spark/scheduler/EventLoggingListenerSuite.scala   |  9 +++++----
 .../apache/spark/scheduler/SparkListenerSuite.scala   | 10 +++++-----
 .../spark/streaming/scheduler/JobScheduler.scala      |  2 +-
 8 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 329e6d275ea21..9b05c9623b704 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning(): Unit = Utils.tryOrExit {
+  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8121aab3b0b34..4dfe67e04a87b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
     }
 
-    listenerBus.start()
+    listenerBus.start(this)
   }
 
   /** Post the application start event */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7a9cf1c2e7f30..f33fd4450b2a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -145,7 +145,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
-        Utils.tryOrExit { checkSpeculatableTasks() }
+        Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index b282019d2277d..ce6e314463baa 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -21,6 +21,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 
 import com.google.common.annotations.VisibleForTesting
+import org.apache.spark.SparkContext
 
 /**
  * Asynchronously passes events to registered listeners.
@@ -38,6 +39,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 
   self =>
 
+  private var sparkContext: SparkContext = null
+
   /* Cap the capacity of the event queue so we get an explicit error (rather than
    * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
   private val EVENT_QUEUE_CAPACITY = 10000
@@ -57,7 +60,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 
   private val listenerThread = new Thread(name) {
     setDaemon(true)
-    override def run(): Unit = Utils.tryOrExit {
+    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
       while (true) {
         eventLock.acquire()
         self.synchronized {
@@ -90,8 +93,9 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
    * This first sends out all buffered events posted before this listener bus has started, then
    * listens for any additional events asynchronously while the listener bus is still running.
    * This should only be called once.
    */
-  def start() {
+  def start(sc: SparkContext) {
     if (started.compareAndSet(false, true)) {
+      sparkContext = sc
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index d3dc1d09cb7b4..f7b8f0ba7767f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1155,6 +1155,19 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
+   * default UncaughtExceptionHandler
+   */
+  def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
+    try {
+      block
+    } catch {
+      case e: ControlThrowable => throw e
+      case t: Throwable => sc.stop()
+    }
+  }
+
   /**
    * Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught
    * exceptions as IOException. This is used when implementing Externalizable and Serializable's
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 992dde66f982f..448258a754153 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -25,9 +25,9 @@ import scala.io.Source
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{FunSuiteLike, BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
+import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io._
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -39,7 +39,8 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  * logging events, whether the parsing of the file names is correct, and whether the logged events
 * can be read and deserialized into actual SparkListenerEvents.
  */
-class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Logging {
+class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter
+  with Logging {
   import EventLoggingListenerSuite._
 
   private val fileSystem = Utils.getHadoopFileSystem("/",
@@ -144,7 +145,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
     eventLogger.start()
-    listenerBus.start()
+    listenerBus.start(sc)
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 3a41ee8d4ae0c..627c9a4ddfffc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -46,7 +46,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(sc)
     assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(counter.count === 5)
 
@@ -58,8 +58,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
       val bus = new LiveListenerBus
-      bus.start()
-      bus.start()
+      bus.start(sc)
+      bus.start(sc)
     }
 
     // ... or stopped before starting
@@ -96,7 +96,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
-    bus.start()
+    bus.start(sc)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
@@ -347,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start()
+    bus.start(sc)
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index b3ffc71904c76..60bc099b27a4c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -61,7 +61,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       }
     }), "JobScheduler")
 
-    listenerBus.start()
+    listenerBus.start(ssc.sparkContext)
     receiverTracker = new ReceiverTracker(ssc)
     receiverTracker.start()
     jobGenerator.start()
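
With PATCH 2/5 the driver-side loops no longer kill the JVM: ContextCleaner, the speculation
check in TaskSchedulerImpl, and the listener bus thread now stop the SparkContext on an
uncaught exception, since the driver JVM is usually the user's own application process and
should survive Spark's failure. That is also why AsynchronousListenerBus.start() grows a
SparkContext parameter. A sketch of the resulting calling pattern, with hypothetical names
(DemoCleaner, cleanupOnce) and placed inside the org.apache.spark package so the
private[spark] Utils object is visible:

    package org.apache.spark

    import org.apache.spark.util.Utils

    // A driver-side background loop in the style of ContextCleaner.keepCleaning:
    // if cleanupOnce() ever throws, tryOrStopSparkContext stops `sc` so the
    // application fails fast instead of the cleaner thread dying unnoticed
    // (logging of the error is added later, in PATCH 4/5 and 5/5).
    private[spark] class DemoCleaner(sc: SparkContext) {

      @volatile private var stopped = false

      private val cleaningThread = new Thread("demo-cleaner") {
        setDaemon(true)
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          while (!stopped) {
            cleanupOnce()
          }
        }
      }

      def start(): Unit = cleaningThread.start()

      def stop(): Unit = { stopped = true }

      // Placeholder for real periodic work; sleeps so the loop does not spin.
      private def cleanupOnce(): Unit = Thread.sleep(100)
    }
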
From 6087864c7b1e0558627234aabc3d85c7e830cf59 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 14 Mar 2015 16:06:30 -0400
Subject: [PATCH 3/5] revise comments

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f7b8f0ba7767f..0d7c5cf2d27a7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1156,8 +1156,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
-   * default UncaughtExceptionHandler
+   * Execute a block of code that evaluates to Unit, stop SparkContext is any uncaught exception
    */
   def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
     try {

From 3c72cd8bb3237454e0866deb274d80db1d07f897 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 14 Mar 2015 21:08:07 -0400
Subject: [PATCH 4/5] address the comments

---
 .../spark/util/AsynchronousListenerBus.scala          |  2 ++
 core/src/main/scala/org/apache/spark/util/Utils.scala | 16 ++++++++++++--
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index ce6e314463baa..ce7887b76ff96 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -92,6 +92,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
    * This first sends out all buffered events posted before this listener bus has started, then
    * listens for any additional events asynchronously while the listener bus is still running.
    * This should only be called once.
+   *
+   * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
   def start(sc: SparkContext) {
     if (started.compareAndSet(false, true)) {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0d7c5cf2d27a7..bf732faf77705 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1145,6 +1145,8 @@ private[spark] object Utils extends Logging {
   /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
    * default UncaughtExceptionHandler
+   *
+   * NOTE: This method is to be called by the spark-started JVM process.
    */
   def tryOrExit(block: => Unit) {
     try {
@@ -1158,14 +1158,24 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a block of code that evaluates to Unit, stop SparkContext is any uncaught exception
+   * Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
+   * exception
+   *
+   * NOTE: This method is to be called by the driver-side components to avoid stopping the
+   * user-started JVM process completely; in contrast, tryOrExit is to be called in the
+   * spark-started JVM process.
    */
   def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
     try {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => sc.stop()
+      case t: Throwable =>
+        if (sc != null) {
+          logError(s"uncaught error in thread ${Thread.currentThread().getName}, stopping " +
+            "SparkContext", t)
+          sc.stop()
+        }
     }
   }
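
One detail shared by tryOrExit and tryOrStopSparkContext is the first catch clause,
`case e: ControlThrowable => throw e`. Scala implements break and other non-local control
flow as throwables extending ControlThrowable, so a bare `case t: Throwable` would otherwise
treat ordinary control flow as a crash. A small self-contained illustration:

    import scala.util.control.Breaks

    object ControlFlowDemo {
      def main(args: Array[String]): Unit = {
        val loop = new Breaks
        // loop.break() works by throwing a ControlThrowable under the hood;
        // a catch-all error handler that did not rethrow it would swallow
        // the break and misreport it as a failure.
        loop.breakable {
          for (i <- 1 to 10) {
            if (i == 3) loop.break()
          }
        }
        println("break unwound cleanly and was not treated as an error")
      }
    }
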
From 589276a496e7371de1e51b37b7afdf4af7c27286 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Wed, 18 Mar 2015 14:40:08 -0400
Subject: [PATCH 5/5] throw fatal error again

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bf732faf77705..d8440f32cb7a3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1171,11 +1171,15 @@
     } catch {
       case e: ControlThrowable => throw e
       case t: Throwable =>
+        val currentThreadName = Thread.currentThread().getName
         if (sc != null) {
-          logError(s"uncaught error in thread ${Thread.currentThread().getName}, stopping " +
-            "SparkContext", t)
+          logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
           sc.stop()
         }
+        if (!NonFatal(t)) {
+          logError(s"throw uncaught fatal error in thread $currentThreadName", t)
+          throw t
+        }
     }
   }
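
PATCH 5/5 reinstates fatal-error propagation using scala.util.control.NonFatal, whose
extractor deliberately does not match VirtualMachineError (including OutOfMemoryError),
ThreadDeath, InterruptedException, LinkageError, or ControlThrowable. After stopping the
SparkContext, a fatal error is logged and rethrown so the JVM's normal uncaught-exception
handling still sees it. A quick demonstration of how the matcher classifies errors:

    import scala.util.control.NonFatal

    object NonFatalDemo {
      // Mirrors the decision in tryOrStopSparkContext after PATCH 5/5: every
      // Throwable stops the context (when one was supplied); only fatal ones
      // are rethrown afterwards.
      def describe(t: Throwable): String =
        if (NonFatal(t)) "non-fatal: stop SparkContext, swallow"
        else "fatal: stop SparkContext, then rethrow"

      def main(args: Array[String]): Unit = {
        println(describe(new RuntimeException("task failed")))  // non-fatal
        println(describe(new OutOfMemoryError("heap")))         // fatal: VirtualMachineError
        println(describe(new InterruptedException()))           // fatal by NonFatal's rules
      }
    }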