Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in SharedObservableRequest #35

Merged
merged 6 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class DataObservableDelegate<Params : Any, Domain : Any> constructor(
private val sharedNetworkRequest: SharedSingleRequest<Params, Domain> =
SharedSingleRequest { params ->
this.fromNetwork(params)
.doOnSuccess { domain ->
.doAfterSuccess { domain ->
failedNetworkRequests.remove(params)

toMemory(params, domain)
Expand Down
34 changes: 14 additions & 20 deletions dod/src/main/java/com/revolut/rxdata/dod/SharedObservableRequest.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.revolut.rxdata.dod

import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.*
import io.reactivex.schedulers.Schedulers.io
import java.util.concurrent.ConcurrentHashMap

/*
* Copyright (C) 2019 Revolut
Expand All @@ -21,33 +21,27 @@ import java.util.*
*
*/

internal class SharedObservableRequest<Params, Result>(
private val load: (params: Params) -> Observable<Result>
internal class SharedObservableRequest<Params : Any, Result>(
private val load: (params: Params) -> Observable<Result>,
) {

private val requests = HashMap<Params, Observable<Result>>()
private val requests = ConcurrentHashMap<Params, Observable<Result>>()

fun removeRequest(params: Params) {
requests.remove(params)
}

fun getOrLoad(params: Params): Observable<Result> {
return Observable
.defer {
synchronized(requests) {
requests[params]?.let { cachedShared ->
return@defer cachedShared
}

val newShared = load(params)
.observeOn(Schedulers.io())
.doFinally {
synchronized(requests) { requests.remove(params) }
}
requests.getOrCreate(params) {
load(params)
.observeOn(io())
.doFinally { removeRequest(params) }
.replay(1)
.refCount()

requests[params] = newShared
return@defer newShared
}
}.subscribeOn(Schedulers.io())

}.subscribeOn(io())
}

}
11 changes: 9 additions & 2 deletions dod/src/main/java/com/revolut/rxdata/dod/SharedSingleRequest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ import io.reactivex.Single
*
*/

internal class SharedSingleRequest<Params, Result : Any>(
internal class SharedSingleRequest<Params: Any, Result : Any>(
private val load: (params: Params) -> Single<Result>
) {

private val sharedObservableRequest: SharedObservableRequest<Params, Result> =
SharedObservableRequest { params ->
load(params).toObservable()
load(params)
.doOnSuccess { removeRequest(params) }
.doOnError { removeRequest(params) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we need to call removeRequest at dispose as well (and yes, that removal should be more sophisticated and actually check a correspondence of the deleted request to the current one, or enforce somehow At Most Once behaviour for removeRequest)

there is test which checks that especial issue:

    @Test
    fun `GIVEN network not answer before unsubscription of first observe WHEN observe again THEN second observer see network response`() {
        // given
        whenever(fromNetwork.invoke(any())).thenReturn(Single.never())
        storage.remove(params)
        memCache.remove(params)

        val observer1 = dataObservableDelegate.observe(params = params, forceReload = false)
            .skipWhileLoading()
            .test()
        ioScheduler.triggerActions()

        observer1.dispose()
        observer1.assertValues()

        // when
        whenever(fromNetwork.invoke(any())).thenReturn(Single.just(domain))
        val observer2 = dataObservableDelegate.observe(params = params, forceReload = false)
            .skipWhileLoading()
            .test()
        ioScheduler.triggerActions()

        // then
        observer2.assertValues(Data(domain))    // ACTUAL: EMPTY LIST
    }
    ```

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do remove request on dispose, it's handled in SharedObservableRequest class via doFinally { removeRequest(params) }. here we only add extra removals on events that are specific to Single lifecycle - i.e. doOnSuccess, which doesn't exist in Observable lifecycle. Actually even doOnError is redundant.

Your test case fails because DoD has internal timeout logic (60s by default ) and it's keeping Single.never() alive during this period. Observer2 basically resubscribes to this request in this case. If you change it slightly to accomodate for timeouts:

        // when
        whenever(fromNetwork.invoke(any())).thenReturn(Single.just(domain))
        ioScheduler.advanceTimeBy(180, TimeUnit.SECONDS)

        val observer2 = dataObservableDelegate.observe(params = params, forceReload = false)
            .skipWhileLoading()
            .test()
        ioScheduler.triggerActions()

then it passes.

It's covered in WHEN unsubscribed from network and data NOT arrives in 60 seconds THEN data not saved so I won't be adding this case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, thanks

.toObservable()
}

private fun removeRequest(params: Params) {
sharedObservableRequest.removeRequest(params)
}

fun getOrLoad(params: Params): Single<Result> =
sharedObservableRequest.getOrLoad(params).firstOrError()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.revolut.rxdata.dod

import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.TestScheduler
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.mockito.kotlin.any
import org.mockito.kotlin.mock
import org.mockito.kotlin.whenever
import java.io.IOException

/*
* Copyright (C) 2023 Revolut
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

abstract class BaseDataObservableDelegateTest {

val params: Params = 0
val cachedDomain: Domain = "cached_domain_model"
val backendException = IOException("HTTP 500. All tests are green!")

lateinit var fromNetwork: (Params) -> Single<Domain>

val fromNetworkScoped: DataObservableDelegate<Params, Domain>.(Params) -> Single<Domain> =
{ fromNetwork(it) }

lateinit var toMemory: (Params, Domain) -> Unit
lateinit var fromMemory: (Params) -> Domain
lateinit var toStorage: (Params, Domain) -> Unit
lateinit var fromStorage: (Params) -> Domain
lateinit var dataObservableDelegate: DataObservableDelegate<Params, Domain>

val computationScheduler: TestScheduler = TestScheduler()
val ioScheduler: TestScheduler = TestScheduler()

val memCache = hashMapOf<Params, Domain>()
val storage = hashMapOf<Params, Domain>()

@BeforeEach
fun setUp() {
fromNetwork = mock()
toMemory = mock()
fromMemory = mock()
toStorage = mock()
fromStorage = mock()

dataObservableDelegate = DataObservableDelegate(
fromNetwork = fromNetworkScoped,
fromMemory = fromMemory,
toMemory = toMemory,
fromStorage = fromStorage,
toStorage = toStorage
)

memCache.clear()
storage.clear()

whenever(fromMemory.invoke(any())).thenAnswer { invocation -> memCache[invocation.arguments[0]] }
whenever(toMemory.invoke(any(), any())).thenAnswer { invocation ->
memCache[invocation.arguments[0] as Params] = invocation.arguments[1] as Domain
Unit
}

whenever(fromStorage.invoke(any())).thenAnswer { invocation -> storage[invocation.arguments[0]] }
whenever(toStorage.invoke(any(), any())).thenAnswer { invocation ->
storage[invocation.arguments[0] as Params] = invocation.arguments[1] as Domain
Unit
}

RxJavaPlugins.setIoSchedulerHandler { ioScheduler }
RxJavaPlugins.setComputationSchedulerHandler { computationScheduler }
}

@AfterEach
fun afterEach() {
RxJavaPlugins.reset()
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.revolut.rxdata.dod

import com.revolut.data.model.Data
import io.reactivex.Single
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.kotlin.eq
import org.mockito.kotlin.whenever

/*
* Copyright (C) 2023 Revolut
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

class DataObservableDelegateMuteRecursiveReSubscriptionsTest : BaseDataObservableDelegateTest() {

@ValueSource(booleans = [true, false])
@ParameterizedTest
fun `WHEN dod switchMaps to the same forceReload dod THEN emissions are muted after 2nd iteration`(forceReload: Boolean) {
whenever(fromNetwork.invoke(eq(params))).thenReturn(Single.fromCallable { cachedDomain })
storage[params] = cachedDomain
memCache.remove(params)

val upstreamEmissions = ArrayList<Data<Domain>>()

dataObservableDelegate.observe(params = params, forceReload = forceReload).take(100)
.doOnNext { upstreamEmissions.add(it) }
.switchMap {
dataObservableDelegate.observe(params = params, forceReload = true).take(100)
}
.test()
.apply { ioScheduler.triggerActions() }

assertEquals(
listOf(
Data(null, null, true),
// 1st iteration
Data(cachedDomain, null, true),
Data(cachedDomain, null, false),
// 2nd iteration
Data(cachedDomain, null, true),
Data(cachedDomain, null, false),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was previously asserting a buggy behaviour where last emit here would be the 4th
Data(cachedDomain, null, true).

This wasn't the idea as can be seen in the ReloadingDataScannerTest:

newEvent(Data(null, loading = true), shouldEmit = true)
newEvent(Data("a", loading = true), shouldEmit = true)
newEvent(Data("a", loading = false), shouldEmit = true)

newEvent(Data("a", loading = true), shouldEmit = true)
newEvent(Data("a", loading = false), shouldEmit = true)

The changes in this PR made for avoiding race conditions, apparently fixed that, so I've rewritten this test for better clarity.

// no emits after this point
), upstreamEmissions
)
}

@Test
fun `WHEN dod switchMaps to the same forceReload dod AND fromNetwork returns errors THEN emissions are muted after 2nd iteration`() {
whenever(fromNetwork.invoke(eq(params))).thenReturn(Single.fromCallable { throw backendException })
storage[params] = cachedDomain
memCache.remove(params)

val upstreamEmissions = ArrayList<Data<Domain>>()

dataObservableDelegate.observe(params = params).take(100)
.doOnNext { upstreamEmissions.add(it) }
.switchMap {
dataObservableDelegate.observe(params = params, forceReload = true).take(100)
}
.test()
.apply { ioScheduler.triggerActions() }

assertEquals(
listOf(
Data(null, null, true),
// 1st iteration
Data(cachedDomain, null, true),
Data(cachedDomain, backendException, false),
// 2nd iteration
Data(cachedDomain, null, true),
Data(cachedDomain, backendException, false),
// no emits after this point
), upstreamEmissions
)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.revolut.rxdata.dod

import com.revolut.data.model.Data
import io.reactivex.Single
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.mockito.kotlin.eq
import org.mockito.kotlin.whenever
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicInteger

/*
* Copyright (C) 2023 Revolut
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

class DataObservableDelegateSharedStorageRequestTest : BaseDataObservableDelegateTest() {

private val domain: Domain = "domain_model"

@Suppress("CheckResult")
@Test
fun `WHEN memoryIsEmpty and dod is observed multiple times THEN fromStorage is called once`() {
val counter = AtomicInteger(0)

dataObservableDelegate = DataObservableDelegate(
fromNetwork = fromNetworkScoped,
fromMemory = fromMemory,
toMemory = toMemory,
fromStorageSingle = {
Single.fromCallable {
counter.incrementAndGet()
Data(cachedDomain)
}.delay(100L, MILLISECONDS, ioScheduler)
},
toStorage = toStorage
)

whenever(fromNetwork.invoke(eq(params))).thenReturn(Single.fromCallable { domain })

dataObservableDelegate.observe(params = params, forceReload = true).test()
dataObservableDelegate.observe(params = params, forceReload = true).test()
dataObservableDelegate.observe(params = params, forceReload = true).test()

ioScheduler.triggerActions()

assertEquals(1, counter.get())
}

}
Loading