Skip to content

Commit

Permalink
GuaranteedEffect - Wrapper for a side effect that can guarantee that …
Browse files Browse the repository at this point in the history
…it handled exactly once. Together with `Store` it can also guarantee delivery of the side effect.

Also see:
[Proposal] Primitive or Channel that guarantees the delivery and processing of items
Kotlin/kotlinx.coroutines#2886

Rethink atomicity of certain low-level primitives
Kotlin/kotlinx.coroutines#1813

LiveData with SnackBar, Navigation and other events (the SingleLiveEvent case)
https://medium.com/androiddevelopers/livedata-with-snackbar-navigation-and-other-events-the-singleliveevent-case-ac2622673150
https://gist.github.com/JoseAlcerreca/5b661f1800e1e654f07cc54fe87441af

Shared flows, broadcast channels
https://elizarov.medium.com/shared-flows-broadcast-channels-899b675e805c
https://gmk57.medium.com/unfortunately-events-may-be-dropped-if-channel-receiveasflow-cfe78ae29004
https://gist.github.com/gmk57/330a7d214f5d710811c6b5ce27ceedaa

Signed-off-by: Artyom Shendrik <[email protected]>
  • Loading branch information
amal committed Nov 2, 2022
1 parent c81b742 commit 8f7b4d1
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 5 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
* Full errors handling and behavior control.
* Bootstrap for eager or lazy initialization.
* Side jobs for long-running tasks.
* Leak-free transfer ([1](https://github.com/Kotlin/kotlinx.coroutines/issues/1936), delivery guarantees.
* Leak-free transfer, delivery guarantees [[1](https://github.com/Kotlin/kotlinx.coroutines/issues/1936). [2](https://gmk57.medium.com/unfortunately-events-may-be-dropped-if-channel-receiveasflow-cfe78ae29004)].
* Side effects consumption guarantees with `GuaranteedEffect` (effect handled and exactly once) [[1](https://github.com/Kotlin/kotlinx.coroutines/issues/2886), [2](https://medium.com/androiddevelopers/livedata-with-snackbar-navigation-and-other-events-the-singleliveevent-case-ac2622673150)].
* Strictly not recommended, but `Closeable` resources supported as State and SideEffects
* Previous state will be properly closed on change
* Side effects closed when not delivered
Expand All @@ -38,9 +39,6 @@
- [ ] Subscription Lifecycle
- [ ] Side effects strategies (see `ActionShareBehavior` in FlowMVI)
- [ ] Side effects cache when view not attached
- [ ] Side effects consumption
control ([1](https://proandroiddev.com/how-to-handle-viewmodel-one-time-events-in-jetpack-compose-a01af0678b76#0009)
, [2](https://medium.com/androiddevelopers/viewmodel-one-off-event-antipatterns-16a1da869b95))
- [ ] Complete code coverage with tests
- [ ] SideJobs tests
- [ ] Input strategy tests
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package kt.fluxo.core.annotation

/**
* Denotes that any overriding methods should invoke this method as well.
*
*
* Example:
* ```
* @CallSuper
* public abstract void onFocusLost();
* ```
*
* @see androidx.annotation.CallSuper
*/
@MustBeDocumented
@OptionalExpectation
@OptIn(ExperimentalMultiplatform::class)
@Retention(AnnotationRetention.BINARY)
@Suppress("KDocUnresolvedReference")
@Target(
AnnotationTarget.FUNCTION,
AnnotationTarget.PROPERTY_GETTER,
AnnotationTarget.PROPERTY_SETTER
)
public expect annotation class CallSuper()
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
@file:Suppress("MemberVisibilityCanBePrivate", "MaxLineLength")

package kt.fluxo.core.data

import kotlinx.atomicfu.atomic
import kt.fluxo.core.Store
import kt.fluxo.core.annotation.CallSuper
import kt.fluxo.core.annotation.ExperimentalFluxoApi
import kt.fluxo.core.annotation.InternalFluxoApi
import kt.fluxo.core.internal.Closeable
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract

/**
* Wrapper for a [side effect][T] that can guarantee that [content] handled exactly once.
* [handleOrResend] function acknowledges successful processing between a producer and a consumer,
* and resends this side effect through the [Store] otherwise.
*
* Together with [Store] it can also guarantee delivery of the side effect.
*/
public open class GuaranteedEffect<out T : Any>(
/** Raw effect data. Use [content] instead for "exactly once" guarantees! */
@InternalFluxoApi
public val rawContent: T,
) : Closeable {
/*
* Due to the prompt cancellation guarantee changes that landed in Coroutines 1.4,
* Channels and Flows cannot guarantee delivery and even more so, they don't have the API to acknowledge
* successful processing between a producer and consumer to guarantee that an item handled exactly once.
*
* #### References
*
* [Proposal] Primitive or Channel that guarantees the delivery and processing of items
* https://github.com/Kotlin/kotlinx.coroutines/issues/2886
*
* Support leak-free closeable resources transfer via Channel (onUndeliveredElement parameter)
* https://github.com/Kotlin/kotlinx.coroutines/issues/1936
* https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/#undelivered-elements
*
* Rethink atomicity of certain low-level primitives
* https://github.com/Kotlin/kotlinx.coroutines/issues/1813
*
* Shared flows, broadcast channels
* https://elizarov.medium.com/shared-flows-broadcast-channels-899b675e805c
* by Roman Elizarov [Nov 16, 2020]
*
* https://gmk57.medium.com/unfortunately-events-may-be-dropped-if-channel-receiveasflow-cfe78ae29004
* https://gist.github.com/gmk57/330a7d214f5d710811c6b5ce27ceedaa
*
* LiveData with SnackBar, Navigation and other events (the SingleLiveEvent case)
* https://medium.com/androiddevelopers/livedata-with-snackbar-navigation-and-other-events-the-singleliveevent-case-ac2622673150
* https://gist.github.com/JoseAlcerreca/5b661f1800e1e654f07cc54fe87441af
* by Jose Alcérreca [Apr 27, 2018]
*/

private val resendFun = atomic<((sideEffect: Any?) -> Unit)?>(null)

private val hasBeenHandled = atomic(false)


/**
* Returns [raw effect data][T] or `null` if already handled.
*/
public val content: T?
get() = if (hasBeenHandled.compareAndSet(expect = false, update = true)) rawContent else null

/**
* Convenience method that provides "exactly once" handling guarantees.
* 1. Takes [raw effect data][content] or returns if already handled.
* 2. Calls the [handle] function.
* 3. [Clears][close] the effect resources if handled successfully. [Resends][resend] effect otherwise.
*
* You can use all this methods by yourself if this method not suites you well enough.
*
* @param handle function to handle the [raw effect data][content].
* Return `true` if handled successfully, `false` if the effect should be resent.
*
* @return `true` if handled successfully, `false` otherwise.
*/
@ExperimentalFluxoApi
public inline fun handleOrResend(handle: (content: T) -> Boolean): Boolean {
contract {
callsInPlace(handle, InvocationKind.AT_MOST_ONCE)
}
var handled = false
val content = content ?: return false
try {
handled = handle(content)
} finally {
if (!handled) resend() else close()
}
return handled
}

/**
* Resend this effect through the [Store] again and marks it as unhandled.
*/
@ExperimentalFluxoApi
public fun resend() {
val f = checkNotNull(resendFun.value) { "resend is possible only after first publication" }
hasBeenHandled.value = false
f(this)
}

/**
* Clears connection to the [Store] required for [resending][resend] possibility.
*/
@CallSuper
@ExperimentalFluxoApi
public override fun close() {
resendFun.value = null
}


/**
* Called internally to set [connection][block] to the [Store] required for [resending][resend] possibility.
*/
@InternalFluxoApi
internal fun <S> setResendFunction(block: ((sideEffect: S) -> Unit)?) {
@Suppress("UNCHECKED_CAST")
resendFun.value = block as ((Any?) -> Unit)?
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import kt.fluxo.core.Bootstrapper
import kt.fluxo.core.IntentHandler
import kt.fluxo.core.Store
import kt.fluxo.core.annotation.InternalFluxoApi
import kt.fluxo.core.data.GuaranteedEffect
import kt.fluxo.core.debug.DEBUG
import kt.fluxo.core.debug.debugIntentWrapper
import kt.fluxo.core.dsl.InputStrategyScope
Expand Down Expand Up @@ -202,9 +203,9 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
override suspend fun sendAsync(intent: Intent): Deferred<Unit> {
start()
val i = if (DEBUG || conf.debugChecks) debugIntentWrapper(intent) else intent
events.emit(FluxoEvent.IntentQueued(this, i))
val deferred = CompletableDeferred<Unit>()
requestsChannel.send(StoreRequest.HandleIntent(deferred, i))
events.emit(FluxoEvent.IntentQueued(this, i))
return deferred
}

Expand Down Expand Up @@ -240,10 +241,19 @@ internal class FluxoStore<Intent, State, SideEffect : Any>(
}

private suspend fun postSideEffect(sideEffect: SideEffect) {
if (sideEffect is GuaranteedEffect<*>) {
sideEffect.setResendFunction(::postSideEffectSync)
}
sideEffectChannel.send(sideEffect)
events.emit(FluxoEvent.SideEffectEmitted(this@FluxoStore, sideEffect))
}

private fun postSideEffectSync(sideEffect: SideEffect) {
scope.launch(Dispatchers.Unconfined, start = CoroutineStart.UNDISPATCHED) {
postSideEffect(sideEffect)
}
}

private fun handleException(e: Throwable, context: CoroutineContext) {
if (!conf.closeOnExceptions) {
val handler = scope.coroutineContext[CoroutineExceptionHandler]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package kt.fluxo.data

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.test.TestResult
import kotlinx.coroutines.test.runTest
import kt.fluxo.core.data.GuaranteedEffect
import kt.fluxo.core.store
import kotlin.test.AfterTest
import kotlin.test.Test
import kotlin.test.assertEquals


@ExperimentalCoroutinesApi
class GuaranteedEffectTest {

private val scope = CoroutineScope(Job())

@AfterTest
fun afterTest() {
scope.cancel()
}


@Test
fun guaranteed_effect_can_resend_itself(): TestResult {
val store = scope.store<Unit, Int, GuaranteedEffect<*>>(initialState = 0, handler = {
postSideEffect(GuaranteedEffect(it))
}) {
debugChecks = true
}
return runTest {
var effects = 0
store.send(Unit)

store.sideEffectFlow.takeWhile {
!it.handleOrResend { effects++ != 0 }
}.collect()

assertEquals(2, effects)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package kt.fluxo.core.annotation

/**
* Denotes that any overriding methods should invoke this method as well.
*
*
* Example:
* ```
* @CallSuper
* public abstract void onFocusLost();
* ```
*
* @see androidx.annotation.CallSuper
*/
public actual typealias CallSuper = androidx.annotation.CallSuper

0 comments on commit 8f7b4d1

Please sign in to comment.