Skip to content

Commit

Permalink
Fix MutexCancellationStressTest flakiness (#3724)
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad authored Apr 26, 2023
1 parent f537089 commit d6f1403
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.*

class MutexCancellationStressTest : TestBase() {
@Test
Expand All @@ -18,13 +22,16 @@ class MutexCancellationStressTest : TestBase() {
val mutexOwners = Array(mutexJobNumber) { "$it" }
val dispatcher = Executors.newFixedThreadPool(mutexJobNumber + 2).asCoroutineDispatcher()
var counter = 0
val counterLocal = Array(mutexJobNumber) { LocalAtomicInt(0) }
val completed = LocalAtomicInt(0)
val counterLocal = Array(mutexJobNumber) { AtomicInteger(0) }
val completed = AtomicBoolean(false)
val mutexJobLauncher: (jobNumber: Int) -> Job = { jobId ->
val coroutineName = "MutexJob-$jobId"
launch(dispatcher + CoroutineName(coroutineName)) {
while (completed.value == 0) {
// ATOMIC to always have a chance to proceed
launch(dispatcher + CoroutineName(coroutineName), CoroutineStart.ATOMIC) {
while (!completed.get()) {
// Stress out holdsLock
mutex.holdsLock(mutexOwners[(jobId + 1) % mutexJobNumber])
// Stress out lock-like primitives
if (mutex.tryLock(mutexOwners[jobId])) {
counterLocal[jobId].incrementAndGet()
counter++
Expand All @@ -47,30 +54,32 @@ class MutexCancellationStressTest : TestBase() {
val mutexJobs = (0 until mutexJobNumber).map { mutexJobLauncher(it) }.toMutableList()
val checkProgressJob = launch(dispatcher + CoroutineName("checkProgressJob")) {
var lastCounterLocalSnapshot = (0 until mutexJobNumber).map { 0 }
while (completed.value == 0) {
delay(1000)
while (!completed.get()) {
delay(500)
// If we've caught the completion after the delay, then there is a chance that no progress was made whatsoever, so bail out
if (completed.get()) return@launch
val c = counterLocal.map { it.value }
for (i in 0 until mutexJobNumber) {
assert(c[i] > lastCounterLocalSnapshot[i]) { "No progress in MutexJob-$i" }
assert(c[i] > lastCounterLocalSnapshot[i]) { "No progress in MutexJob-$i, last observed state: ${c[i]}" }
}
lastCounterLocalSnapshot = c
}
}
val cancellationJob = launch(dispatcher + CoroutineName("cancellationJob")) {
var cancellingJobId = 0
while (completed.value == 0) {
while (!completed.get()) {
val jobToCancel = mutexJobs.removeFirst()
jobToCancel.cancelAndJoin()
mutexJobs += mutexJobLauncher(cancellingJobId)
cancellingJobId = (cancellingJobId + 1) % mutexJobNumber
}
}
delay(2000L * stressTestMultiplier)
completed.value = 1
completed.set(true)
cancellationJob.join()
mutexJobs.forEach { it.join() }
checkProgressJob.join()
check(counter == counterLocal.sumOf { it.value })
assertEquals(counter, counterLocal.sumOf { it.value })
dispatcher.close()
}
}

0 comments on commit d6f1403

Please sign in to comment.