Work with ThreadLocal-sensitive Components #119

Closed
ian4hu opened this issue Sep 13, 2017 · 10 comments

ian4hu commented Sep 13, 2017

Many application tracing frameworks use ThreadLocal to store the call context of an application, for example SLF4J MDC, CAT, and other distributed tracing systems (which use thread-locals to carry the trace context in some async situations).

But a coroutine may be dispatched to an unspecified thread for execution. How can I make tracing work?

elizarov commented Sep 14, 2017

Coroutine's analog to ThreadLocal is CoroutineContext.
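
As a minimal sketch of that idea (not part of the original answer): data that would otherwise live in a ThreadLocal can be carried as a custom context element and read back through coroutineContext, whichever thread the coroutine happens to run on. The names TraceId, currentTraceId and readTraceIdOnAnotherThread are hypothetical, and the example assumes a current kotlinx.coroutines release (runBlocking, withContext, Dispatchers.Default) rather than the 2017 API.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

// Hypothetical context element that carries a trace id with the coroutine.
class TraceId(val value: String) : AbstractCoroutineContextElement(TraceId) {
    companion object Key : CoroutineContext.Key<TraceId>
}

// Reads the element from the context of the calling coroutine, if present.
suspend fun currentTraceId(): String? = coroutineContext[TraceId]?.value

// Runs on Dispatchers.Default, suspends once, then reads the trace id.
fun readTraceIdOnAnotherThread(): String? = runBlocking {
    withContext(Dispatchers.Default + TraceId("req-42")) {
        delay(10) // the coroutine may resume on a different worker thread
        currentTraceId()
    }
}

fun main() {
    println(readTraceIdOnAnotherThread()) // prints "req-42"
}
```

This only helps code that reads the context itself; libraries that read a real ThreadLocal still need the thread-local to be set on resume.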

To interoperate with ThreadLocal-using libraries you need to implement a custom ContinuationInterceptor that supports framework-specific thread-locals. Here is an example. Assume that you use some framework that relies on a specific ThreadLocal to store some application-specific data (MyData in this example):

val myThreadLocal = ThreadLocal<MyData>()

To use it with coroutines, you'll need to implement a context that keeps the current value of MyData and puts it into the corresponding ThreadLocal every time the coroutine is resumed on a thread. The code should look like this:

class MyContext(
    private var myData: MyData,
    private val dispatcher: CoroutineDispatcher
) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        dispatcher.interceptContinuation(Wrapper(continuation))

    inner class Wrapper<T>(private val continuation: Continuation<T>): Continuation<T> {
        private inline fun wrap(block: () -> Unit) {
            try {
                myThreadLocal.set(myData)
                block()
            } finally {
                myData = myThreadLocal.get()
            }
        }

        override val context: CoroutineContext get() = continuation.context
        override fun resume(value: T) = wrap { continuation.resume(value) }
        override fun resumeWithException(exception: Throwable) = wrap { continuation.resumeWithException(exception) }
    }
}

To use it in your coroutines, you wrap the dispatcher that you want to use with MyContext and give it the initial value of your data. This value will be put into the thread-local on the thread where the coroutine is resumed.

launch(MyContext(MyData(), CommonPool)) {
    // do something...
}

The implementation above also tracks any changes made to the thread-local and stores them in this context, so multiple invocations can share "thread-local" data via the context.

I do think we need a ready-to-use implementation for SLF4J in the integration directory of the kotlinx.coroutines project (kotlinx-coroutines-slf4j). You are welcome to contribute it.

elizarov commented

I've reposted this Q&A to StackOverflow: https://stackoverflow.com/questions/46227462/how-to-use-code-that-relies-on-threadlocal-with-kotlin-coroutines/46227463
Closing this issue.

elizarov reopened this Apr 27, 2018

elizarov commented

We should really have it out of the box in a separate integration module.

userquin commented

@elizarov can you take a look at the link below?

It is related to the Spring Security Context (a thread-local), but there is an interceptor that caches the coroutine context, so I cannot use the approach you mention. Because the continuation is created in the thread in which the Spring Security Context "exists", I can capture it in the continuation and then apply your wrap solution.

konrad-kaminski/spring-kotlin-coroutine#19

Thanks in advance, and sorry for linking to another repository.

oshai commented Jun 11, 2018

@elizarov - I will try to contribute it.

abendt added a commit to abendt/kotlinx.coroutines that referenced this issue Jun 19, 2018
abendt added a commit to abendt/kotlinx.coroutines that referenced this issue Jun 20, 2018

elizarov commented Jul 25, 2018

The problem with the "wrapping dispatcher" approach that I provided in the SO Q&A is that the "wrapped dispatcher" gets lost if the dispatcher is replaced. Consider, for example, the proposed SLF4J MDC context from #403. Suppose you do:

launch (MDCContext(DefaultDispatcher)) { 
    foo()
}

But then in foo() you need to switch to the UI context:

suspend fun foo() {
    withContext(UI) { 
        // MDCContext is lost here.
    }
}

I'm implementing an alternative approach, where you can implement and register a special extension point called CoroutineContextThreadLocal like this:

// declare custom coroutine context element
class MyElement : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<MyElement>
    // some state is kept here
}

// declare thread local variable
private val myThreadLocal = ThreadLocal<MyElement?>()

// declare extension point implementation
class MyCoroutineContextThreadLocal : CoroutineContextThreadLocal<MyElement?> {
    // this is invoked before coroutine is resumed on current thread
    override fun updateThreadContext(context: CoroutineContext): MyElement? {
        val oldValue = myThreadLocal.get()
        myThreadLocal.set(context[MyElement])
        return oldValue
    }

    // this is invoked after coroutine has suspended on current thread
    override fun restoreThreadContext(context: CoroutineContext, oldValue: MyElement?) {
        myThreadLocal.set(oldValue)
    }
}

Now the fully qualified class name of MyCoroutineContextThreadLocal shall be registered via the META-INF/services/kotlinx.coroutines.experimental.CoroutineContextThreadLocal file.

elizarov added a commit that referenced this issue Jul 25, 2018
elizarov pushed a commit that referenced this issue Jul 25, 2018
See discussion in PR #403 and issue #119
elizarov added a commit that referenced this issue Jul 25, 2018
…cal sensitive code

* Debug thread name is redesigned in the style of "thread local"
  where the name of the thread reflects the name of the current coroutine.
* Intrinsics for startCoroutineUndispatched that correspond to
  CoroutineStart.UNDISPATCHED properly update coroutine context.
* New intrinsics named startCoroutineUnintercepted are introduced.
  They do not update thread context.
* withContext logic is fixed to properly update context in various situations.
* DebugThreadNameTest is introduced.
* Reporting of unhandled errors in TestBase is improved.
  Its CoroutineExceptionHandler records but does not rethrow exceptions.
  This makes sure that failed tests actually fail and do not hang in
  a recursive attempt to handle an unhandled coroutine exception.

Fixes #119

elizarov commented

The downside of the API that is currently proposed in PR #454 is that its overhead for each coroutine suspend/resume is O(N*K), where K is the number of registered CoroutineContextThreadLocal extension points and N is the size of the coroutine context (the number of elements in it). I'm thinking about an alternative approach where the overhead is going to be O(N). This will require that each extension point advertises the key of the context element it is bound to, so that the context is scanned just once on each suspend/resume, invoking only the extension points whose corresponding elements are present in the context.
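
A rough sketch of the single-scan idea, using only the Kotlin standard library. This is an illustration under assumptions, not the code from PR #454: KeyedHook, UserElement, LocaleElement and invokeHooks are hypothetical names. Each hook advertises the key it is bound to, hooks are indexed by key, and the context is folded over once, so the work is one hash lookup per context element instead of one call per (element, hook) pair.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical extension point: it advertises the key of the element it is bound to.
interface KeyedHook {
    val key: CoroutineContext.Key<*>
    fun onResume(element: CoroutineContext.Element)
}

// Two illustrative context elements.
class UserElement(val user: String) : AbstractCoroutineContextElement(UserElement) {
    companion object Key : CoroutineContext.Key<UserElement>
}

class LocaleElement(val tag: String) : AbstractCoroutineContextElement(LocaleElement) {
    companion object Key : CoroutineContext.Key<LocaleElement>
}

// Scans the context once and invokes only the hooks whose element is present.
// Returns the number of hooks that were invoked.
fun invokeHooks(
    context: CoroutineContext,
    hooksByKey: Map<CoroutineContext.Key<*>, KeyedHook>
): Int = context.fold(0) { invoked, element ->
    val hook = hooksByKey[element.key]
    if (hook != null) {
        hook.onResume(element)
        invoked + 1
    } else {
        invoked
    }
}

fun main() {
    val resumed = mutableListOf<String>()
    val userHook = object : KeyedHook {
        override val key: CoroutineContext.Key<*> = UserElement
        override fun onResume(element: CoroutineContext.Element) {
            resumed += (element as UserElement).user
        }
    }
    val hooks: Map<CoroutineContext.Key<*>, KeyedHook> = mapOf(userHook.key to userHook)
    // The context has two elements, but only one of them has a registered hook.
    val invoked = invokeHooks(UserElement("alice") + LocaleElement("en"), hooks)
    println("$invoked hook(s) invoked for $resumed") // prints "1 hook(s) invoked for [alice]"
}
```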

elizarov added a commit that referenced this issue Aug 21, 2018
…sitive code

* Debug thread name is redesigned using ThreadContextElement API
  where the name of the thread reflects the name of the current coroutine.
* Intrinsics for startCoroutineUndispatched that correspond to
  CoroutineStart.UNDISPATCHED properly update coroutine context.
* New intrinsics named startCoroutineUnintercepted are introduced.
  They do not update thread context.
* withContext logic is fixed to properly update context in various situations.
* DebugThreadNameTest is introduced.
* Reporting of unhandled errors in TestBase is improved.
  Its CoroutineExceptionHandler records but does not rethrow exceptions.
  This makes sure that failed tests actually fail and do not hang in
  a recursive attempt to handle an unhandled coroutine exception.

Fixes #119

elizarov commented

The API in PR #454 now looks like this. To use it, you have to define your own context element that implements the ThreadContextElement interface, like this:

// declare thread local variable holding MyData
private val myThreadLocal = ThreadLocal<MyData?>()

// declare context element holding MyData
class MyElement(val data: MyData) : ThreadContextElement<MyData?> {
    // declare companion object for a key of this element in coroutine context
    companion object Key : CoroutineContext.Key<MyElement>

    // provide the key of the corresponding context element
    override val key: CoroutineContext.Key<MyElement>
        get() = Key

    // this is invoked before coroutine is resumed on current thread
    override fun updateThreadContext(context: CoroutineContext): MyData? {
        val oldState = myThreadLocal.get()
        myThreadLocal.set(data)
        return oldState
    }

    // this is invoked after coroutine has suspended on current thread
    override fun restoreThreadContext(context: CoroutineContext, oldState: MyData?) {
        myThreadLocal.set(oldState)
    }
}

The implementation is quite efficient now. Its cost is O(N), where N is the size of the context (the number of elements in the context). When a coroutine is resumed on a thread, its context is scanned for elements that implement ThreadContextElement, and they are given a chance to update the context of the thread.
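
A usage sketch for an element of this shape, assuming a current kotlinx.coroutines release in which ThreadContextElement is available. MyData is not defined in the thread, so the data class below is an assumption, and observeAcrossDispatchers is a hypothetical helper. It records what the thread-local holds at several points, including after a dispatcher switch with withContext, which is the case where the wrapped-dispatcher approach lost its state.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext

// Assumed shape of MyData; the thread does not define it.
data class MyData(val name: String)

private val myThreadLocal = ThreadLocal<MyData?>()

class MyElement(val data: MyData) : ThreadContextElement<MyData?> {
    companion object Key : CoroutineContext.Key<MyElement>

    override val key: CoroutineContext.Key<MyElement>
        get() = Key

    // Installs this element's data on the thread and returns the previous value.
    override fun updateThreadContext(context: CoroutineContext): MyData? {
        val oldState = myThreadLocal.get()
        myThreadLocal.set(data)
        return oldState
    }

    // Puts back whatever the thread held before the coroutine ran on it.
    override fun restoreThreadContext(context: CoroutineContext, oldState: MyData?) {
        myThreadLocal.set(oldState)
    }
}

// Hypothetical helper: collects the thread-local's value at four points.
fun observeAcrossDispatchers(): List<String?> = runBlocking {
    val seen = mutableListOf<String?>()
    withContext(Dispatchers.Default + MyElement(MyData("request-1"))) {
        seen += myThreadLocal.get()?.name       // on a Default worker thread
        delay(10)
        seen += myThreadLocal.get()?.name       // after suspension and resumption
        withContext(Dispatchers.IO) {
            seen += myThreadLocal.get()?.name   // dispatcher replaced, element kept
        }
    }
    seen += myThreadLocal.get()?.name           // outside the element's scope
    seen
}

fun main() {
    println(observeAcrossDispatchers()) // prints "[request-1, request-1, request-1, null]"
}
```

Released versions of kotlinx.coroutines also provide ThreadLocal.asContextElement(value), which builds an element of this kind for an existing ThreadLocal.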

stangls commented Nov 5, 2018

For my projects, I used to write a simple CoroutineLocal implementation to handle these cases.

For the sake of an example, say we want block-style db-connections, but only one connection per coroutine. When we nest the db-connection blocks, we then keep the transaction state and other db state.

object MyDatabaseAccess {

    val connection = CoroutineLocal<Connection>()               // <-- matter of interest

    // block-style db-connection
    suspend fun <T> connected(block: suspend (Connection) -> T): T {
        var con: Connection? = connection.get()                 // <-- matter of interest
        val newConnection = (con == null || con.isClosed)
        if (newConnection) {
            con = TODO("create connection")
            connection.set(con)                                 // <-- matter of interest
        }
        val ret = block(con!!)
        if (newConnection) {
            con.close()
            connection.set(null)                                // <-- matter of interest
        }
        return ret
    }

}

And the CoroutineLocal implementation is simply a garbage-collected storage for each context.

class CoroutineLocal<T> {

    val values = WeakHashMap<CoroutineContext,T>()

    suspend fun get(): T? =
        values[coroutineContext]

    suspend fun set(value: T?) {
        if (value==null)
            values.remove(coroutineContext)
        else
            values[coroutineContext] = value
    }

}

varantes commented Feb 8, 2020

Using LazyTraceAsyncTaskExecutor(...).asCoroutineDispatcher() as the dispatcher, as follows:

@Service
@Profile("fake")
class QualityServiceFake(private val beanFactory: BeanFactory) : IQualityService {

    private val log = LoggerFactory.getLogger(javaClass)

    override fun rebuild() {
        log.info("Entered rebuild")

        val myDispatcher = LazyTraceAsyncTaskExecutor(beanFactory, SimpleAsyncTaskExecutor()).asCoroutineDispatcher()

        runBlocking(myDispatcher + CoroutineName("reb")) {
            log.info("Entered runBlocking")
            log.info("Entered coroutineScope. Running 3 asynchronous processes in parallel")
            repeat(3) {
                launch {
                    log.info("Entered async. Running Thread.sleep for 3 s")
                    Thread.sleep(3_000)
                    log.info("Returned from delay")
                }
            }
        }

        log.info("End of rebuild")
    }

}

Output:

2020-02-08 15:48:02.847  INFO [biomQltSrv,80acc603d2763267,d726d6eca30fa227,false] 25324 --- [nio-8085-exec-8] b.c.c.b.service.QualityServiceFake       : Entered rebuild [@] [method: rebuild] .(QualityServiceFake.kt:57)
2020-02-08 15:48:02.851  INFO [biomQltSrv,80acc603d2763267,9b5a8138ee419e93,false] 25324 --- [cutor-1 @reb#13] b.c.c.b.service.QualityServiceFake       : Entered runBlocking [@] [method: invokeSuspend] .(QualityServiceFake.kt:62)
2020-02-08 15:48:02.852  INFO [biomQltSrv,80acc603d2763267,9b5a8138ee419e93,false] 25324 --- [cutor-1 @reb#13] b.c.c.b.service.QualityServiceFake       : Entered coroutineScope. Running 3 asynchronous processes in parallel [@] [method: invokeSuspend] .(QualityServiceFake.kt:63)
2020-02-08 15:48:02.860  INFO [biomQltSrv,80acc603d2763267,998a482710e90d92,false] 25324 --- [cutor-2 @reb#14] b.c.c.b.service.QualityServiceFake       : Entered async. Running Thread.sleep for 3 s [@] [method: invokeSuspend] .(QualityServiceFake.kt:66)
2020-02-08 15:48:02.860  INFO [biomQltSrv,80acc603d2763267,980ab6d1c8210385,false] 25324 --- [cutor-3 @reb#15] b.c.c.b.service.QualityServiceFake       : Entered async. Running Thread.sleep for 3 s [@] [method: invokeSuspend] .(QualityServiceFake.kt:66)
2020-02-08 15:48:02.860  INFO [biomQltSrv,80acc603d2763267,8efcca1801e87b79,false] 25324 --- [cutor-4 @reb#16] b.c.c.b.service.QualityServiceFake       : Entered async. Running Thread.sleep for 3 s [@] [method: invokeSuspend] .(QualityServiceFake.kt:66)
2020-02-08 15:48:05.861  INFO [biomQltSrv,80acc603d2763267,980ab6d1c8210385,false] 25324 --- [cutor-3 @reb#15] b.c.c.b.service.QualityServiceFake       : Returned from delay [@] [method: invokeSuspend] .(QualityServiceFake.kt:68)
2020-02-08 15:48:05.861  INFO [biomQltSrv,80acc603d2763267,998a482710e90d92,false] 25324 --- [cutor-2 @reb#14] b.c.c.b.service.QualityServiceFake       : Returned from delay [@] [method: invokeSuspend] .(QualityServiceFake.kt:68)
2020-02-08 15:48:05.861  INFO [biomQltSrv,80acc603d2763267,8efcca1801e87b79,false] 25324 --- [cutor-4 @reb#16] b.c.c.b.service.QualityServiceFake       : Returned from delay [@] [method: invokeSuspend] .(QualityServiceFake.kt:68)
2020-02-08 15:48:05.862  INFO [biomQltSrv,80acc603d2763267,d726d6eca30fa227,false] 25324 --- [nio-8085-exec-8] b.c.c.b.service.QualityServiceFake       : End of rebuild [@] [method: rebuild] .(QualityServiceFake.kt:73)
