diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 575f6ec9d9..06002f4429 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -138,8 +138,11 @@ public final class kotlinx/coroutines/experimental/CoroutineContextKt { public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;)Lkotlin/coroutines/experimental/CoroutineContext; public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;)Lkotlin/coroutines/experimental/CoroutineContext; public static synthetic fun newCoroutineContext$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;ILjava/lang/Object;)Lkotlin/coroutines/experimental/CoroutineContext; - public static final fun restoreThreadContext (Ljava/lang/String;)V - public static final fun updateThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;)Ljava/lang/String; +} + +public abstract interface class kotlinx/coroutines/experimental/CoroutineContextThreadLocal { + public abstract fun restoreThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;Ljava/lang/Object;)V + public abstract fun updateThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;)Ljava/lang/Object; } public abstract class kotlinx/coroutines/experimental/CoroutineDispatcher : kotlin/coroutines/experimental/AbstractCoroutineContextElement, kotlin/coroutines/experimental/ContinuationInterceptor { @@ -881,6 +884,8 @@ public final class kotlinx/coroutines/experimental/intrinsics/CancellableKt { public final class kotlinx/coroutines/experimental/intrinsics/UndispatchedKt { public static final fun startCoroutineUndispatched (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)V public static final fun startCoroutineUndispatched (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)V + public static final fun startCoroutineUnintercepted (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)V + public static final fun startCoroutineUnintercepted (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)V public static final fun startUndispatchedOrReturn (Lkotlinx/coroutines/experimental/AbstractCoroutine;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object; public static final fun startUndispatchedOrReturn (Lkotlinx/coroutines/experimental/AbstractCoroutine;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; } diff --git a/build.gradle b/build.gradle index 116af20108..001a736592 100644 --- a/build.gradle +++ b/build.gradle @@ -105,8 +105,10 @@ configure(subprojects.findAll { !it.name.contains(sourceless) && it.name != "ben sourceSets { main.kotlin.srcDirs = ['src'] test.kotlin.srcDirs = ['test'] + // todo: do we still need this workaround? if (!projectName.endsWith("-native")) { main.resources.srcDirs = ['resources'] + test.resources.srcDirs = ['test-resources'] } } } diff --git a/common/kotlinx-coroutines-core-common/src/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/Builders.common.kt index fde4da1379..04ecf6bb03 100644 --- a/common/kotlinx-coroutines-core-common/src/Builders.common.kt +++ b/common/kotlinx-coroutines-core-common/src/Builders.common.kt @@ -119,8 +119,11 @@ public suspend fun withContext( // fast path #3 if the new dispatcher is the same as the old one. // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher) if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { - val newContinuation = RunContinuationDirect(newContext, uCont) - return@sc block.startCoroutineUninterceptedOrReturn(newContinuation) + val newContinuation = RunContinuationUndispatched(newContext, uCont) + // There are some other changes in the context, so this thread needs to be updated + withCoroutineContext(newContext) { + return@sc block.startCoroutineUninterceptedOrReturn(newContinuation) + } } // slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCompletion require(!start.isLazy) { "$start start is not supported" } @@ -130,7 +133,6 @@ public suspend fun withContext( resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE ) completion.initParentJobInternal(newContext[Job]) // attach to job - @Suppress("DEPRECATION") start(block, completion) completion.getResult() } @@ -172,10 +174,22 @@ private class LazyStandaloneCoroutine( } } -private class RunContinuationDirect( +private class RunContinuationUndispatched( override val context: CoroutineContext, - continuation: Continuation -) : Continuation by continuation + private val continuation: Continuation +): Continuation { + override fun resume(value: T) { + withCoroutineContext(continuation.context) { + continuation.resume(value) + } + } + + override fun resumeWithException(exception: Throwable) { + withCoroutineContext(continuation.context) { + continuation.resumeWithException(exception) + } + } +} @Suppress("UNCHECKED_CAST") private class RunCompletion( diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt index 9fa2739d2a..07f2afa5a0 100644 --- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt +++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt @@ -431,7 +431,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 // already complete -- select result if (select.trySelect(null)) { select.completion.context.checkCompletion() // always check for our completion - block.startCoroutineUndispatched(select.completion) + block.startCoroutineUnintercepted(select.completion) } return } @@ -803,7 +803,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0 if (state is CompletedExceptionally) select.resumeSelectCancellableWithException(state.cause) else - block.startCoroutineUndispatched(state as T, select.completion) + block.startCoroutineUnintercepted(state as T, select.completion) } return } diff --git a/common/kotlinx-coroutines-core-common/src/ResumeMode.kt b/common/kotlinx-coroutines-core-common/src/ResumeMode.kt index 6faf515911..a44434747c 100644 --- a/common/kotlinx-coroutines-core-common/src/ResumeMode.kt +++ b/common/kotlinx-coroutines-core-common/src/ResumeMode.kt @@ -43,7 +43,7 @@ internal fun Continuation.resumeUninterceptedMode(value: T, mode: Int) { MODE_ATOMIC_DEFAULT -> intercepted().resume(value) MODE_CANCELLABLE -> intercepted().resumeCancellable(value) MODE_DIRECT -> resume(value) - MODE_UNDISPATCHED -> resume(value) + MODE_UNDISPATCHED -> withCoroutineContext(context) { resume(value) } MODE_IGNORE -> {} else -> error("Invalid mode $mode") } @@ -54,7 +54,7 @@ internal fun Continuation.resumeUninterceptedWithExceptionMode(exception: MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception) MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception) MODE_DIRECT -> resumeWithException(exception) - MODE_UNDISPATCHED -> resumeWithException(exception) + MODE_UNDISPATCHED -> withCoroutineContext(context) { resumeWithException(exception) } MODE_IGNORE -> {} else -> error("Invalid mode $mode") } diff --git a/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt index 3392b70574..53a74b443a 100644 --- a/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt @@ -415,7 +415,7 @@ public abstract class AbstractSendChannel : SendChannel { offerResult === ALREADY_SELECTED -> return offerResult === OFFER_FAILED -> {} // retry offerResult === OFFER_SUCCESS -> { - block.startCoroutineUndispatched(receiver = this, completion = select.completion) + block.startCoroutineUnintercepted(receiver = this, completion = select.completion) return } offerResult is Closed<*> -> throw offerResult.sendException @@ -754,7 +754,7 @@ public abstract class AbstractChannel : AbstractSendChannel(), Channel pollResult === POLL_FAILED -> {} // retry pollResult is Closed<*> -> throw pollResult.receiveException else -> { - block.startCoroutineUndispatched(pollResult as E, select.completion) + block.startCoroutineUnintercepted(pollResult as E, select.completion) return } } @@ -789,14 +789,14 @@ public abstract class AbstractChannel : AbstractSendChannel(), Channel pollResult is Closed<*> -> { if (pollResult.closeCause == null) { if (select.trySelect(null)) - block.startCoroutineUndispatched(null, select.completion) + block.startCoroutineUnintercepted(null, select.completion) return } else throw pollResult.closeCause } else -> { // selected successfully - block.startCoroutineUndispatched(pollResult as E, select.completion) + block.startCoroutineUnintercepted(pollResult as E, select.completion) return } } diff --git a/common/kotlinx-coroutines-core-common/src/channels/ConflatedBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/channels/ConflatedBroadcastChannel.kt index 1c62aecc03..4bbaf0b1da 100644 --- a/common/kotlinx-coroutines-core-common/src/channels/ConflatedBroadcastChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/channels/ConflatedBroadcastChannel.kt @@ -263,7 +263,7 @@ public class ConflatedBroadcastChannel() : BroadcastChannel { select.resumeSelectCancellableWithException(it.sendException) return } - block.startCoroutineUndispatched(receiver = this, completion = select.completion) + block.startCoroutineUnintercepted(receiver = this, completion = select.completion) } @Suppress("DEPRECATION") diff --git a/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt b/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt index e7ab254e4f..80048a8497 100644 --- a/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt @@ -9,38 +9,69 @@ import kotlin.coroutines.experimental.* import kotlin.coroutines.experimental.intrinsics.* /** - * Use this function to restart coroutine directly from inside of [suspendCoroutine] in the same context. + * Use this function to restart coroutine directly from inside of [suspendCoroutine], + * when the code is already in the context of this coroutine. + * It does not use [ContinuationInterceptor] and does not update context of the current thread. */ -@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST") -public fun (suspend () -> T).startCoroutineUndispatched(completion: Continuation) { - val value = try { +public fun (suspend () -> T).startCoroutineUnintercepted(completion: Continuation) { + startDirect(completion) { startCoroutineUninterceptedOrReturn(completion) - } catch (e: Throwable) { - completion.resumeWithException(e) - return } - if (value !== COROUTINE_SUSPENDED) - completion.resume(value as T) } /** - * Use this function to restart coroutine directly from inside of [suspendCoroutine] in the same context. + * Use this function to restart coroutine directly from inside of [suspendCoroutine], + * when the code is already in the context of this coroutine. + * It does not use [ContinuationInterceptor] and does not update context of the current thread. + */ +public fun (suspend (R) -> T).startCoroutineUnintercepted(receiver: R, completion: Continuation) { + startDirect(completion) { + startCoroutineUninterceptedOrReturn(receiver, completion) + } +} + +/** + * Use this function to start new coroutine in [CoroutineStart.UNDISPATCHED] mode — + * immediately execute coroutine in the current thread until next suspension. + * It does not use [ContinuationInterceptor], but updates the context of the current thread for the new coroutine. + */ +public fun (suspend () -> T).startCoroutineUndispatched(completion: Continuation) { + startDirect(completion) { + withCoroutineContext(completion.context) { + startCoroutineUninterceptedOrReturn(completion) + } + } +} + +/** + * Use this function to start new coroutine in [CoroutineStart.UNDISPATCHED] mode — + * immediately execute coroutine in the current thread until next suspension. + * It does not use [ContinuationInterceptor], but updates the context of the current thread for the new coroutine. */ -@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST") public fun (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation) { + startDirect(completion) { + withCoroutineContext(completion.context) { + startCoroutineUninterceptedOrReturn(receiver, completion) + } + } +} + +private inline fun startDirect(completion: Continuation, block: () -> Any?) { val value = try { - startCoroutineUninterceptedOrReturn(receiver, completion) + block() } catch (e: Throwable) { completion.resumeWithException(e) return } - if (value !== COROUTINE_SUSPENDED) + if (value !== COROUTINE_SUSPENDED) { + @Suppress("UNCHECKED_CAST") completion.resume(value as T) + } } /** * Starts this coroutine with the given code [block] in the same context and returns result when it - * completes without suspnesion. + * completes without suspension. * This function shall be invoked at most once on this coroutine. * * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it @@ -53,7 +84,7 @@ public fun AbstractCoroutine.startUndispatchedOrReturn(block: suspend () /** * Starts this coroutine with the given code [block] in the same context and returns result when it - * completes without suspnesion. + * completes without suspension. * This function shall be invoked at most once on this coroutine. * * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it diff --git a/common/kotlinx-coroutines-core-common/src/selects/Select.kt b/common/kotlinx-coroutines-core-common/src/selects/Select.kt index 5fddf70621..c172d75aa0 100644 --- a/common/kotlinx-coroutines-core-common/src/selects/Select.kt +++ b/common/kotlinx-coroutines-core-common/src/selects/Select.kt @@ -408,7 +408,7 @@ internal class SelectBuilderImpl( override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) { if (time <= 0L) { if (trySelect(null)) - block.startCoroutineUndispatched(completion) + block.startCoroutineUnintercepted(completion) return } val action = Runnable { diff --git a/common/kotlinx-coroutines-core-common/src/sync/Mutex.kt b/common/kotlinx-coroutines-core-common/src/sync/Mutex.kt index ede035776e..c8d2312688 100644 --- a/common/kotlinx-coroutines-core-common/src/sync/Mutex.kt +++ b/common/kotlinx-coroutines-core-common/src/sync/Mutex.kt @@ -253,7 +253,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { val failure = select.performAtomicTrySelect(TryLockDesc(this, owner)) when { failure == null -> { // success - block.startCoroutineUndispatched(receiver = this, completion = select.completion) + block.startCoroutineUnintercepted(receiver = this, completion = select.completion) return } failure === ALREADY_SELECTED -> return // already selected -- bail out diff --git a/common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt index d4a9be4fdb..a80577042c 100644 --- a/common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/test/selects/SelectArrayChannelTest.kt @@ -289,6 +289,6 @@ class SelectArrayChannelTest : TestBase() { internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion if (!trySelect(null)) return - block.startCoroutineUndispatched(this) + block.startCoroutineUnintercepted(this) } } diff --git a/common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt index 631230628d..2f7f63b500 100644 --- a/common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/test/selects/SelectRendezvousChannelTest.kt @@ -310,6 +310,6 @@ class SelectRendezvousChannelTest : TestBase() { internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion if (!trySelect(null)) return - block.startCoroutineUndispatched(this) + block.startCoroutineUnintercepted(this) } } diff --git a/core/kotlinx-coroutines-core/src/CoroutineContext.kt b/core/kotlinx-coroutines-core/src/CoroutineContext.kt index 3c0047c72b..399e466b73 100644 --- a/core/kotlinx-coroutines-core/src/CoroutineContext.kt +++ b/core/kotlinx-coroutines-core/src/CoroutineContext.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.experimental +import java.util.* import java.util.concurrent.atomic.AtomicLong import kotlin.coroutines.experimental.AbstractCoroutineContextElement import kotlin.coroutines.experimental.ContinuationInterceptor @@ -40,6 +41,17 @@ internal val DEBUG = run { } } +@Suppress("UNCHECKED_CAST") +internal val coroutineContextThreadLocal: CoroutineContextThreadLocal? = run { + val services = ServiceLoader.load(CoroutineContextThreadLocal::class.java).toMutableList() + if (DEBUG) services.add(0, DebugThreadName) + when (services.size) { + 0 -> null + 1 -> services.single() as CoroutineContextThreadLocal + else -> CoroutineContextThreadLocalList((services as List>).toTypedArray()) + } +} + private val COROUTINE_ID = AtomicLong() // for tests only @@ -89,29 +101,37 @@ public actual fun newCoroutineContext(context: CoroutineContext, parent: Job? = * Executes a block using a given coroutine context. */ internal actual inline fun withCoroutineContext(context: CoroutineContext, block: () -> T): T { - val oldName = context.updateThreadContext() + val oldValue = coroutineContextThreadLocal?.updateThreadContext(context) try { return block() } finally { - restoreThreadContext(oldName) + coroutineContextThreadLocal?.restoreThreadContext(context, oldValue) } } -@PublishedApi -internal fun CoroutineContext.updateThreadContext(): String? { - if (!DEBUG) return null - val coroutineId = this[CoroutineId] ?: return null - val coroutineName = this[CoroutineName]?.name ?: "coroutine" - val currentThread = Thread.currentThread() - val oldName = currentThread.name - currentThread.name = buildString(oldName.length + coroutineName.length + 10) { - append(oldName) - append(" @") - append(coroutineName) - append('#') - append(coroutineId.id) +private const val DEBUG_THREAD_NAME_SEPARATOR = " @" + +private object DebugThreadName : CoroutineContextThreadLocal { + override fun updateThreadContext(context: CoroutineContext): String? { + val coroutineId = context[CoroutineId] ?: return null + val coroutineName = context[CoroutineName]?.name ?: "coroutine" + val currentThread = Thread.currentThread() + val oldName = currentThread.name + var lastIndex = oldName.lastIndexOf(DEBUG_THREAD_NAME_SEPARATOR) + if (lastIndex < 0) lastIndex = oldName.length + currentThread.name = buildString(lastIndex + coroutineName.length + 10) { + append(oldName.substring(0, lastIndex)) + append(DEBUG_THREAD_NAME_SEPARATOR) + append(coroutineName) + append('#') + append(coroutineId.id) + } + return oldName + } + + override fun restoreThreadContext(context: CoroutineContext, oldValue: String?) { + if (oldValue != null) Thread.currentThread().name = oldValue } - return oldName } internal actual val CoroutineContext.coroutineName: String? get() { @@ -121,12 +141,7 @@ internal actual val CoroutineContext.coroutineName: String? get() { return "$coroutineName#${coroutineId.id}" } -@PublishedApi -internal fun restoreThreadContext(oldName: String?) { - if (oldName != null) Thread.currentThread().name = oldName -} - -private class CoroutineId(val id: Long) : AbstractCoroutineContextElement(CoroutineId) { +internal data class CoroutineId(val id: Long) : AbstractCoroutineContextElement(CoroutineId) { companion object Key : CoroutineContext.Key override fun toString(): String = "CoroutineId($id)" } diff --git a/core/kotlinx-coroutines-core/src/CoroutineContextThreadLocal.kt b/core/kotlinx-coroutines-core/src/CoroutineContextThreadLocal.kt new file mode 100644 index 0000000000..1ad50fd718 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/CoroutineContextThreadLocal.kt @@ -0,0 +1,92 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental + +import kotlin.coroutines.experimental.* + +/** + * An extension point to define elements in [CoroutineContext] that are installed into thread local + * variables every time the coroutine from the specified context in resumed on a thread. + * + * Implementations on this interface are looked up via [java.util.ServiceLoader]. + * + * Example usage looks like this: + * + * ``` + * // declare custom coroutine context element, storing some custom data + * class MyElement(val data: MyData) : AbstractCoroutineContextElement(Key) { + * companion object Key : CoroutineContext.Key + * } + * + * // declare thread local variable + * private val myThreadLocal = ThreadLocal() + * + * // declare extension point implementation + * class MyCoroutineContextThreadLocal : CoroutineContextThreadLocal { + * // this is invoked before coroutine is resumed on current thread + * override fun updateThreadContext(context: CoroutineContext): MyElement? { + * val oldValue = myThreadLocal.get() + * myThreadLocal.set(context[MyElement]) + * return oldValue + * } + * + * // this is invoked after coroutine has suspended on current thread + * override fun restoreThreadContext(context: CoroutineContext, oldValue: MyElement?) { + * myThreadLocal.set(oldValue) + * } + * } + * ``` + * + * Now, `MyCoroutineContextThreadLocal` fully qualified class named shall be registered via + * `META-INF/services/kotlinx.coroutines.experimental.CoroutineContextThreadLocal` file. + */ +public interface CoroutineContextThreadLocal { + /** + * Updates context of the current thread. + * This function is invoked before the coroutine in the specified [context] is resumed in the current thread. + * The result of this function is the old value that will be passed to [restoreThreadContext]. + */ + public fun updateThreadContext(context: CoroutineContext): T + + /** + * Restores context of the current thread. + * This function is invoked after the coroutine in the specified [context] is suspended in the current thread. + * The value of [oldValue] is the result of the previous invocation of [updateThreadContext]. + */ + public fun restoreThreadContext(context: CoroutineContext, oldValue: T) +} + +/** + * This class is used when multiple [CoroutineContextThreadLocal] are installed. + */ +internal class CoroutineContextThreadLocalList( + private val impls: Array> +) : CoroutineContextThreadLocal { + init { + require(impls.size > 1) + } + + private val threadLocalStack = ThreadLocal?>() + + override fun updateThreadContext(context: CoroutineContext): Any? { + val stack = threadLocalStack.get() ?: ArrayList().also { + threadLocalStack.set(it) + } + val lastIndex = impls.lastIndex + for (i in 0 until lastIndex) { + stack.add(impls[i].updateThreadContext(context)) + } + return impls[lastIndex].updateThreadContext(context) + } + + override fun restoreThreadContext(context: CoroutineContext, oldValue: Any?) { + val stack = threadLocalStack.get()!! // must be there + val lastIndex = impls.lastIndex + impls[lastIndex].restoreThreadContext(context, oldValue) + for (i in lastIndex - 1 downTo 0) { + impls[i].restoreThreadContext(context, stack.removeAt(stack.lastIndex)) + } + } +} \ No newline at end of file diff --git a/core/kotlinx-coroutines-core/test-resources/META-INF/services/kotlinx.coroutines.experimental.CoroutineContextThreadLocal b/core/kotlinx-coroutines-core/test-resources/META-INF/services/kotlinx.coroutines.experimental.CoroutineContextThreadLocal new file mode 100644 index 0000000000..368e3de5f7 --- /dev/null +++ b/core/kotlinx-coroutines-core/test-resources/META-INF/services/kotlinx.coroutines.experimental.CoroutineContextThreadLocal @@ -0,0 +1 @@ +kotlinx.coroutines.experimental.MyCoroutineContextThreadLocal \ No newline at end of file diff --git a/core/kotlinx-coroutines-core/test/CoroutineContextThreadLocalTest.kt b/core/kotlinx-coroutines-core/test/CoroutineContextThreadLocalTest.kt new file mode 100644 index 0000000000..fac6a09bf9 --- /dev/null +++ b/core/kotlinx-coroutines-core/test/CoroutineContextThreadLocalTest.kt @@ -0,0 +1,78 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental + +import org.junit.Test +import kotlin.coroutines.experimental.* +import kotlin.test.* + +class CoroutineContextThreadLocalTest : TestBase() { + @Test + fun testExample() = runTest { + val exceptionHandler = coroutineContext[CoroutineExceptionHandler]!! + val mainDispatcher = coroutineContext[ContinuationInterceptor]!! + val mainThread = Thread.currentThread() + val element = MyElement(MyData()) + assertNull(myThreadLocal.get()) + val job = launch(element + exceptionHandler) { + assertTrue(mainThread != Thread.currentThread()) + assertSame(element, coroutineContext[MyElement]) + assertSame(element, myThreadLocal.get()) + withContext(mainDispatcher) { + assertSame(mainThread, Thread.currentThread()) + assertSame(element, coroutineContext[MyElement]) + assertSame(element, myThreadLocal.get()) + } + assertTrue(mainThread != Thread.currentThread()) + assertSame(element, coroutineContext[MyElement]) + assertSame(element, myThreadLocal.get()) + } + assertNull(myThreadLocal.get()) + job.join() + assertNull(myThreadLocal.get()) + } + + @Test + fun testUndispatched()= runTest { + val exceptionHandler = coroutineContext[CoroutineExceptionHandler]!! + val element = MyElement(MyData()) + val job = launch( + context = DefaultDispatcher + exceptionHandler + element, + start = CoroutineStart.UNDISPATCHED + ) { + assertSame(element, myThreadLocal.get()) + yield() + assertSame(element, myThreadLocal.get()) + } + assertNull(myThreadLocal.get()) + job.join() + assertNull(myThreadLocal.get()) + } +} + +class MyData + +// declare custom coroutine context element, storing some custom data +class MyElement(val data: MyData) : AbstractCoroutineContextElement(Key) { + companion object Key : CoroutineContext.Key +} + +// declare thread local variable +private val myThreadLocal = ThreadLocal() + +// declare extension point implementation +class MyCoroutineContextThreadLocal : CoroutineContextThreadLocal { + // this is invoked before coroutine is resumed on current thread + override fun updateThreadContext(context: CoroutineContext): MyElement? { + val oldValue = myThreadLocal.get() + myThreadLocal.set(context[MyElement]) + return oldValue + } + + // this is invoked after coroutine has suspended on current thread + override fun restoreThreadContext(context: CoroutineContext, oldValue: MyElement?) { + myThreadLocal.set(oldValue) + } +} diff --git a/core/kotlinx-coroutines-core/test/DebugThreadNameTest.kt b/core/kotlinx-coroutines-core/test/DebugThreadNameTest.kt new file mode 100644 index 0000000000..ff91555c88 --- /dev/null +++ b/core/kotlinx-coroutines-core/test/DebugThreadNameTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.experimental + +import kotlin.coroutines.experimental.* +import kotlin.test.* + +class DebugThreadNameTest : TestBase() { + @BeforeTest + fun resetName() { + resetCoroutineId() + } + + @Test + fun testLaunchId() = runTest { + assertName("coroutine#1") + launch(coroutineContext) { + assertName("coroutine#2") + yield() + assertName("coroutine#2") + } + assertName("coroutine#1") + } + + @Test + fun testLaunchIdUndispatched() = runTest { + assertName("coroutine#1") + launch(coroutineContext, start = CoroutineStart.UNDISPATCHED) { + assertName("coroutine#2") + yield() + assertName("coroutine#2") + } + assertName("coroutine#1") + } + + @Test + fun testLaunchName() = runTest { + assertName("coroutine#1") + launch(coroutineContext + CoroutineName("TEST")) { + assertName("TEST#2") + yield() + assertName("TEST#2") + } + assertName("coroutine#1") + } + + @Test + fun testWithContext() = runTest { + assertName("coroutine#1") + withContext(DefaultDispatcher) { + assertName("coroutine#1") + yield() + assertName("coroutine#1") + withContext(CoroutineName("TEST")) { + assertName("TEST#1") + yield() + assertName("TEST#1") + } + assertName("coroutine#1") + yield() + assertName("coroutine#1") + } + assertName("coroutine#1") + } + + private fun assertName(expected: String) { + val name = Thread.currentThread().name + val split = name.split(Regex(" @")) + assertEquals(2, split.size, "Thread name '$name' is expected to contain one coroutine name") + assertEquals(expected, split[1], "Thread name '$name' is expected to end with coroutine name '$expected'") + } +} \ No newline at end of file diff --git a/core/kotlinx-coroutines-core/test/TestBase.kt b/core/kotlinx-coroutines-core/test/TestBase.kt index 110f1f9b5b..b8b768a12f 100644 --- a/core/kotlinx-coroutines-core/test/TestBase.kt +++ b/core/kotlinx-coroutines-core/test/TestBase.kt @@ -54,6 +54,12 @@ public actual open class TestBase actual constructor() { throw exception } + private fun printError(message: String, cause: Throwable) { + error.compareAndSet(null, cause) + println("$message: $cause") + cause.printStackTrace(System.out) + } + /** * Throws [IllegalStateException] when `value` is false like `check` in stdlib, but also ensures that the * test will not complete successfully even if this exception is consumed somewhere in the test. @@ -121,10 +127,12 @@ public actual open class TestBase actual constructor() { runBlocking(block = block, context = CoroutineExceptionHandler { context, e -> if (e is CancellationException) return@CoroutineExceptionHandler // are ignored exCount++ - if (exCount > unhandled.size) - error("Too many unhandled exceptions $exCount, expected ${unhandled.size}, got: $e", e) - if (!unhandled[exCount - 1](e)) - error("Unhandled exception was unexpected: $e", e) + when { + exCount > unhandled.size -> + printError("Too many unhandled exceptions $exCount, expected ${unhandled.size}, got: $e", e) + !unhandled[exCount - 1](e) -> + printError("Unhandled exception was unexpected: $e", e) + } context[Job]?.cancel(e) }) } catch (e: Throwable) { diff --git a/core/kotlinx-coroutines-core/test/selects/SelectChannelStressTest.kt b/core/kotlinx-coroutines-core/test/selects/SelectChannelStressTest.kt index 95d661c3af..a83b1c4b59 100644 --- a/core/kotlinx-coroutines-core/test/selects/SelectChannelStressTest.kt +++ b/core/kotlinx-coroutines-core/test/selects/SelectChannelStressTest.kt @@ -71,6 +71,6 @@ class SelectChannelStressTest: TestBase() { internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion if (!trySelect(null)) return - block.startCoroutineUndispatched(this) + block.startCoroutineUnintercepted(this) } } diff --git a/gradle/atomicfu-jvm.gradle b/gradle/atomicfu-jvm.gradle index 6077a420be..bd5573eb58 100644 --- a/gradle/atomicfu-jvm.gradle +++ b/gradle/atomicfu-jvm.gradle @@ -21,6 +21,11 @@ jar { } test { - classpath = files(configurations.testRuntime, atomicFU.outputs, sourceSets.test.output.classesDirs, - sourceSets.main.output.resourcesDir) + classpath = files( + configurations.testRuntime, + atomicFU.outputs, + sourceSets.main.output.resourcesDir, + sourceSets.test.output.classesDirs, + sourceSets.test.output.resourcesDir + ) } diff --git a/integration/kotlinx-coroutines-quasar/src/Quasar.kt b/integration/kotlinx-coroutines-quasar/src/Quasar.kt index 73a7f71494..7dd693e01f 100644 --- a/integration/kotlinx-coroutines-quasar/src/Quasar.kt +++ b/integration/kotlinx-coroutines-quasar/src/Quasar.kt @@ -43,7 +43,8 @@ fun runFiberBlocking(block: suspend () -> T): T = private class CoroutineAsync( private val block: suspend () -> T ) : FiberAsync(), Continuation { - override val context: CoroutineContext = Fiber.currentFiber().scheduler.executor.asCoroutineDispatcher() + override val context: CoroutineContext = + newCoroutineContext(Fiber.currentFiber().scheduler.executor.asCoroutineDispatcher()) override fun resume(value: T) { asyncCompleted(value) } override fun resumeWithException(exception: Throwable) { asyncFailed(exception) } diff --git a/js/kotlinx-coroutines-core-js/test/TestBase.kt b/js/kotlinx-coroutines-core-js/test/TestBase.kt index 061b1d7c8a..8db40dbd8e 100644 --- a/js/kotlinx-coroutines-core-js/test/TestBase.kt +++ b/js/kotlinx-coroutines-core-js/test/TestBase.kt @@ -27,6 +27,12 @@ public actual open class TestBase actual constructor() { throw exception } + private fun printError(message: String, cause: Throwable) { + if (error == null) error = cause + println("$message: $cause") + console.log(cause) + } + /** * Asserts that this invocation is `index`-th in the execution sequence (counting from one). */ @@ -69,10 +75,12 @@ public actual open class TestBase actual constructor() { return promise(block = block, context = CoroutineExceptionHandler { context, e -> if (e is CancellationException) return@CoroutineExceptionHandler // are ignored exCount++ - if (exCount > unhandled.size) - error("Too many unhandled exceptions $exCount, expected ${unhandled.size}", e) - if (!unhandled[exCount - 1](e)) - error("Unhandled exception was unexpected", e) + when { + exCount > unhandled.size -> + printError("Too many unhandled exceptions $exCount, expected ${unhandled.size}, got: $e", e) + !unhandled[exCount - 1](e) -> + printError("Unhandled exception was unexpected: $e", e) + } context[Job]?.cancel(e) }).catch { e -> ex = e diff --git a/native/kotlinx-coroutines-core-native/test/TestBase.kt b/native/kotlinx-coroutines-core-native/test/TestBase.kt index f2873524f8..7f75e4439e 100644 --- a/native/kotlinx-coroutines-core-native/test/TestBase.kt +++ b/native/kotlinx-coroutines-core-native/test/TestBase.kt @@ -23,6 +23,11 @@ public actual open class TestBase actual constructor() { throw exception } + private fun printError(message: String, cause: Throwable) { + if (error == null) error = cause + println("$message: $cause") + } + /** * Asserts that this invocation is `index`-th in the execution sequence (counting from one). */ @@ -65,10 +70,12 @@ public actual open class TestBase actual constructor() { runBlocking(block = block, context = CoroutineExceptionHandler { context, e -> if (e is CancellationException) return@CoroutineExceptionHandler // are ignored exCount++ - if (exCount > unhandled.size) - error("Too many unhandled exceptions $exCount, expected ${unhandled.size}, got: $e", e) - if (!unhandled[exCount - 1](e)) - error("Unhandled exception was unexpected: $e", e) + when { + exCount > unhandled.size -> + printError("Too many unhandled exceptions $exCount, expected ${unhandled.size}, got: $e", e) + !unhandled[exCount - 1](e) -> + printError("Unhandled exception was unexpected: $e", e) + } context[Job]?.cancel(e) }) } catch (e: Throwable) {