Skip to content

Commit

Permalink
Leak-free transfer via channels, Closeable resources supported as S…
Browse files Browse the repository at this point in the history
…tate and SideEffects (strictly not recommended!)

  * Previous state will be properly closed on change
  * Side effects closed when not delivered

For leak-free transfer details see "Undelivered elements" section in [Channel](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/) documentation. Also see: Kotlin/kotlinx.coroutines#1936

Signed-off-by: Artyom Shendrik <[email protected]>
  • Loading branch information
amal committed Nov 16, 2022
1 parent 2b65ca1 commit ea1d10f
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 36 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
* Full errors handling and behavior control.
* Bootstrap for eager or lazy initialization.
* Side jobs for long-running tasks.
* Leak-free transfer ([1](https://github.com/Kotlin/kotlinx.coroutines/issues/1936), delivery guarantees.
* Strictly not recommended, but `Closeable` resources supported as State and SideEffects
* Previous state will be properly closed on change
* Side effects closed when not delivered
* Well tested.
* Reactive streams compatibility
through [coroutine wrappers](https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive):
Expand Down
17 changes: 13 additions & 4 deletions fluxo-core/api/android/fluxo-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public final class kt/fluxo/core/FluxoSettings {
public abstract class kt/fluxo/core/InputStrategy {
public static final field InBox Lkt/fluxo/core/InputStrategy$InBox;
public fun <init> ()V
public fun createQueue ()Lkotlinx/coroutines/channels/Channel;
public fun createQueue (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun createQueue$default (Lkt/fluxo/core/InputStrategy;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public fun getParallelProcessing ()Z
public fun getRollbackOnCancellation ()Z
public abstract fun processRequests (Lkotlin/jvm/functions/Function2;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -219,9 +220,10 @@ public final class kt/fluxo/core/intercept/FluxoEvent$IntentRejected : kt/fluxo/
public fun toString ()Ljava/lang/String;
}

public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectDropped : kt/fluxo/core/intercept/FluxoEvent {
public fun <init> (Lkt/fluxo/core/Store;Ljava/lang/Object;)V
public final fun getSideEffect ()Ljava/lang/Object;
public final class kt/fluxo/core/intercept/FluxoEvent$IntentUndelivered : kt/fluxo/core/intercept/FluxoEvent {
public fun <init> (Lkt/fluxo/core/Store;Ljava/lang/Object;Z)V
public final fun getIntent ()Ljava/lang/Object;
public final fun getResent ()Z
public fun toString ()Ljava/lang/String;
}

Expand All @@ -231,6 +233,13 @@ public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectEmitted : kt/flu
public fun toString ()Ljava/lang/String;
}

public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectUndelivered : kt/fluxo/core/intercept/FluxoEvent {
public fun <init> (Lkt/fluxo/core/Store;Ljava/lang/Object;Z)V
public final fun getResent ()Z
public final fun getSideEffect ()Ljava/lang/Object;
public fun toString ()Ljava/lang/String;
}

public final class kt/fluxo/core/intercept/FluxoEvent$SideJobCancelled : kt/fluxo/core/intercept/FluxoEvent {
public fun <init> (Lkt/fluxo/core/Store;Ljava/lang/String;Lkt/fluxo/core/dsl/SideJobScope$RestartState;)V
public final fun getKey ()Ljava/lang/String;
Expand Down
17 changes: 16 additions & 1 deletion fluxo-core/api/jvm/fluxo-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public final class kt/fluxo/core/FluxoSettings {
public abstract class kt/fluxo/core/InputStrategy {
public static final field InBox Lkt/fluxo/core/InputStrategy$InBox;
public fun <init> ()V
public fun createQueue ()Lkotlinx/coroutines/channels/Channel;
public fun createQueue (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun createQueue$default (Lkt/fluxo/core/InputStrategy;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public fun getParallelProcessing ()Z
public fun getRollbackOnCancellation ()Z
public abstract fun processRequests (Lkotlin/jvm/functions/Function2;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -225,6 +226,13 @@ public final class kt/fluxo/core/intercept/FluxoEvent$IntentRejected : kt/fluxo/
public fun toString ()Ljava/lang/String;
}

public final class kt/fluxo/core/intercept/FluxoEvent$IntentUndelivered : kt/fluxo/core/intercept/FluxoEvent {
public fun <init> (Lkt/fluxo/core/Store;Ljava/lang/Object;Z)V
public final fun getIntent ()Ljava/lang/Object;
public final fun getResent ()Z
public fun toString ()Ljava/lang/String;
}

public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectDropped : kt/fluxo/core/intercept/FluxoEvent {
public fun <init> (Lkt/fluxo/core/Store;Ljava/lang/Object;)V
public final fun getSideEffect ()Ljava/lang/Object;
Expand All @@ -237,6 +245,13 @@ public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectEmitted : kt/flu
public fun toString ()Ljava/lang/String;
}

public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectUndelivered : kt/fluxo/core/intercept/FluxoEvent {
public fun <init> (Lkt/fluxo/core/Store;Ljava/lang/Object;Z)V
public final fun getResent ()Z
public final fun getSideEffect ()Ljava/lang/Object;
public fun toString ()Ljava/lang/String;
}

public final class kt/fluxo/core/intercept/FluxoEvent$SideJobCancelled : kt/fluxo/core/intercept/FluxoEvent {
public fun <init> (Lkt/fluxo/core/Store;Ljava/lang/String;Lkt/fluxo/core/dsl/SideJobScope$RestartState;)V
public final fun getKey ()Ljava/lang/String;
Expand Down
12 changes: 10 additions & 2 deletions fluxo-core/src/commonMain/kotlin/kt/fluxo/core/InputStrategy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,16 @@ import kotlin.jvm.JvmName

public abstract class InputStrategy {

public open fun <Request> createQueue(): Channel<Request> {
return Channel(capacity = Channel.UNLIMITED, onBufferOverflow = BufferOverflow.SUSPEND)
/**
*
* @param onUndeliveredElement See "Undelivered elements" section in [Channel] documentation for details.
*/
public open fun <Request> createQueue(onUndeliveredElement: ((Request) -> Unit)? = null): Channel<Request> {
return Channel(
capacity = Channel.UNLIMITED,
onBufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement = onUndeliveredElement,
)
}

public open val parallelProcessing: Boolean get() = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package kt.fluxo.core.intercept

import kotlinx.coroutines.channels.Channel
import kt.fluxo.core.Store
import kt.fluxo.core.dsl.SideJobScope.RestartState
import kt.fluxo.core.dsl.SideJobScope.RestartState.Restarted
Expand Down Expand Up @@ -77,6 +78,20 @@ public sealed class FluxoEvent<Intent, State, SideEffect : Any>(
override fun toString(): String = "Intent error: $store, $intent (${e.message ?: e})"
}

/**
* When object transferred via [Channel] from one coroutine to another
* it can be lost if either send or receive operation cancelled in transit.
* This event signals about such case for an [intent].
*
* See "Undelivered elements" section in [Channel] documentation for details.
* Also see [GitHub issue](https://github.com/Kotlin/kotlinx.coroutines/issues/1936).
*
* @param resent `true` if [intent] successfully resent to the [Channel] and can be delivered later
*/
class IntentUndelivered<I, S, SE : Any>(store: Store<I, S, SE>, val intent: I, val resent: Boolean) : FluxoEvent<I, S, SE>(store) {
override fun toString(): String = "Intent undelivered: $store, $intent"
}

// endregion

// region SideEffect
Expand All @@ -85,8 +100,22 @@ public sealed class FluxoEvent<Intent, State, SideEffect : Any>(
override fun toString(): String = "SideEffect emitted: $store, $sideEffect"
}

class SideEffectDropped<I, S, SE : Any>(store: Store<I, S, SE>, val sideEffect: SE) : FluxoEvent<I, S, SE>(store) {
override fun toString(): String = "SideEffect dropped: $store, $sideEffect"
/**
* When object transferred via [Channel] from one coroutine to another
* it can be lost if either send or receive operation cancelled in transit.
* This event signals about such case for a [sideEffect].
*
* See "Undelivered elements" section in [Channel] documentation for details.
* Also see [GitHub issue](https://github.com/Kotlin/kotlinx.coroutines/issues/1936).
*
* @param resent `true` if [sideEffect] successfully resent to the [Channel] and can be delivered later
*/
class SideEffectUndelivered<I, S, SE : Any>(
store: Store<I, S, SE>,
val sideEffect: SE,
val resent: Boolean,
) : FluxoEvent<I, S, SE>(store) {
override fun toString(): String = "SideEffect undelivered: $store, $sideEffect"
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.InternalCoroutinesApi
Expand Down Expand Up @@ -63,7 +64,7 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
private val intentContext: CoroutineContext
private val sideJobScope: CoroutineScope
private val interceptorScope: CoroutineScope
private val requestsChannel: Channel<StoreRequest<Intent, State>> = conf.inputStrategy.createQueue()
private val requestsChannel: Channel<StoreRequest<Intent, State>>

private val sideJobsMap = ConcurrentHashMap<String, RunningSideJob<Intent, State, SideEffect>>()

Expand Down Expand Up @@ -96,11 +97,7 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
val parentExceptionHandler = conf.exceptionHandler ?: ctx[CoroutineExceptionHandler]
val exceptionHandler = CoroutineExceptionHandler { context, e ->
events.tryEmit(FluxoEvent.UnhandledError(this, e))
if (parentExceptionHandler != null) {
parentExceptionHandler.handleException(context, e)
} else {
throw e
}
parentExceptionHandler?.handleException(context, e) ?: throw e
}
scope = CoroutineScope(ctx + exceptionHandler + job)
intentContext = scope.coroutineContext + conf.intentContext + when {
Expand All @@ -119,9 +116,44 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
}
}

// Leak-free transfer via channel
// https://github.com/Kotlin/kotlinx.coroutines/issues/1936
// — send operation cancelled before it had a chance to actually send the element.
// — receive operation retrieved the element from the channel but cancelled when trying to return it the caller.
// — channel cancelled, in which case onUndeliveredElement called on every remaining element in the channel's buffer.
// See "Undelivered elements" section in Channel documentation for details.
requestsChannel = conf.inputStrategy.createQueue {
scope.launch(Dispatchers.Unconfined, CoroutineStart.UNDISPATCHED) {
when (it) {
is StoreRequest.HandleIntent -> {
@Suppress("UNINITIALIZED_VARIABLE")
val resent = if (requestsChannel.trySend(it).isSuccess) true else {
it.intent.closeSafely()
false
}
if (isActive) {
events.emit(FluxoEvent.IntentUndelivered(this@FluxoStore, it.intent, resent = resent))
}
}

is StoreRequest.RestoreState -> {
if (isActive) {
updateState(it.state)
}
}
}
}
}
sideEffectChannel = Channel(conf.sideEffectBufferSize, BufferOverflow.SUSPEND) {
scope.launch(Dispatchers.Unconfined) {
events.emit(FluxoEvent.SideEffectDropped(this@FluxoStore, it))
scope.launch(Dispatchers.Unconfined, CoroutineStart.UNDISPATCHED) {
@Suppress("UNINITIALIZED_VARIABLE")
val resent = if (sideEffectChannel.trySend(it).isSuccess) true else {
it.closeSafely()
false
}
if (isActive) {
events.emit(FluxoEvent.SideEffectUndelivered(this@FluxoStore, it, resent = resent))
}
}
}
sideEffectFlow = sideEffectChannel.receiveAsFlow().let {
Expand Down Expand Up @@ -179,7 +211,7 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
override fun send(intent: Intent) {
start()
val i = if (DEBUG || conf.debugChecks) debugIntentWrapper(intent) else intent
scope.launch(Dispatchers.Unconfined) {
scope.launch(Dispatchers.Unconfined, start = CoroutineStart.UNDISPATCHED) {
@Suppress("DeferredResultUnused") sendAsync(i)
}
}
Expand All @@ -200,6 +232,7 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
if (prevValue != nextValue) {
// event fired only if state changed
events.emit(FluxoEvent.StateChanged(this, nextValue))
prevValue.closeSafely()
}
return nextValue
}
Expand All @@ -211,6 +244,28 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
events.emit(FluxoEvent.SideEffectEmitted(this@FluxoStore, sideEffect))
}

private fun handleException(e: Throwable, context: CoroutineContext) {
if (!conf.closeOnExceptions) {
val handler = scope.coroutineContext[CoroutineExceptionHandler]
if (handler != null) {
handler.handleException(context, e)
return
}
}
throw e
}

private suspend fun Any?.closeSafely() {
if (this is Closeable) {
try {
close() // Close if Closeable resource
} catch (e: Throwable) {
handleException(e, currentCoroutineContext())
events.emit(FluxoEvent.UnhandledError(this@FluxoStore, e))
}
}
}

/** Will be called only once for each [FluxoStore] */
private fun launch() = scope.launch {
// observe and process intents
Expand Down Expand Up @@ -256,8 +311,8 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
try {
interceptorScope.start(eventsFlow)
} catch (e: Throwable) {
handleException(e, currentCoroutineContext())
events.emit(FluxoEvent.UnhandledError(this@FluxoStore, e))
conf.exceptionHandler?.handleException(currentCoroutineContext(), e)
}
}
}
Expand Down Expand Up @@ -307,12 +362,8 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
}
events.emit(FluxoEvent.IntentCancelled(this, intent))
} catch (e: Throwable) {
handleException(e, currentCoroutineContext())
events.emit(FluxoEvent.IntentError(this, intent, e))
if (!conf.closeOnExceptions) {
conf.exceptionHandler?.handleException(currentCoroutineContext(), e)
} else {
throw e
}
} finally {
deferred?.complete(Unit)
}
Expand Down Expand Up @@ -371,12 +422,8 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
} catch (_: CancellationException) {
events.emit(FluxoEvent.SideJobCancelled(this@FluxoStore, key, restartState))
} catch (e: Throwable) {
handleException(e, currentCoroutineContext())
events.emit(FluxoEvent.SideJobError(this@FluxoStore, key, restartState, e))
if (!conf.closeOnExceptions) {
conf.exceptionHandler?.handleException(currentCoroutineContext(), e)
} else {
throw e
}
}
}
)
Expand Down Expand Up @@ -404,12 +451,8 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
} catch (_: CancellationException) {
events.emit(FluxoEvent.BootstrapperCancelled(this@FluxoStore, bootstrapper))
} catch (e: Throwable) {
handleException(e, currentCoroutineContext())
events.emit(FluxoEvent.BootstrapperError(this@FluxoStore, bootstrapper, e))
if (!conf.closeOnExceptions) {
conf.exceptionHandler?.handleException(currentCoroutineContext(), e)
} else {
throw e
}
}
}

Expand All @@ -428,6 +471,8 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(

sideEffectChannel.close(ce)

// Interceptor scope shouldn't be closed immediately with scope, when interceptors set.
// It allows to process final events in interceptors.
val interceptorScope = interceptorScope
if (interceptorScope !== scope && interceptorScope.isActive) {
val cancellationCause = cancellationCause
Expand All @@ -436,6 +481,10 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
interceptorScope.cancel(cancellationCause)
}
}

interceptorScope.launch(Dispatchers.Unconfined, CoroutineStart.UNDISPATCHED) {
mutableState.value.closeSafely()
}
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package kt.fluxo.core.strategy

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
Expand All @@ -13,8 +14,8 @@ import kt.fluxo.core.dsl.InputStrategyScope
*/
internal object LifoInputStrategy : InputStrategy() {

override fun <Request> createQueue(): Channel<Request> {
return Channel(capacity = Channel.CONFLATED)
override fun <Request> createQueue(onUndeliveredElement: ((Request) -> Unit)?): Channel<Request> {
return Channel(capacity = Channel.CONFLATED, onBufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement = onUndeliveredElement)
}

override suspend fun <Request> (InputStrategyScope<Request>).processRequests(queue: Flow<Request>) {
Expand Down

0 comments on commit ea1d10f

Please sign in to comment.