Skip to content

Commit

Permalink
Introduce ThreadContextElement API to integrate with thread-local sen…
Browse files Browse the repository at this point in the history
…sitive code

* Debug thread name is redesigned using ThreadContextElement API
  where the name of thread reflects the name of currently coroutine.
* Intrinsics for startCoroutineUndispatched that correspond to
  CoroutineStart.UNDISPATCHED properly update coroutine context.
* New intrinsics named startCoroutineUnintercepted are introduced.
  They do not update thread context.
* withContext logic is fixed properly update context is various situations.
* DebugThreadNameTest is introduced.
* Reporting of unhandled errors in TestBase is improved.
  Its CoroutineExceptionHandler records but does not rethrow exception.
  This makes sure that failed tests actually fail and do not hang in
  recursive attempt to handle unhandled coroutine exception.

Fixes #119
  • Loading branch information
elizarov committed Aug 23, 2018
1 parent be065fd commit 1408d7c
Show file tree
Hide file tree
Showing 21 changed files with 469 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ public final class kotlinx/coroutines/experimental/CoroutineContextKt {
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;)Lkotlin/coroutines/experimental/CoroutineContext;
public static final fun newCoroutineContext (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;)Lkotlin/coroutines/experimental/CoroutineContext;
public static synthetic fun newCoroutineContext$default (Lkotlin/coroutines/experimental/CoroutineContext;Lkotlinx/coroutines/experimental/Job;ILjava/lang/Object;)Lkotlin/coroutines/experimental/CoroutineContext;
public static final fun restoreThreadContext (Ljava/lang/String;)V
public static final fun updateThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;)Ljava/lang/String;
}

