-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test:
withTimeoutSafe
as a safe replacement for withTimeout
when …
…used inside a `runTest`/`TestScope`. More info: Kotlin/kotlinx.coroutines#3588 https://github.com/Kotlin/kotlinx.coroutines/blob/dea2ca5/kotlinx-coroutines-test/README.md#using-withtimeout-inside-runtest Signed-off-by: Artyom Shendrik <[email protected]>
- Loading branch information
Showing
3 changed files
with
235 additions
and
4 deletions.
There are no files selected for viewing
41 changes: 41 additions & 0 deletions
41
fluxo-core/src/commonTest/kotlin/kt/fluxo/test/WithTimeoutSafe.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package kt.fluxo.test | ||
|
||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.TimeoutCancellationException | ||
import kotlinx.coroutines.currentCoroutineContext | ||
import kotlinx.coroutines.test.TestScope | ||
import kotlinx.coroutines.test.runTest | ||
import kotlinx.coroutines.withContext | ||
import kotlinx.coroutines.withTimeout | ||
import kotlin.contracts.InvocationKind | ||
import kotlin.contracts.contract | ||
|
||
/** | ||
* Use as a safe replacement for [withTimeout] when used inside a [runTest]/[TestScope]. | ||
* | ||
* Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws | ||
* a [TimeoutCancellationException] if the timeout was exceeded. | ||
* | ||
* [Issue with the problem](https://github.com/Kotlin/kotlinx.coroutines/issues/3588) | ||
* | ||
* [Documented as intended behavior](https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-test/README.md#using-withtimeout-inside-runtest) | ||
* | ||
* @param timeMillis timeout time in milliseconds. | ||
* | ||
* @see withTimeout | ||
*/ | ||
@Suppress("MaxLineLength") | ||
suspend inline fun <T> withTimeoutSafe(timeMillis: Long, crossinline block: suspend CoroutineScope.() -> T): T { | ||
contract { | ||
callsInPlace(block, InvocationKind.EXACTLY_ONCE) | ||
} | ||
val context = currentCoroutineContext() | ||
return withContext(Dispatchers.Default) { | ||
withTimeout(timeMillis) { | ||
withContext(context) { | ||
block() | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
177 changes: 177 additions & 0 deletions
177
fluxo-core/src/commonTest/kotlin/kt/fluxo/tests/dsl/CoroutinesTest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
package kt.fluxo.tests.dsl | ||
|
||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.Job | ||
import kotlinx.coroutines.TimeoutCancellationException | ||
import kotlinx.coroutines.cancelAndJoin | ||
import kotlinx.coroutines.currentCoroutineContext | ||
import kotlinx.coroutines.delay | ||
import kotlinx.coroutines.flow.MutableStateFlow | ||
import kotlinx.coroutines.flow.first | ||
import kotlinx.coroutines.isActive | ||
import kotlinx.coroutines.launch | ||
import kotlinx.coroutines.plus | ||
import kotlinx.coroutines.sync.Mutex | ||
import kotlinx.coroutines.sync.Semaphore | ||
import kotlinx.coroutines.sync.withLock | ||
import kotlinx.coroutines.test.runTest | ||
import kotlinx.coroutines.withTimeout | ||
import kt.fluxo.test.IgnoreJs | ||
import kt.fluxo.test.runUnitTest | ||
import kt.fluxo.test.withTimeoutSafe | ||
import kotlin.test.Test | ||
import kotlin.test.assertFailsWith | ||
import kotlin.test.assertFalse | ||
import kotlin.test.assertTrue | ||
import kotlin.test.fail | ||
|
||
/** | ||
* Test to be sure that [kotlinx.coroutines] work as expected. | ||
*/ | ||
@IgnoreJs | ||
class CoroutinesTest { | ||
|
||
private companion object { | ||
private const val USE_WITH_TIMEOUT = true | ||
private const val USE_SAFE_WITH_TIMEOUT = true | ||
private const val TIMEOUT_MS = 5_000L | ||
private const val TEST_REPETITIONS = 1_000 | ||
} | ||
|
||
// TODO: Benchmark Mutex implementations | ||
// - StateFlow-based seems to be 2-3x slower than Mutex or Semaphore | ||
// - Semaphore seems to be 20-30% faster than Mutex! | ||
|
||
@Test | ||
fun with_timeout_unsafety() = runUnitTest(dispatchTimeoutMs = 10_000) { | ||
// https://github.com/Kotlin/kotlinx.coroutines/blob/dea2ca5/kotlinx-coroutines-test/README.md#using-withtimeout-inside-runtest | ||
// https://github.com/Kotlin/kotlinx.coroutines/issues/3588 | ||
assertFailsWith<TimeoutCancellationException> { | ||
repeat(times = 10_000) { | ||
val mutex = Mutex(locked = true) | ||
val job = backgroundScope.launch(Dispatchers.Default) { | ||
delay(10) | ||
mutex.unlock() | ||
// busy wait | ||
@Suppress("ControlFlowWithEmptyBody", "EmptyWhileBlock") | ||
while (currentCoroutineContext().isActive) { | ||
} | ||
} | ||
withTimeout(TIMEOUT_MS) { | ||
mutex.withLock {} | ||
} | ||
job.cancelAndJoin() | ||
} | ||
} | ||
} | ||
|
||
|
||
@Test | ||
fun with_timeout_mutex() = testImplementation( | ||
createMutex = { Mutex(locked = true) }, | ||
doUnlock = { | ||
// println("doUnlock (Mutex)") | ||
assertTrue(isLocked, "doUnlock called for non-locked Mutex (Mutex)") | ||
unlock() | ||
}, | ||
waitForMutexUnlock = { | ||
if (isLocked) { | ||
withLock { | ||
assertTrue(isLocked, "Mutex.isLocked = false when locked (Mutex)") | ||
} | ||
} | ||
// println("waitForMutexUnlock: unlocked (Mutex)") | ||
assertFalse(isLocked, "Mutex.isLocked = false (Mutex)") | ||
}, | ||
) | ||
|
||
@Test | ||
fun with_timeout_semaphore() = testImplementation( | ||
createMutex = { Semaphore(permits = 1, acquiredPermits = 1) }, | ||
doUnlock = { | ||
// println("doUnlock (Semaphore)") | ||
try { | ||
release() | ||
} catch (_: IllegalStateException) { | ||
fail("doUnlock called for non-locked Mutex (Semaphore)") | ||
} | ||
}, | ||
waitForMutexUnlock = { | ||
// wait for unlock | ||
acquire() | ||
try { | ||
// println("waitForMutexUnlock: unlocked (Semaphore)") | ||
release() | ||
} catch (_: IllegalStateException) { | ||
fail("Mutex.isLocked = false (Semaphore)") | ||
} | ||
}, | ||
) | ||
|
||
// MutableStateFlow as a locked "Mutex" | ||
@Test | ||
fun with_timeout_state_flow() = testImplementation( | ||
// MutableStateFlow as a locked "Mutex" | ||
createMutex = { MutableStateFlow(true) }, | ||
doUnlock = { | ||
// println("doUnlock (StateFlow)") | ||
assertTrue(compareAndSet(expect = true, update = false), "doUnlock called for non-locked Mutex (StateFlow)") | ||
assertFalse(value, "couldn't unlock Mutex (Mutex)") | ||
}, | ||
waitForMutexUnlock = { | ||
if (value) { | ||
// wait for unlock | ||
assertFalse(first { !it }, "Mutex waiting failed (StateFlow)") | ||
} | ||
// println("waitForMutexUnlock: unlocked (StateFlow)") | ||
assertFalse(value, "Mutex.isLocked = false (StateFlow)") | ||
}, | ||
) | ||
|
||
private inline fun <M> testImplementation( | ||
crossinline createMutex: () -> M, | ||
crossinline doUnlock: M.() -> Unit, | ||
crossinline waitForMutexUnlock: suspend M.() -> Unit, | ||
) = runTest(dispatchTimeoutMs = 10_000) { | ||
// Scopes that will do the actual work in the background | ||
val scopes = mapOf( | ||
"TestScope + Dispatchers.Default" to (this + Dispatchers.Default), | ||
"BackgroundTestScope + Dispatchers.Default" to (backgroundScope + Dispatchers.Default), | ||
"Job + Dispatchers.Default" to CoroutineScope(Job() + Dispatchers.Default), | ||
) | ||
repeat(TEST_REPETITIONS) { iteration -> | ||
for ((scopeName, scope) in scopes) { | ||
val info = "#$iteration, $scopeName" | ||
try { | ||
// println("start: $info") | ||
val mutex = createMutex() | ||
val asyncJob = scope.launch { | ||
mutex.doUnlock() | ||
// busy wait | ||
@Suppress("ControlFlowWithEmptyBody", "EmptyWhileBlock") | ||
while (currentCoroutineContext().isActive) { | ||
} | ||
} | ||
|
||
when { | ||
USE_SAFE_WITH_TIMEOUT -> withTimeoutSafe(TIMEOUT_MS) { | ||
mutex.waitForMutexUnlock() | ||
} | ||
|
||
USE_WITH_TIMEOUT -> withTimeout(TIMEOUT_MS) { | ||
mutex.waitForMutexUnlock() | ||
} | ||
|
||
else -> mutex.waitForMutexUnlock() | ||
} | ||
|
||
asyncJob.cancelAndJoin() | ||
// println("iteration finished\n") | ||
} catch (e: Throwable) { | ||
throw IllegalStateException("$e // $info", e) | ||
} | ||
} | ||
} | ||
} | ||
} |