[Proposal] Primitive or Channel that guarantees the delivery and processing of items #2886

Open
manuelvicnt opened this issue Aug 18, 2021 · 13 comments

Comments

@manuelvicnt

manuelvicnt commented Aug 18, 2021

Due to the prompt cancellation guarantee changes that landed in Coroutines 1.4, Channels cannot be used as a mechanism that guarantees delivery and acknowledges successful processing between producer and consumer, which is what it takes to guarantee that an item was handled exactly once.

There's a need for another primitive (or improvements to Channel) that also guarantees that the item was successfully processed by the consumer.


Use case: UI Events in Android

In Android development, ViewModel-like classes sometimes need to tell the View to perform an action, for example showing a Toast. This is needed when stateful Views don't expose their internal state, as happens with most View APIs.

Channels cannot be used as a way to communicate these types of events to the View because those events could be lost under certain circumstances. That is: 1) the producer (ViewModel) sends the event, 2) the consumer (View) receives the event which is scheduled for dispatch, but then, 3) the consumer is cancelled. The event was received but never processed.

The onUndeliveredElement handler support recently added to Channels doesn't work in this case because re-sending the undelivered event messes up the event ordering.
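The scenario above can be reproduced on a single thread. This is a minimal sketch, assuming a kotlinx.coroutines version with prompt cancellation (1.4 or later); the function name `lostEventDemo` and the event text are made up for illustration. The `onUndeliveredElement` callback is used only to observe where the event ends up.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Returns (processed, undelivered) so the outcome can be inspected.
fun lostEventDemo(): Pair<List<String>, List<String>> = runBlocking {
    val processed = mutableListOf<String>()
    val undelivered = mutableListOf<String>()
    val channel = Channel<String>(
        capacity = Channel.UNLIMITED,
        onUndeliveredElement = { undelivered += it }
    )

    // The "View": waits for one event and processes it.
    val consumer = launch { processed += channel.receive() }
    yield() // let the consumer start and suspend inside receive()

    channel.trySend("show toast") // 1) + 2) the event is handed to the waiting receiver
    consumer.cancel()             // 3) the consumer is cancelled before it is dispatched
    consumer.join()

    processed to undelivered
}

fun main() {
    val (processed, undelivered) = lostEventDemo()
    println("processed=$processed undelivered=$undelivered")
}
```

The event leaves the channel but never reaches the processing code. The callback is the only place that still sees it, and by then later events may already have been consumed, which is the ordering problem mentioned above.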

@elizarov
Contributor

An important clarification. It did not become broken in kotlinx.coroutines 1.4 with the introduction of prompt cancellation. It never worked in the first place.

Before 1.4, we had atomic cancellation. What happened with UI Events in Android with channels was this: one piece of code sends an event to a channel that some UI is listening to. While this event is being dispatched, the corresponding view gets destroyed, but, because of atomic cancellation, the view still receives the event. This typically led to a crash of the Android application when it tried to show a notification on an already-destroyed view.

After 1.4, with prompt cancellation, the same scenario leads to the event being ignored (lost) instead of crashing the app.

Neither is good, so we need a solution. I'll list a few different things we can do in the follow-up responses.

@elizarov
Contributor

The most trivial solution is to rely on the specifics of how the view lifecycle works in Android and on Dispatchers.Main.immediate. It needs a few tweaks on top of the channel itself:

  1. You send events to the channel from either Dispatchers.Main or Dispatchers.Main.immediate. That is, instead of a simple channel.send(event) you do Dispatchers.Main.immediate { channel.send(event) }.

  2. You receive events from Dispatchers.Main.immediate (sic!). That is, do something like:

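    channel.receiveAsFlow()
        .onEach { event -> processEventInUI(event) }
        // launchIn takes a CoroutineScope; `viewScope` stands for whatever scope is tied to the view
        .launchIn(viewScope + Dispatchers.Main.immediate) // immediate is the key here!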

The idea behind this trick is that views can be destroyed only in between main-thread events, but due to the details of Main.immediate an event that was posted from Main gets processed within the same main-thread event, so the unhappy path of losing an event can never happen.
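The two steps can be wrapped in a small helper so that call sites cannot forget either half. This is only a sketch: `UiEvents`, `collectIn`, and `viewScope` are hypothetical names, and the dispatcher is a constructor parameter so the class can be exercised off-device. On Android you would pass `Dispatchers.Main.immediate`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical helper; pass Dispatchers.Main.immediate as `mainImmediate` on Android.
class UiEvents<T>(private val mainImmediate: CoroutineDispatcher) {
    private val channel = Channel<T>(Channel.UNLIMITED)

    // Step 1: always send from the main thread.
    suspend fun send(event: T) = withContext(mainImmediate) { channel.send(event) }

    // Step 2: receive on the immediate dispatcher, inside the view's scope.
    fun collectIn(viewScope: CoroutineScope, handle: (T) -> Unit): Job =
        channel.receiveAsFlow()
            .onEach { handle(it) }
            .launchIn(viewScope + mainImmediate)
}
```

The guarantee itself comes from the Android main looper plus `Main.immediate`; substituting another dispatcher (for example in tests) checks the wiring but not the guarantee.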

@elizarov
Contributor

Another solution is to introduce a dedicated primitive (not a channel!) that can handle these kinds of "exactly once events" despite cancellation. The idea is that the receiver has to get a permit to receive an event first and, if it is not cancelled, retrieve the event from the underlying buffer. This idea can be directly implemented on top of a synchronized array deque and a semaphore (credit goes to @ndkoval). You can write a trivial implementation of such ExactlyOnceEventBus:

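import kotlinx.coroutines.sync.*

class ExactlyOnceEventBus<T> {
    // Pending events, guarded by synchronized(buffer).
    private val buffer = ArrayDeque<T>()
    // All permits start out acquired, so there is one free permit per buffered event.
    private val semaphore = Semaphore(Int.MAX_VALUE, Int.MAX_VALUE)

    fun send(event: T) {
        synchronized(buffer) { buffer.add(event) }
        semaphore.release()
    }

    suspend fun receive(): T {
        // A cancelled acquire() does not keep its permit, so the event stays buffered.
        semaphore.acquire()
        return synchronized(buffer) { buffer.removeFirst() }
    }
}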

With this implementation, eventBus.send(event) can be called from any thread, while eventBus.receive() will consume an event only when it was not cancelled.
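To see the difference from a channel, the same "cancelled in transit" sequence can be run against the bus. A minimal sketch: the class is repeated so the block is self-contained, and `cancelledReceiverDemo` and the event text are illustrative names. The `withTimeout` is there only so the demo fails fast instead of hanging if the assumption about `Semaphore.acquire()` returning its permit on cancellation did not hold.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore

class ExactlyOnceEventBus<T> {
    private val buffer = ArrayDeque<T>()
    private val semaphore = Semaphore(Int.MAX_VALUE, Int.MAX_VALUE)

    fun send(event: T) {
        synchronized(buffer) { buffer.add(event) }
        semaphore.release()
    }

    suspend fun receive(): T {
        semaphore.acquire()
        return synchronized(buffer) { buffer.removeFirst() }
    }
}

fun cancelledReceiverDemo(): String = runBlocking {
    val bus = ExactlyOnceEventBus<String>()

    val first = launch { bus.receive() } // suspends in acquire(): no permits yet
    yield()

    bus.send("navigate") // buffers the event and releases one permit
    first.cancel()       // the first receiver is cancelled before it can resume
    first.join()

    // A later receiver should still find the event in the buffer.
    withTimeout(1_000) { bus.receive() }
}

fun main() {
    println(cancelledReceiverDemo())
}
```

The cancelled receiver only ever held a permit, never the event, so nothing is removed from the buffer until a receiver actually resumes.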

The implementation is trivial. The challenge is that it is still something that many developers might have to repeat over and over, so it might make sense to provide a ready-to-use primitive for it. Since we expect that people would like to use it as a Flow, providing some kind of special ExactlyOnceEventFlow (name TBD) in kotlinx.coroutines looks like it might be a good idea.
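One way such a Flow view could look is a thin adapter over the bus. This is a guess at the shape, not a proposed API: the extension name `asFlow` is hypothetical, and the bus class is repeated so the block stands alone.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Semaphore

class ExactlyOnceEventBus<T> {
    private val buffer = ArrayDeque<T>()
    private val semaphore = Semaphore(Int.MAX_VALUE, Int.MAX_VALUE)

    fun send(event: T) {
        synchronized(buffer) { buffer.add(event) }
        semaphore.release()
    }

    suspend fun receive(): T {
        semaphore.acquire()
        return synchronized(buffer) { buffer.removeFirst() }
    }
}

// Hypothetical adapter: each collected value is taken from the bus on demand.
fun <T> ExactlyOnceEventBus<T>.asFlow(): Flow<T> = flow {
    while (true) emit(receive())
}

fun main() = runBlocking {
    val bus = ExactlyOnceEventBus<String>()
    bus.send("a")
    bus.send("b")
    println(bus.asFlow().take(2).toList())
}
```

An event is removed from the buffer only when the collector asks for it, but anything placed between this flow and the terminal collector (for example a buffering operator) would sit outside that guarantee.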

@manuelvicnt
Author

Thanks @elizarov! Something like ExactlyOnceEventBus would be awesome to have as a short-term solution (or even long-term, to make Channels simpler). Our team could even help maintain the code in case you're busy.

We didn't want Flow to be exposed as a type (e.g. ExactlyOnceEventFlow) since that interferes with the backing semantics of the EventBus or Channel. Imagine that a collector receives the event and then filters it out using the Flow.filter operator: that event would be lost.

Do you think this is something that could go to the next version of coroutines? Thank you!

@elizarov
Contributor

We'll need help with the design. If it's not a Flow, then what operators does it need to define? Just having a receive function (as in the trivial implementation above) seems very low-level. We'll need some examples of real-life code that has this exactly-once problem to see what kind of code patterns are used there, so that we can design a convenient set of operations.

Nek-12 added a commit to respawn-app/FlowMVI that referenced this issue Jun 7, 2022
amal added a commit to fluxo-kt/fluxo that referenced this issue Nov 16, 2022
…it handled exactly once. Together with `Store` it can also guarantee delivery of the side effect.

Also see:
[Proposal] Primitive or Channel that guarantees the delivery and processing of items
Kotlin/kotlinx.coroutines#2886

Rethink atomicity of certain low-level primitives
Kotlin/kotlinx.coroutines#1813

LiveData with SnackBar, Navigation and other events (the SingleLiveEvent case)
https://medium.com/androiddevelopers/livedata-with-snackbar-navigation-and-other-events-the-singleliveevent-case-ac2622673150
https://gist.github.com/JoseAlcerreca/5b661f1800e1e654f07cc54fe87441af

Shared flows, broadcast channels
https://elizarov.medium.com/shared-flows-broadcast-channels-899b675e805c
https://gmk57.medium.com/unfortunately-events-may-be-dropped-if-channel-receiveasflow-cfe78ae29004
https://gist.github.com/gmk57/330a7d214f5d710811c6b5ce27ceedaa

Signed-off-by: Artyom Shendrik <[email protected]>
@PierluigiFimiano

Hi,

Just a question: does a SharedFlow with an infinite buffer solve the problem?

@ivanbartsov

@PierluigiFimiano I don't think it does.
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-shared-flow/

If I read this correctly, this basically means that using an infinitely buffered SharedFlow as a queue of events only works if there's a subscriber at all times. If at any point all subscribers disconnect, the buffer does nothing and the next subscriber will only get the latest replay items -- so the delivery guarantee for each produced item is not met.
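This reading is easy to check with a small experiment. A minimal sketch, with `lateSubscriberDemo` and the event names chosen for illustration: one event is emitted while nobody is subscribed, another after a subscriber has attached.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

fun lateSubscriberDemo(): List<String> = runBlocking {
    // No replay, effectively unlimited extra buffer.
    val events = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = Int.MAX_VALUE)

    events.tryEmit("early") // no subscribers yet

    val seen = mutableListOf<String>()
    val subscriber = launch { events.collect { seen += it } }
    yield() // let the subscriber attach

    events.emit("late")
    yield() // let the subscriber handle it

    subscriber.cancelAndJoin()
    seen
}

fun main() {
    println(lateSubscriberDemo())
}
```

Only the event emitted while a subscriber was attached is seen; the buffer capacity makes no difference to the one emitted earlier.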

@PierluigiFimiano

Yes, you are right, but what about a channel with conflated or unlimited capacity? In any case, the behaviour is a little bit tricky.

@gmk57

gmk57 commented Dec 17, 2022

Channels were considered a solution to this problem in the past, but they still may lose events in some cases, as explained in the first posts of this thread. And having more capacity does not help here: channels just don't have the mechanics to reliably know if the event was processed.

@PierluigiFimiano

PierluigiFimiano commented Dec 18, 2022

Got it! The consumer can be cancelled while an event is on its way to being processed; that's the problem! OK, I think I'll adopt the solution proposed by @elizarov, executing send and receive on Dispatchers.Main.immediate. Thank you!

@volo-droid

volo-droid commented Jul 25, 2023

Imagine that a collector receives the event and then filters it out using the Flow.filter operator: that event would be lost.

@manuelvicnt what exactly is bad about a collector filtering out events it doesn't want to handle?

IMHO Flow looks like a perfect replacement for code that migrates away from the existing LiveData solutions for handling UI Events. If the Android team decides it's too flexible (or error-prone) for the discussed scenario, then some simpler androidx interface can be built on top of the proposed ExactlyOnceEventFlow.

@tPl0ch

tPl0ch commented Nov 15, 2023

If I read this correctly, this basically means that using an infinitely buffered SharedFlow as a queue of events only works if there's a subscriber at all times. If at any point all subscribers disconnect, the buffer does nothing and the next subscriber will only get the latest replay items -- so the delivery guarantee for each produced item is not met.

@ivanbartsov wouldn't it be enough to always attach something like a NullSubscriber, that is always active, but does nothing with the messages, to make sure there is always a subscriber present?

@gmk57

gmk57 commented Nov 15, 2023

I guess this won't help: new ("real") subscribers will only get items from replay, not from buffer. Buffer is used for subscribers which are present, but "not ready to accept the new value" (suspended) at the time when it's emitted.
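A quick experiment along these lines, as a sketch only (`nullSubscriberDemo` and the event names are illustrative): an always-active subscriber that ignores everything is attached first, an event is emitted, and only then does the "real" subscriber attach.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

fun nullSubscriberDemo(): List<String> = runBlocking {
    val events = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = Int.MAX_VALUE)

    // Always-active subscriber that does nothing with the events.
    val nullSubscriber = launch { events.collect { } }
    yield()

    events.emit("early") // emitted while only the null subscriber is attached
    yield()

    val seen = mutableListOf<String>()
    val real = launch { events.collect { seen += it } }
    yield() // let the real subscriber attach

    events.emit("late")
    yield()

    real.cancelAndJoin()
    nullSubscriber.cancelAndJoin()
    seen
}

fun main() {
    println(nullSubscriberDemo())
}
```

The real subscriber starts from the replay cache, which is empty here, so the earlier event never reaches it even though a subscriber was present the whole time.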
