[SPARK-23881][CORE][TEST] Fix flaky test JobCancellationSuite."interruptible iterator of shuffle reader"

## What changes were proposed in this pull request?

The test case JobCancellationSuite."interruptible iterator of shuffle reader" has been flaky because the `KillTask` event is handled asynchronously, so it can happen that the semaphore is released while the task is still running.
Actually we only have to check that the total number of processed elements is less than the number of input elements to know that the task got cancelled.
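
The race is easy to reproduce without Spark. Below is a minimal, self-contained sketch of the same pattern (plain Scala, no Spark; all names are illustrative, not the test's actual code): the cancel path releases a semaphore first and only flips a kill flag afterwards, so the worker can squeeze in extra iterations before it observes the flag. An exact cap such as `<= 10` is therefore flaky, while `< numElements` holds for the same reason the patched test relies on.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object CancellationRaceSketch extends App {
  val numElements = 10000
  val killed = new AtomicBoolean(false)    // stands in for the task's kill flag
  val gate = new Semaphore(0)              // stands in for taskCancelledSemaphore
  val processed = new AtomicInteger(0)

  val worker = new Thread(() => {
    val iter = (1 to numElements).iterator
    // Like an interruptible iterator, re-check the kill flag at every element boundary.
    while (!killed.get() && iter.hasNext) {
      gate.acquire()                       // blocked here until "cancellation" begins
      iter.next()
      processed.getAndIncrement()
    }
  })
  worker.start()

  // "Cancel": the semaphore is released first; the kill flag is only flipped afterwards,
  // so the worker can run a few more iterations in between. That is the race.
  gate.release(numElements)
  killed.set(true)
  worker.join()

  // Asserting an exact small bound here (e.g. <= 10) is flaky. The robust check, which
  // mirrors the patched test's reasoning, is that the worker stopped well before
  // draining the whole input; it is very unlikely it processes all 10000 elements
  // between the release and the flag flip.
  assert(processed.get() < numElements)
  println(s"processed ${processed.get()} of $numElements elements")
}
```

Running it prints a small, run-dependent count, which is exactly why the patch asserts an upper bound rather than an exact number.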

## How was this patch tested?

The new test case still fails without the proposed patch, and succeeds on current master with the patch applied.

Author: Xingbo Jiang <[email protected]>

Closes apache#20993 from jiangxb1987/JobCancellationSuite.
jiangxb1987 authored and gatorsmile committed Apr 9, 2018
1 parent 32471ba commit d81f29e
Showing 1 changed file with 18 additions and 13 deletions: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -332,13 +332,15 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     import JobCancellationSuite._
     sc = new SparkContext("local[2]", "test interruptible iterator")
 
+    // Increase the number of elements to be processed, to avoid this test being flaky.
+    val numElements = 10000
     val taskCompletedSem = new Semaphore(0)
 
     sc.addSparkListener(new SparkListener {
       override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
         // Release taskCancelledSemaphore when the cancelTasks event has been posted.
         if (stageCompleted.stageInfo.stageId == 1) {
-          taskCancelledSemaphore.release(1000)
+          taskCancelledSemaphore.release(numElements)
         }
       }
 
@@ -349,36 +351,39 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       }
     })
 
-    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
+    // Explicitly disable interrupting the task thread on cancelling tasks, so the task thread
+    // can only be interrupted by `InterruptibleIterator`.
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+
+    val f = sc.parallelize(1 to numElements).map { i => (i, i) }
       .repartitionAndSortWithinPartitions(new HashPartitioner(1))
       .mapPartitions { iter =>
         taskStartedSemaphore.release()
         iter
       }.foreachAsync { x =>
-        if (x._1 >= 10) {
-          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
-          // next iteration will be cancelled if the source iterator is interruptible. In this
-          // case, the maximum number of increments would be 10 (|1...10|).
-          taskCancelledSemaphore.acquire()
-        }
+        // Block this code from being executed until the job gets cancelled. In this case, if the
+        // source iterator is interruptible, the max number of increments should be under
+        // `numElements`.
+        taskCancelledSemaphore.acquire()
         executionOfInterruptibleCounter.getAndIncrement()
     }
 
     taskStartedSemaphore.acquire()
     // The job is cancelled when:
     // 1. the task in the reduce stage has been started, guaranteed by the previous line;
-    // 2. task in reduce stage is blocked after processing at most 10 records, as
-    //    taskCancelledSemaphore is not released until the cancelTasks event is posted.
-    // After the job is cancelled, the task in the reduce stage will be cancelled and no more
-    // iterations are executed.
+    // 2. the task in the reduce stage is blocked, as taskCancelledSemaphore is not released until
+    //    the JobCancelled event is posted.
+    // After the job is cancelled, the task in the reduce stage will be cancelled asynchronously,
+    // thus only part of the input should get processed (it is very unlikely that Spark can process
+    // 10000 elements between JobCancelled being posted and the task really being killed).
     f.cancel()
 
     val e = intercept[SparkException](f.get()).getCause
     assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
 
     // Make sure the tasks are indeed completed.
     taskCompletedSem.acquire()
-    assert(executionOfInterruptibleCounter.get() <= 10)
+    assert(executionOfInterruptibleCounter.get() < numElements)
   }
 
   def testCount() {
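
For context on the `SPARK_JOB_INTERRUPT_ON_CANCEL` comment in the diff above: with thread interruption disabled, the only remaining cancellation path is Spark's `InterruptibleIterator` (core/src/main/scala/org/apache/spark/InterruptibleIterator.scala), which re-checks the task's kill flag at every element boundary (internally via `TaskContext.killTaskIfInterrupted()`). The following is a self-contained sketch of that pattern, not the verbatim Spark source; a plain `AtomicBoolean` stands in for the task context's kill flag.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Stand-in for Spark's TaskKilledException.
class TaskKilledSketchException extends RuntimeException("task killed")

// Sketch of the InterruptibleIterator pattern: each hasNext call consults the
// kill flag, so a cancelled consumer stops at the next element boundary even
// when Thread.interrupt() is never delivered.
class InterruptibleIteratorSketch[T](killed: AtomicBoolean, delegate: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean = {
    if (killed.get()) throw new TaskKilledSketchException
    delegate.hasNext
  }

  override def next(): T = delegate.next()
}

object InterruptibleIteratorSketchDemo extends App {
  val killFlag = new AtomicBoolean(false)
  val it = new InterruptibleIteratorSketch(killFlag, (1 to 100).iterator)
  println(it.next())   // works while the flag is unset
  killFlag.set(true)
  // Any further hasNext call now throws TaskKilledSketchException.
}
```

This per-element check is why the patched comment says the task thread "can only be interrupted by `InterruptibleIterator`": cancellation still lands, just at the next `hasNext` call rather than via `Thread.interrupt()`.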
