SPARK-6414: Spark driver failed with NPE on job cancelation
Use Option for ActiveJob.properties to avoid NPE bug

Author: Hung Lin <[email protected]>

Closes #5124 from hunglin/SPARK-6414 and squashes the following commits:

2290b6b [Hung Lin] [SPARK-6414][core] Fix NPE in SparkContext.cancelJobGroup()
Hung Lin authored and JoshRosen committed Apr 2, 2015
1 parent 0cce545 commit e3202aa
Showing 3 changed files with 25 additions and 9 deletions.
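Before the diffs, a minimal sketch (not Spark code; the property key is illustrative) of the null-safety pattern the commit message describes: wrapping a possibly-null java.util.Properties in Option turns a crashing lookup into a predicate that is merely false.

```scala
import java.util.Properties

// A possibly-null Properties, as ActiveJob.properties could be before this fix.
val nullProps: Properties = null

// nullProps.get("spark.jobGroup.id") would throw a NullPointerException.
// Option(null) is None, so the same lookup becomes a safe, false predicate:
val inGroup = Option(nullProps).exists(_.get("spark.jobGroup.id") == "my-group")
assert(!inGroup) // no NPE, just false
```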
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -433,6 +433,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
+ override protected def initialValue(): Properties = new Properties()
}

/**
@@ -474,9 +475,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Spark fair scheduler pool.
*/
def setLocalProperty(key: String, value: String) {
- if (localProperties.get() == null) {
-   localProperties.set(new Properties())
- }
if (value == null) {
localProperties.get.remove(key)
} else {
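The initialValue() override above is the other half of the fix: ThreadLocal.get() lazily invokes initialValue() on first access, so localProperties can never be null and the defensive check removed from setLocalProperty becomes unnecessary. A minimal sketch of that behavior, with illustrative names:

```scala
import java.util.Properties

val localProps = new InheritableThreadLocal[Properties] {
  // Child threads start from a Properties chained to the parent thread's values.
  override protected def childValue(parent: Properties): Properties = new Properties(parent)
  // get() invokes this on first access, so it never returns null.
  override protected def initialValue(): Properties = new Properties()
}

// Safe even on a thread that has never called set(): no null check needed.
localProps.get.setProperty("someKey", "someValue")
```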
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -493,7 +493,7 @@ class DAGScheduler(
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
- properties: Properties = null): JobWaiter[U] = {
+ properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
@@ -522,7 +522,7 @@ class DAGScheduler(
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
- properties: Properties = null): Unit = {
+ properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
waiter.awaitResult() match {
@@ -542,7 +542,7 @@ class DAGScheduler(
evaluator: ApproximateEvaluator[U, R],
callSite: CallSite,
timeout: Long,
- properties: Properties = null): PartialResult[R] = {
+ properties: Properties): PartialResult[R] = {
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
@@ -689,7 +689,7 @@ class DAGScheduler(
// Cancel all jobs belonging to this job group.
// First finds all active jobs with this group id, and then kill stages for them.
val activeInGroup = activeJobs.filter(activeJob =>
- groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+ Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
submitWaitingStages()
@@ -736,7 +736,7 @@ class DAGScheduler(
allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
- properties: Properties = null) {
+ properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
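To show how the Option guard in cancelJobGroup composes with filtering active jobs, here is a small self-contained sketch; Job is a hypothetical stand-in for ActiveJob and the property key is illustrative:

```scala
import java.util.Properties

case class Job(jobId: Int, properties: Properties) // hypothetical stand-in for ActiveJob

def jobIdsInGroup(activeJobs: Seq[Job], groupId: String): Seq[Int] =
  activeJobs
    .filter(job => Option(job.properties).exists(_.get("spark.jobGroup.id") == groupId))
    .map(_.jobId)

val props = new Properties()
props.setProperty("spark.jobGroup.id", "g1")
// A job whose properties are null is skipped instead of crashing the scheduler.
assert(jobIdsInGroup(Seq(Job(1, props), Job(2, null)), "g1") == Seq(1))
```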
20 changes: 19 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -18,16 +18,19 @@
package org.apache.spark

import java.io.File
+ import java.util.concurrent.TimeUnit

import com.google.common.base.Charsets._
import com.google.common.io.Files

import org.scalatest.FunSuite

import org.apache.hadoop.io.BytesWritable

import org.apache.spark.util.Utils

+ import scala.concurrent.Await
+ import scala.concurrent.duration.Duration

class SparkContextSuite extends FunSuite with LocalSparkContext {

test("Only one SparkContext may be active at a time") {
@@ -173,4 +176,19 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
sc.stop()
}
}

test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
sc.cancelJobGroup("nonExistGroupId")
Await.ready(future, Duration(2, TimeUnit.SECONDS))

// In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause
// SparkContext to shutdown, so the following assertion will fail.
assert(sc.parallelize(1 to 10).count() == 10L)
} finally {
sc.stop()
}
}
}
