Skip to content

Commit

Permalink
test: withTimeoutSafe as a safe replacement for withTimeout when …
Browse files Browse the repository at this point in the history
  • Loading branch information
amal committed Jan 20, 2023
1 parent 172472f commit bb85e5b
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 4 deletions.
41 changes: 41 additions & 0 deletions fluxo-core/src/commonTest/kotlin/kt/fluxo/test/WithTimeoutSafe.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package kt.fluxo.test

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract

/**
* Use as a safe replacement for [withTimeout] when used inside a [runTest]/[TestScope].
*
* Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws
* a [TimeoutCancellationException] if the timeout was exceeded.
*
* [Issue with the problem](https://github.com/Kotlin/kotlinx.coroutines/issues/3588)
*
* [Documented as intended behavior](https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-test/README.md#using-withtimeout-inside-runtest)
*
* @param timeMillis timeout time in milliseconds.
*
* @see withTimeout
*/
@Suppress("MaxLineLength")
suspend inline fun <T> withTimeoutSafe(timeMillis: Long, crossinline block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val context = currentCoroutineContext()
return withContext(Dispatchers.Default) {
withTimeout(timeMillis) {
withContext(context) {
block()
}
}
}
}
21 changes: 17 additions & 4 deletions fluxo-core/src/commonTest/kotlin/kt/fluxo/test/test.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,35 @@ import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* [runTest] with lowered default [timeout][dispatchTimeoutMs] and more safety.
*
* * [dispatchTimeoutMs] is 2 seconds by default
* * Uses [withTimeoutSafe] to actually prevent tests from hanging as [runTest] doesn't
* * Catches suppressed exceptions (can be helpful when debugging some problems)
*
* See issues [#3270](https://github.com/Kotlin/kotlinx.coroutines/issues/3270)
* and [#3588](https://github.com/Kotlin/kotlinx.coroutines/issues/3588) for more details.
*/
@ExperimentalCoroutinesApi
@Suppress("NestedBlockDepth")
fun runUnitTest(
context: CoroutineContext = EmptyCoroutineContext,
dispatchTimeoutMs: Long = DEFAULT_DISPATCH_TIMEOUT_MS,
dispatchTimeoutMs: Long = 2_000L,
testBody: suspend TestScope.() -> Unit,
): TestResult {
var scope: TestScope? = null
try {
return runTest(context, dispatchTimeoutMs) {
scope = this
testBody()
withTimeoutSafe(timeMillis = dispatchTimeoutMs) {
testBody()
}
}
} catch (e: IllegalStateException) {
val s = scope
// Internal TestScope check failed, e.g., unfinished child jobs, etc.
// Cancellation exceptions are lost without saving.
if (s != null && e.message == "Check failed.") {
for (sc in arrayOf(s, s.backgroundScope)) {
try {
Expand All @@ -39,13 +53,12 @@ fun runUnitTest(
} catch (_: IllegalStateException) {
}
}
// TODO: TestScopeImpl has an uncaughtExceptions field that can be read here, at least in JVM
}
throw e
}
}

internal const val DEFAULT_DISPATCH_TIMEOUT_MS = 2_000L


suspend fun <T> inScope(
scope: CoroutineScope,
Expand Down
177 changes: 177 additions & 0 deletions fluxo-core/src/commonTest/kotlin/kt/fluxo/tests/dsl/CoroutinesTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package kt.fluxo.tests.dsl

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import kt.fluxo.test.IgnoreJs
import kt.fluxo.test.runUnitTest
import kt.fluxo.test.withTimeoutSafe
import kotlin.test.Test
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.test.fail

/**
* Test to be sure that [kotlinx.coroutines] work as expected.
*/
@IgnoreJs
class CoroutinesTest {

private companion object {
private const val USE_WITH_TIMEOUT = true
private const val USE_SAFE_WITH_TIMEOUT = true
private const val TIMEOUT_MS = 5_000L
private const val TEST_REPETITIONS = 1_000
}

// TODO: Benchmark Mutex implementations
// - StateFlow-based seems to be 2-3x slower than Mutex or Semaphore
// - Semaphore seems to be 20-30% faster than Mutex!

@Test
fun with_timeout_unsafety() = runUnitTest(dispatchTimeoutMs = 10_000) {
// https://github.com/Kotlin/kotlinx.coroutines/blob/dea2ca5/kotlinx-coroutines-test/README.md#using-withtimeout-inside-runtest
// https://github.com/Kotlin/kotlinx.coroutines/issues/3588
assertFailsWith<TimeoutCancellationException> {
repeat(times = 10_000) {
val mutex = Mutex(locked = true)
val job = backgroundScope.launch(Dispatchers.Default) {
delay(10)
mutex.unlock()
// busy wait
@Suppress("ControlFlowWithEmptyBody", "EmptyWhileBlock")
while (currentCoroutineContext().isActive) {
}
}
withTimeout(TIMEOUT_MS) {
mutex.withLock {}
}
job.cancelAndJoin()
}
}
}


@Test
fun with_timeout_mutex() = testImplementation(
createMutex = { Mutex(locked = true) },
doUnlock = {
// println("doUnlock (Mutex)")
assertTrue(isLocked, "doUnlock called for non-locked Mutex (Mutex)")
unlock()
},
waitForMutexUnlock = {
if (isLocked) {
withLock {
assertTrue(isLocked, "Mutex.isLocked = false when locked (Mutex)")
}
}
// println("waitForMutexUnlock: unlocked (Mutex)")
assertFalse(isLocked, "Mutex.isLocked = false (Mutex)")
},
)

@Test
fun with_timeout_semaphore() = testImplementation(
createMutex = { Semaphore(permits = 1, acquiredPermits = 1) },
doUnlock = {
// println("doUnlock (Semaphore)")
try {
release()
} catch (_: IllegalStateException) {
fail("doUnlock called for non-locked Mutex (Semaphore)")
}
},
waitForMutexUnlock = {
// wait for unlock
acquire()
try {
// println("waitForMutexUnlock: unlocked (Semaphore)")
release()
} catch (_: IllegalStateException) {
fail("Mutex.isLocked = false (Semaphore)")
}
},
)

// MutableStateFlow as a locked "Mutex"
@Test
fun with_timeout_state_flow() = testImplementation(
// MutableStateFlow as a locked "Mutex"
createMutex = { MutableStateFlow(true) },
doUnlock = {
// println("doUnlock (StateFlow)")
assertTrue(compareAndSet(expect = true, update = false), "doUnlock called for non-locked Mutex (StateFlow)")
assertFalse(value, "couldn't unlock Mutex (Mutex)")
},
waitForMutexUnlock = {
if (value) {
// wait for unlock
assertFalse(first { !it }, "Mutex waiting failed (StateFlow)")
}
// println("waitForMutexUnlock: unlocked (StateFlow)")
assertFalse(value, "Mutex.isLocked = false (StateFlow)")
},
)

private inline fun <M> testImplementation(
crossinline createMutex: () -> M,
crossinline doUnlock: M.() -> Unit,
crossinline waitForMutexUnlock: suspend M.() -> Unit,
) = runTest(dispatchTimeoutMs = 10_000) {
// Scopes that will do the actual work in the background
val scopes = mapOf(
"TestScope + Dispatchers.Default" to (this + Dispatchers.Default),
"BackgroundTestScope + Dispatchers.Default" to (backgroundScope + Dispatchers.Default),
"Job + Dispatchers.Default" to CoroutineScope(Job() + Dispatchers.Default),
)
repeat(TEST_REPETITIONS) { iteration ->
for ((scopeName, scope) in scopes) {
val info = "#$iteration, $scopeName"
try {
// println("start: $info")
val mutex = createMutex()
val asyncJob = scope.launch {
mutex.doUnlock()
// busy wait
@Suppress("ControlFlowWithEmptyBody", "EmptyWhileBlock")
while (currentCoroutineContext().isActive) {
}
}

when {
USE_SAFE_WITH_TIMEOUT -> withTimeoutSafe(TIMEOUT_MS) {
mutex.waitForMutexUnlock()
}

USE_WITH_TIMEOUT -> withTimeout(TIMEOUT_MS) {
mutex.waitForMutexUnlock()
}

else -> mutex.waitForMutexUnlock()
}

asyncJob.cancelAndJoin()
// println("iteration finished\n")
} catch (e: Throwable) {
throw IllegalStateException("$e // $info", e)
}
}
}
}
}

0 comments on commit bb85e5b

Please sign in to comment.