public abstract class kotlinx/coroutines/experimental/CoroutineDispatcher : kotlin/coroutines/experimental/AbstractCoroutineContextElement, kotlin/coroutines/experimental/ContinuationInterceptor {
Expand Down Expand Up @@ -436,6 +434,18 @@ public final class kotlinx/coroutines/experimental/ScheduledKt {
public static synthetic fun withTimeoutOrNull$default (JLjava/util/concurrent/TimeUnit;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/experimental/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/experimental/ThreadContextElement : kotlin/coroutines/experimental/CoroutineContext$Element {
public abstract fun restoreThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;Ljava/lang/Object;)V
public abstract fun updateThreadContext (Lkotlin/coroutines/experimental/CoroutineContext;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/experimental/ThreadContextElement$DefaultImpls {
public static fun fold (Lkotlinx/coroutines/experimental/ThreadContextElement;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
public static fun get (Lkotlinx/coroutines/experimental/ThreadContextElement;Lkotlin/coroutines/experimental/CoroutineContext$Key;)Lkotlin/coroutines/experimental/CoroutineContext$Element;
public static fun minusKey (Lkotlinx/coroutines/experimental/ThreadContextElement;Lkotlin/coroutines/experimental/CoroutineContext$Key;)Lkotlin/coroutines/experimental/CoroutineContext;
public static fun plus (Lkotlinx/coroutines/experimental/ThreadContextElement;Lkotlin/coroutines/experimental/CoroutineContext;)Lkotlin/coroutines/experimental/CoroutineContext;
}

public final class kotlinx/coroutines/experimental/ThreadPoolDispatcher : kotlinx/coroutines/experimental/ExecutorCoroutineDispatcherBase {
public fun close ()V
public fun getExecutor ()Ljava/util/concurrent/Executor;
Expand Down Expand Up @@ -939,6 +949,8 @@ public final class kotlinx/coroutines/experimental/intrinsics/CancellableKt {
public final class kotlinx/coroutines/experimental/intrinsics/UndispatchedKt {
public static final fun startCoroutineUndispatched (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)V
public static final fun startCoroutineUndispatched (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)V
public static final fun startCoroutineUnintercepted (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/experimental/Continuation;)V
public static final fun startCoroutineUnintercepted (Lkotlin/jvm/functions/Function2;Ljava/lang/Object;Lkotlin/coroutines/experimental/Continuation;)V
public static final fun startUndispatchedOrReturn (Lkotlinx/coroutines/experimental/AbstractCoroutine;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
public static final fun startUndispatchedOrReturn (Lkotlinx/coroutines/experimental/AbstractCoroutine;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
}
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ configure(subprojects.findAll { !it.name.contains(sourceless) && it.name != "ben
sourceSets {
main.kotlin.srcDirs = ['src']
test.kotlin.srcDirs = ['test']
// todo: do we still need this workaround?
if (!projectName.endsWith("-native")) {
main.resources.srcDirs = ['resources']
test.resources.srcDirs = ['test-resources']
}
}
}
Expand Down
26 changes: 20 additions & 6 deletions common/kotlinx-coroutines-core-common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,11 @@ public suspend fun <T> withContext(
// fast path #3 if the new dispatcher is the same as the old one.
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val newContinuation = RunContinuationDirect(newContext, uCont)
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
val newContinuation = RunContinuationUndispatched(newContext, uCont)
// There are some other changes in the context, so this thread needs to be updated
withCoroutineContext(newContext) {
return@sc block.startCoroutineUninterceptedOrReturn(newContinuation)
}
}
// slowest path otherwise -- use new interceptor, sync to its result via a full-blown instance of RunCompletion
require(!start.isLazy) { "$start start is not supported" }
Expand All @@ -130,7 +133,6 @@ public suspend fun <T> withContext(
resumeMode = if (start == CoroutineStart.ATOMIC) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE
)
completion.initParentJobInternal(newContext[Job]) // attach to job
@Suppress("DEPRECATION")
start(block, completion)
completion.getResult()
}
Expand Down Expand Up @@ -178,10 +180,22 @@ private class LazyStandaloneCoroutine(
}
}

private class RunContinuationDirect<in T>(
private class RunContinuationUndispatched<in T>(
override val context: CoroutineContext,
continuation: Continuation<T>
) : Continuation<T> by continuation
private val continuation: Continuation<T>
): Continuation<T> {
override fun resume(value: T) {
withCoroutineContext(continuation.context) {
continuation.resume(value)
}
}

override fun resumeWithException(exception: Throwable) {
withCoroutineContext(continuation.context) {
continuation.resumeWithException(exception)
}
}
}

@Suppress("UNCHECKED_CAST")
private class RunCompletion<in T>(
Expand Down
4 changes: 2 additions & 2 deletions common/kotlinx-coroutines-core-common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
// already complete -- select result
if (select.trySelect(null)) {
select.completion.context.checkCompletion() // always check for our completion
block.startCoroutineUndispatched(select.completion)
block.startCoroutineUnintercepted(select.completion)
}
return
}
Expand Down Expand Up @@ -992,7 +992,7 @@ internal open class JobSupport constructor(active: Boolean) : Job, SelectClause0
if (state is CompletedExceptionally)
select.resumeSelectCancellableWithException(state.cause)
else
block.startCoroutineUndispatched(state as T, select.completion)
block.startCoroutineUnintercepted(state as T, select.completion)
}
return
}
Expand Down
4 changes: 2 additions & 2 deletions common/kotlinx-coroutines-core-common/src/ResumeMode.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
MODE_DIRECT -> resume(value)
MODE_UNDISPATCHED -> resume(value)
MODE_UNDISPATCHED -> withCoroutineContext(context) { resume(value) }
MODE_IGNORE -> {}
else -> error("Invalid mode $mode")
}
Expand All @@ -54,7 +54,7 @@ internal fun <T> Continuation<T>.resumeUninterceptedWithExceptionMode(exception:
MODE_ATOMIC_DEFAULT -> intercepted().resumeWithException(exception)
MODE_CANCELLABLE -> intercepted().resumeCancellableWithException(exception)
MODE_DIRECT -> resumeWithException(exception)
MODE_UNDISPATCHED -> resumeWithException(exception)
MODE_UNDISPATCHED -> withCoroutineContext(context) { resumeWithException(exception) }
MODE_IGNORE -> {}
else -> error("Invalid mode $mode")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
offerResult === ALREADY_SELECTED -> return
offerResult === OFFER_FAILED -> {} // retry
offerResult === OFFER_SUCCESS -> {
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
return
}
offerResult is Closed<*> -> throw offerResult.sendException
Expand Down Expand Up @@ -753,7 +753,7 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
pollResult === POLL_FAILED -> {} // retry
pollResult is Closed<*> -> throw pollResult.receiveException
else -> {
block.startCoroutineUndispatched(pollResult as E, select.completion)
block.startCoroutineUnintercepted(pollResult as E, select.completion)
return
}
}
Expand Down Expand Up @@ -788,14 +788,14 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
pollResult is Closed<*> -> {
if (pollResult.closeCause == null) {
if (select.trySelect(null))
block.startCoroutineUndispatched(null, select.completion)
block.startCoroutineUnintercepted(null, select.completion)
return
} else
throw pollResult.closeCause
}
else -> {
// selected successfully
block.startCoroutineUndispatched(pollResult as E, select.completion)
block.startCoroutineUnintercepted(pollResult as E, select.completion)
return
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
select.resumeSelectCancellableWithException(it.sendException)
return
}
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
}

@Suppress("DEPRECATION")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,69 @@ import kotlin.coroutines.experimental.*
import kotlin.coroutines.experimental.intrinsics.*

/**
* Use this function to restart coroutine directly from inside of [suspendCoroutine] in the same context.
* Use this function to restart coroutine directly from inside of [suspendCoroutine],
* when the code is already in the context of this coroutine.
* It does not use [ContinuationInterceptor] and does not update context of the current thread.
*/
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST")
public fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
val value = try {
public fun <T> (suspend () -> T).startCoroutineUnintercepted(completion: Continuation<T>) {
startDirect(completion) {
startCoroutineUninterceptedOrReturn(completion)
} catch (e: Throwable) {
completion.resumeWithException(e)
return
}
if (value !== COROUTINE_SUSPENDED)
completion.resume(value as T)
}

/**
* Use this function to restart coroutine directly from inside of [suspendCoroutine] in the same context.
* Use this function to restart coroutine directly from inside of [suspendCoroutine],
* when the code is already in the context of this coroutine.
* It does not use [ContinuationInterceptor] and does not update context of the current thread.
*/
public fun <R, T> (suspend (R) -> T).startCoroutineUnintercepted(receiver: R, completion: Continuation<T>) {
startDirect(completion) {
startCoroutineUninterceptedOrReturn(receiver, completion)
}
}

/**
* Use this function to start new coroutine in [CoroutineStart.UNDISPATCHED] mode &mdash;
* immediately execute coroutine in the current thread until next suspension.
* It does not use [ContinuationInterceptor], but updates the context of the current thread for the new coroutine.
*/
public fun <T> (suspend () -> T).startCoroutineUndispatched(completion: Continuation<T>) {
startDirect(completion) {
withCoroutineContext(completion.context) {
startCoroutineUninterceptedOrReturn(completion)
}
}
}

/**
* Use this function to start new coroutine in [CoroutineStart.UNDISPATCHED] mode &mdash;
* immediately execute coroutine in the current thread until next suspension.
* It does not use [ContinuationInterceptor], but updates the context of the current thread for the new coroutine.
*/
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST")
public fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, completion: Continuation<T>) {
startDirect(completion) {
withCoroutineContext(completion.context) {
startCoroutineUninterceptedOrReturn(receiver, completion)
}
}
}

private inline fun <T> startDirect(completion: Continuation<T>, block: () -> Any?) {
val value = try {
startCoroutineUninterceptedOrReturn(receiver, completion)
block()
} catch (e: Throwable) {
completion.resumeWithException(e)
return
}
if (value !== COROUTINE_SUSPENDED)
if (value !== COROUTINE_SUSPENDED) {
@Suppress("UNCHECKED_CAST")
completion.resume(value as T)
}
}

/**
* Starts this coroutine with the given code [block] in the same context and returns result when it
* completes without suspnesion.
* completes without suspension.
* This function shall be invoked at most once on this coroutine.
*
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
Expand All @@ -53,7 +84,7 @@ public fun <T> AbstractCoroutine<T>.startUndispatchedOrReturn(block: suspend ()

/**
* Starts this coroutine with the given code [block] in the same context and returns result when it
* completes without suspnesion.
* completes without suspension.
* This function shall be invoked at most once on this coroutine.
*
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ internal class SelectBuilderImpl<in R>(
override fun onTimeout(time: Long, unit: TimeUnit, block: suspend () -> R) {
if (time <= 0L) {
if (trySelect(null))
block.startCoroutineUndispatched(completion)
block.startCoroutineUnintercepted(completion)
return
}
val action = Runnable {
Expand Down
2 changes: 1 addition & 1 deletion common/kotlinx-coroutines-core-common/src/sync/Mutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
val failure = select.performAtomicTrySelect(TryLockDesc(this, owner))
when {
failure == null -> { // success
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
return
}
failure === ALREADY_SELECTED -> return // already selected -- bail out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,6 @@ class SelectArrayChannelTest : TestBase() {
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
this as SelectBuilderImpl // type assertion
if (!trySelect(null)) return
block.startCoroutineUndispatched(this)
block.startCoroutineUnintercepted(this)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,6 @@ class SelectRendezvousChannelTest : TestBase() {
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
this as SelectBuilderImpl // type assertion
if (!trySelect(null)) return
block.startCoroutineUndispatched(this)
block.startCoroutineUnintercepted(this)
}
}
51 changes: 27 additions & 24 deletions core/kotlinx-coroutines-core/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.experimental

import java.util.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.scheduling.*
import java.util.concurrent.atomic.*
Expand Down Expand Up @@ -98,44 +99,46 @@ public actual fun newCoroutineContext(context: CoroutineContext, parent: Job? =
* Executes a block using a given coroutine context.
*/
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
val oldName = context.updateThreadContext()
val oldValue = updateThreadContext(context)
try {
return block()
} finally {
restoreThreadContext(oldName)
restoreThreadContext(context, oldValue)
}
}

@PublishedApi
internal fun CoroutineContext.updateThreadContext(): String? {
if (!DEBUG) return null
val coroutineId = this[CoroutineId] ?: return null
val coroutineName = this[CoroutineName]?.name ?: "coroutine"
val currentThread = Thread.currentThread()
val oldName = currentThread.name
currentThread.name = buildString(oldName.length + coroutineName.length + 10) {
append(oldName)
append(" @")
append(coroutineName)
append('#')
append(coroutineId.id)
}
return oldName
}

internal actual val CoroutineContext.coroutineName: String? get() {
if (!DEBUG) return null
val coroutineId = this[CoroutineId] ?: return null
val coroutineName = this[CoroutineName]?.name ?: "coroutine"
return "$coroutineName#${coroutineId.id}"
}

@PublishedApi
internal fun restoreThreadContext(oldName: String?) {
if (oldName != null) Thread.currentThread().name = oldName
}
private const val DEBUG_THREAD_NAME_SEPARATOR = " @"

private class CoroutineId(val id: Long) : AbstractCoroutineContextElement(CoroutineId) {
internal data class CoroutineId(
val id: Long
) : ThreadContextElement<String>, AbstractCoroutineContextElement(CoroutineId) {
companion object Key : CoroutineContext.Key<CoroutineId>
override fun toString(): String = "CoroutineId($id)"

override fun updateThreadContext(context: CoroutineContext): String {
val coroutineName = context[CoroutineName]?.name ?: "coroutine"
val currentThread = Thread.currentThread()
val oldName = currentThread.name
var lastIndex = oldName.lastIndexOf(DEBUG_THREAD_NAME_SEPARATOR)
if (lastIndex < 0) lastIndex = oldName.length
currentThread.name = buildString(lastIndex + coroutineName.length + 10) {
append(oldName.substring(0, lastIndex))
append(DEBUG_THREAD_NAME_SEPARATOR)
append(coroutineName)
append('#')
append(id)
}
return oldName
}

override fun restoreThreadContext(context: CoroutineContext, oldState: String) {
Thread.currentThread().name = oldState
}
}
Loading

0 comments on commit 1408d7c

Please sign in to comment.