Refactor concurrent uploading of local changes #18449

Closed
wants to merge 31 commits into from

Changes from all commits (31)
531c4ae
Refactor: rename context to appContext and rm comment
ovitrif May 17, 2023
7b0eafd
Refactor: resolve warnings
ovitrif May 17, 2023
0f85412
Test: add test for `queueUploadFromAllSites`
ovitrif May 17, 2023
5c0c2ac
Update: refactor upload starter to continue on cancellation
ovitrif May 17, 2023
59219d7
Update: refactor try-catch block to runCatching
ovitrif May 17, 2023
12d4421
Update: rename test for multiple uploads
ovitrif May 17, 2023
eccad75
Update: adapt comment docs for `checkConnectionAndUpload`
ovitrif May 17, 2023
f1c2f51
Add release notes for mutex bugfix
ovitrif May 17, 2023
e3bcf48
Cleanup comment for UploadStarterConcurrentTest class
ovitrif May 22, 2023
ecd44b6
Update: log isPage bool as int for improved readability
ovitrif May 22, 2023
89481d1
Update: refactor fixtures to check drafts upload order
ovitrif May 22, 2023
5b6a02a
Refactor: cleanup redundant test setup code
ovitrif May 25, 2023
0e96e11
Refactor: extract UploadFixtures to dry test code
ovitrif May 25, 2023
caf73d4
Update: improve logs during upload of posts & pages
ovitrif May 25, 2023
90dccd2
Update: provide Mutex via DI for testability
ovitrif May 25, 2023
c06843a
Refactor: cleanup code of concurrent test
ovitrif May 25, 2023
21c1160
Update: fix post id counter in test
ovitrif May 25, 2023
4c9f4dd
Update: fix refs in comment doc of UploadStarter
ovitrif May 25, 2023
3e95c6f
Add: unit test simulating the crash in resume onCancellation handler
ovitrif May 25, 2023
6874e8f
Update: refactor test setup code to allow more flexibility
ovitrif Jun 2, 2023
a3bf1b6
Update: rewrite mutex unlock test
ovitrif Jun 5, 2023
cf16fd2
Refactor: cleanup mutex test & reactivate
ovitrif Jun 13, 2023
de1e92c
Refactor: group mutex test blocks
ovitrif Jun 14, 2023
fdcedc6
Refactor: remove unneeded advanceUntilIdle
ovitrif Jun 14, 2023
c5f4379
Refactor: more cleanup in mutex test
ovitrif Jun 14, 2023
cc5ac60
Refactor: make trackAutoUploadAction fun foldable
ovitrif Jun 14, 2023
8157e1c
Refactor: use run to avoid unnecessary var in mutex test
ovitrif Jun 14, 2023
10df537
Update: remove page from tests
ovitrif Jun 15, 2023
ae858f6
Update: add asserts to test
ovitrif Jun 15, 2023
9f0a4c0
Refactor: make test fail because of exception
ovitrif Jun 15, 2023
c3ab67a
Fix: wrap mutex lock/unlock in try/catch block
ovitrif Jun 15, 2023
1 change: 1 addition & 0 deletions RELEASE-NOTES.txt
@@ -24,6 +24,7 @@
* [*] Block editor: [List block] Fix an issue when merging a list item into a Paragraph would remove its nested list items [https://github.com/wordpress-mobile/gutenberg-mobile/pull/5785]
* [*] Block editor: [Gallery block] Fixes a compatibility issue with Gallery block [https://github.com/wordpress-mobile/WordPress-Android/pull/18519]
* [**] Block editor: Tapping any type of nested block moves focus to the nested block directly, rather than requiring multiple taps to navigate down each nesting levels. [https://github.com/wordpress-mobile/gutenberg-mobile/pull/5781]
+* [*] Fixed an error when syncing offline data to the cloud. [https://github.com/wordpress-mobile/WordPress-Android/pull/18449]

22.4
-----

@@ -7,8 +7,10 @@ import dagger.hilt.components.SingletonComponent
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.sync.Mutex
import org.wordpress.android.util.helpers.Debouncer
import javax.inject.Named
+import javax.inject.Singleton

const val APPLICATION_SCOPE = "APPLICATION_SCOPE"

@@ -53,4 +55,10 @@ class ThreadModule {
fun provideDebouncer(): Debouncer {
return Debouncer()
}

+@Provides
+@Singleton
+fun provideMutex(): Mutex {
+return Mutex()
+}
}

@@ -4,9 +4,7 @@ import android.content.Context
import androidx.lifecycle.DefaultLifecycleObserver
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.LiveData
-import androidx.lifecycle.Observer
import androidx.lifecycle.ProcessLifecycleOwner
-import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
@@ -18,6 +16,7 @@ import kotlinx.coroutines.sync.withLock
import org.wordpress.android.analytics.AnalyticsTracker.Stat
import org.wordpress.android.fluxc.Dispatcher
import org.wordpress.android.fluxc.generated.UploadActionBuilder
+import org.wordpress.android.fluxc.model.PostModel
import org.wordpress.android.fluxc.model.SiteModel
import org.wordpress.android.fluxc.store.PageStore
import org.wordpress.android.fluxc.store.PostStore
@@ -45,14 +44,15 @@ import kotlin.coroutines.CoroutineContext
* addition to this, call sites can also request an immediate execution by calling [checkConnectionAndUpload].
*
* The method [activateAutoUploading] must be called once, preferably during app creation, for the auto-uploads to work.
+*
+* @param mutex When the app comes to foreground both [queueUploadFromAllSites] and [queueUploadFromSite] are invoked.
+* The problem is that they can run in parallel and `uploadServiceFacade.isPostUploadingOrQueued(it)` might return
+* out-of-date result and a same post is added twice.
*/
@Singleton
@OpenForTesting
class UploadStarter @Inject constructor(
-/**
-* The Application context
-*/
-private val context: Context,
+private val appContext: Context,
private val dispatcher: Dispatcher,
private val postStore: PostStore,
private val pageStore: PageStore,
Expand All @@ -63,17 +63,11 @@ class UploadStarter @Inject constructor(
@Named(IO_THREAD) private val ioDispatcher: CoroutineDispatcher,
private val uploadServiceFacade: UploadServiceFacade,
private val networkUtilsWrapper: NetworkUtilsWrapper,
-private val connectionStatus: LiveData<ConnectionStatus>
+private val connectionStatus: LiveData<ConnectionStatus>,
+private val mutex: Mutex,
) : CoroutineScope {
private val job = Job()

-/**
-* When the app comes to foreground both `queueUploadFromAllSites` and `queueUploadFromSite` are invoked.
-* The problem is that they can run in parallel and `uploadServiceFacade.isPostUploadingOrQueued(it)` might return
-* out-of-date result and a same post is added twice.
-*/
-private val mutex = Mutex()

override val coroutineContext: CoroutineContext get() = job + bgDispatcher

/**
@@ -97,48 +91,34 @@
fun activateAutoUploading(processLifecycleOwner: ProcessLifecycleOwner) {
// We're skipping the first emitted value because the processLifecycleObserver below will also trigger an
// immediate upload.
-connectionStatus.skip(1).observe(processLifecycleOwner, Observer {
-queueUploadFromAllSites()
-})
+connectionStatus.skip(1).observe(processLifecycleOwner) { queueUploadFromAllSites() }

processLifecycleOwner.lifecycle.addObserver(processLifecycleObserver)
}

-fun queueUploadFromAllSites() = launch {
-val sites = siteStore.sites
-try {
-checkConnectionAndUpload(sites = sites)
-} catch (e: Exception) {
-AppLog.e(T.MEDIA, e)
-}
-}
+fun queueUploadFromAllSites() = launch { checkConnectionAndUpload(sites = siteStore.sites) }

/**
* Upload all local drafts from the given [site].
*/
-fun queueUploadFromSite(site: SiteModel) = launch {
-try {
-checkConnectionAndUpload(sites = listOf(site))
-} catch (e: Exception) {
-AppLog.e(T.MEDIA, e)
-}
-}
+fun queueUploadFromSite(site: SiteModel) = launch { checkConnectionAndUpload(sites = listOf(site)) }

/**
* If there is an internet connection, uploads all posts with local changes belonging to [sites].
*
* This coroutine will suspend until all the [upload] operations have completed. If one of them fails, all query
-* and queuing attempts ([upload]) will be canceled. The exception will be thrown by this method.
+* and queuing attempts ([upload]) will continue. The last exception will be thrown by this method.
*/
private suspend fun checkConnectionAndUpload(sites: List<SiteModel>) = coroutineScope {
-if (!networkUtilsWrapper.isNetworkAvailable()) {
-return@coroutineScope
-}
-
-sites.forEach {
-launch(ioDispatcher) {
-upload(site = it)
+if (!networkUtilsWrapper.isNetworkAvailable()) return@coroutineScope
+try {
+sites.forEach {
+launch(ioDispatcher) {
+upload(site = it)
+}
+}
+} catch (e: Exception) {
Contributor:

I'm not quite sure this will keep the other coroutines (for other sites) from being canceled, since this try/catch is around the coroutine launches (which return instantly) and not around the actual code that throws the exception (the call to upload). Also, this function is creating a new coroutineScope which does not use a SupervisorJob (line 114), so cancellations will be propagated to sibling coroutine jobs if any job fails.

Also, even though we are talking about Coroutine cancellation, this doesn't mean that code will actually stop running immediately because coroutine cancellation is cooperative.

To be honest, I couldn't quite understand what exactly this class is trying to accomplish in terms of job execution order, parallel work, and structured concurrency, so it is a bit hard to understand if the proposed fixes here and in the previous PR would work as expected.
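
To illustrate the point, here is a minimal standalone sketch (not project code; the two "sites" are invented stand-ins). The try/catch around launch is never hit, and the sibling coroutine is cancelled when one child fails:

    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        try {
            coroutineScope {
                try {
                    launch { delay(10); error("site A failed") }   // throws after launch has returned
                    launch { delay(1000); println("site B done") } // cancelled as a sibling, never prints
                } catch (e: Exception) {
                    // Never reached: launch returns immediately, so the failure
                    // does not surface here but when coroutineScope completes.
                    println("inner catch: $e")
                }
            }
        } catch (e: Exception) {
            // The failure surfaces here, after site B was already cancelled.
            println("outer catch: $e")
        }
    }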

Contributor:

This is a good point, and I think this actually might shed light on the original issue. I see that this same possibility exists in an earlier version of this implementation, and I believe that the underlying "double-unlock" issue could be a result of this.

One theory / possibility (after reading the backscroll, and this comment) is that the forEach is spawning multiple jobs on the ioDispatcher (with a thread pool), most of them blocking on the lock() invocation. Then, if any of the child jobs throws, the parent job cancels, and all siblings too, and then things get a little bit tricky, imo. Since lock() docs say:

> This function releases the lock if it was already acquired by this function before the CancellationException was thrown.

and

> Note that this function does not check for cancellation when it is not suspended.

So, although each site's uploads should be blocked and served in a FIFO manner, I wonder if the cancellation bubbling up to the parent job and then back down allows another child job to acquire the lock before it is cancelled as a sibling 🤔

Though there is a "prompt cancellation guarantee", does this guarantee that it cancels immediately when a sibling cancels? (Since those children can be on different threads, along with the fact that we catch and re-throw, it makes me wonder).

I have not tested this hypothesis, so please take it with a grain of salt. I just wanted to suggest it as I've skimmed the backscroll, and your comment seems like it might shed some light on the issue.
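
For anyone who wants to probe this hypothesis, a rough standalone sketch might look like the following (non-deterministic by nature, and not the project's test code; it simply hammers the failure path and checks the lock's final state):

    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.sync.Mutex

    fun main() = runBlocking {
        val mutex = Mutex()
        repeat(1_000) { round ->
            try {
                coroutineScope {
                    repeat(4) { i ->
                        launch(Dispatchers.IO) {
                            mutex.lock() // siblings queue here, like upload(site)
                            try {
                                if (i == 0) error("boom in round $round")
                            } finally {
                                mutex.unlock() // runs only if lock() succeeded
                            }
                        }
                    }
                }
            } catch (e: IllegalStateException) {
                // the injected failure; siblings are cancelled as a side effect
            }
        }
        check(!mutex.isLocked) { "mutex left locked - race observed" }
    }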

Contributor @thomashorta commented on May 18, 2023:

Yeah, I was actually trying to reproduce it with some custom code (though I was running it as a unit test on the JVM and not on an actual device, so I believe the thread implementation might be different).

Because of the mutex, no upload jobs will be running at the same time, though their coroutines would've been started and they're all suspended at the lock function.

> I wonder if the cancellation bubbling up to the parent job and then back down allows another child job to acquire the lock before it is cancelled as a sibling 🤔

In my tests, I see this definitely happens, though it doesn't seem to be the cause of the crash, because in that scenario the unlock call throwing an exception would be either this or this, but not the one causing the Sentry crash, which is this one.

Since the crash happens, I am guessing it is also possible that timing can make that be called right when one of the jobs has released the lock AND while the cancellation is reaching the internal continuation in suspendCancellableCoroutineReusable from the Mutex.

This is a tricky crash, and I still think it could happen even using just the withLock and not calling unlock on our side, though that extra unlock on our side definitely doesn't help, so maybe @ovitrif's changes here help solve the issue.

I feel that updating coroutines to 1.7.0 will definitely solve this crash (since the Mutex internals were completely changed), though I'm not sure what it would take to make that dependency update.
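
For reference, withLock is documented in kotlinx.coroutines as being equivalent to the following lock/try/finally pattern, which is why adding a manual unlock() on top of it risks releasing the same lock twice:

    public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
        lock(owner)
        try {
            return action()
        } finally {
            unlock(owner) // always runs, so an additional manual unlock() double-releases
        }
    }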

Contributor:

Also, just adding more information about this part here:

> To be honest, I couldn't quite understand what exactly this class is trying to accomplish in terms of job execution order, parallel work, and structured concurrency, so it is a bit hard to understand if the proposed fixes here and in the previous PR would work as expected.

What I mean is: I don't understand why we are starting multiple parallel jobs but suspending them all on the same mutex. In this scenario it feels to me the mutex (and concurrency) is not needed since in practice we are just running each job sequentially.

It looks like all those jobs could be called from 1 coroutine, with some exception handling, instead of relying on concurrent jobs and a syncing mechanism (mutex in this case).
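
A single-coroutine version of checkConnectionAndUpload along those lines might look roughly like this (a sketch reusing the surrounding class's names, not code from this PR):

    // Hypothetical sequential variant: one coroutine, per-site error handling,
    // no mutex and no parallel sibling jobs to cancel.
    private suspend fun checkConnectionAndUpload(sites: List<SiteModel>) {
        if (!networkUtilsWrapper.isNetworkAvailable()) return
        withContext(ioDispatcher) {
            sites.forEach { site ->
                try {
                    upload(site)
                } catch (e: Exception) {
                    AppLog.e(T.POSTS, e) // log and keep going with the remaining sites
                }
            }
        }
    }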

Contributor:

> Since the crash happens, I am guessing it is also possible that timing can make that be called right when one of the jobs has released the lock AND while the cancellation is reaching the internal continuation in suspendCancellableCoroutineReusable from the Mutex.

Yeah, this is exactly what I was thinking might be happening (and thanks for linking to the various sources where unlock is called). It is possible that the test dispatcher exhibits different behavior from the ioDispatcher actually used in production. If it is indeed a timing issue / race condition of some sort, this might explain why it is hard to reproduce in the test environment.

> This is a tricky crash, and I still think it could happen even using just the withLock

I also agree, since, as @ovitrif pointed out, it is basically doing the same thing we were doing (i.e. calling unlock in a finally).

> In this scenario it feels to me the mutex (and concurrency) is not needed since in practice we are just running each job sequentially.

> It looks like all those jobs could be called from 1 coroutine, with some exception handling, instead of relying on concurrent jobs and a syncing mechanism (mutex in this case).

I agree with this as well, especially since the jobs are blocked at the highest level, before any control flow traverses into the more granular layers of the implementation where service boundaries might offer concurrency advantages (disk vs. network performance characteristics).

Also, considering performance from a high level: even if we hypothetically implemented more granular concurrency, I have doubts that the advantages would be significant, since the likely bottleneck would be the device's network bandwidth (in most circumstances). Uploading to multiple sites simultaneously likely won't be much (if at all) faster than uploading to each one sequentially, and it incurs this complexity cost, as well as an increased risk of multiple interrupted transfers instead of just one in the case of network failure.

That said, currently, it seems that the purpose of the mutex is described in the code comment:

    /**
     * When the app comes to foreground both `queueUploadFromAllSites` and `queueUploadFromSite` are invoked.
     * The problem is that they can run in parallel and `uploadServiceFacade.isPostUploadingOrQueued(it)` might return
     * out-of-date result and a same post is added twice.
     */

so maybe more work needs to be done to make those queueUploadFrom{AllSites,Site} methods idempotent? I don't have enough context to know what problem this mutex solves here. Perhaps the "deduping" should be happening at a layer closer to the queue, with any needed synchronization there? @ParaskP7 👋 😄 I see that this code comment was from 43949f1 but it appears to originate a bit earlier (#10878), so I'm not sure if you have full context of the original reason for that comment. But I wonder wdyt about this? I.e. do you think it is possible this synchronization / deduping could be done in the service somehow?
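
To make that idea concrete, deduplication at the queue boundary could look roughly like the sketch below (DedupingUploadQueue, enqueuedPostIds, and doUpload are invented names for illustration, not the real UploadServiceFacade API):

    import org.wordpress.android.fluxc.model.PostModel
    import java.util.concurrent.ConcurrentHashMap

    class DedupingUploadQueue(private val doUpload: (PostModel) -> Unit) {
        private val enqueuedPostIds = ConcurrentHashMap.newKeySet<Int>()

        fun enqueue(post: PostModel) {
            // add() is atomic, so only the first of two racing callers enqueues.
            if (enqueuedPostIds.add(post.id)) doUpload(post)
        }

        fun onUploadFinished(post: PostModel) {
            enqueuedPostIds.remove(post.id)
        }
    }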

Contributor Author @ovitrif commented on May 19, 2023:

Thanks a lot for the input @thomashorta and @mkevins 🙇🏻 , you're awesome and really helpful 🙏🏻

> so maybe more work needs to be done to make those queueUploadFrom{AllSites,Site} methods idempotent? I don't have enough context to know what problem this mutex solves here

If I can think outside the box on this one, this is also my view: that mutex should be replaced with a different solution that avoids the post duplication 👍🏻 . To me, locking an async job to enforce synchronicity (basically manually locking and unlocking the mutex to force sequential batching) is more of a workaround than an actual solution, so we should not take the mutex as a given for solving the underlying issue it attempted to solve. This would automatically fix the crash we are seeing in Sentry.
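
One mutex-free shape for that (purely illustrative, not from this PR, and assuming upload() itself no longer locks) is to confine upload batches to a single-threaded view of the IO dispatcher via limitedParallelism, available in kotlinx.coroutines 1.6+, so batches can never overlap and there is nothing to unlock:

    // Sketch in the context of UploadStarter: a dispatcher view that runs at
    // most one coroutine at a time replaces the Mutex entirely.
    @OptIn(ExperimentalCoroutinesApi::class)
    private val serialUploadDispatcher = ioDispatcher.limitedParallelism(1)

    fun queueUploadFromSites(sites: List<SiteModel>) = launch(serialUploadDispatcher) {
        if (!networkUtilsWrapper.isNetworkAvailable()) return@launch
        sites.forEach { site ->
            try {
                upload(site) // batches are serialized; no lock to leak or double-release
            } catch (e: Exception) {
                AppLog.e(T.POSTS, e)
            }
        }
    }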

+AppLog.e(T.MEDIA, e)
}
}

@@ -148,51 +128,31 @@
private suspend fun upload(site: SiteModel) = coroutineScope {
try {
mutex.withLock {
-val posts = async { postStore.getPostsWithLocalChanges(site) }
-val pages = async { pageStore.getPagesWithLocalChanges(site) }
+val posts = async { postStore.getPostsWithLocalChanges(site).orEmpty() }
+val pages = async { (pageStore.getPagesWithLocalChanges(site) as? List<PostModel>).orEmpty() }
val list = posts.await() + pages.await()

list.asSequence()
.map { post ->
val action = uploadActionUseCase.getAutoUploadAction(post, site)
Pair(post, action)
}
-.filter { (_, action) ->
-action != DO_NOTHING
-}
+.filter { (_, action) -> action != DO_NOTHING }
.toList()
.forEach { (post, action) ->
trackAutoUploadAction(action, post.status, post.isPage)
-AppLog.d(
-AppLog.T.POSTS,
-"UploadStarter for post (isPage: ${post.isPage}) title: ${post.title}, action: $action"
-)
-dispatcher.dispatch(
-UploadActionBuilder.newIncrementNumberOfAutoUploadAttemptsAction(
-post
-)
-)
-uploadServiceFacade.uploadPost(
-context = context,
-post = post,
-trackAnalytics = false
-)
+AppLog.d(T.POSTS, "UploadStarter for ${post.toStringLog()}; action: $action")
+dispatcher.dispatch(UploadActionBuilder.newIncrementNumberOfAutoUploadAttemptsAction(post))
+uploadServiceFacade.uploadPost(appContext, post, trackAnalytics = false)
}
}
-} catch (e: CancellationException) {
-AppLog.e(T.MEDIA, e)
-// Do any needed actions while we are still holding the mutex lock, then release it and rethrow the
-// exception so it can be handled upstream
-mutex.unlock()
-throw e
+} catch (e: Exception) {
+AppLog.e(T.POSTS, e)
}
}

-private fun trackAutoUploadAction(
-action: UploadAction,
-status: String,
-isPage: Boolean
-) {
+private fun PostModel.toStringLog() = "${if (isPage) "page" else "post"} with title: $title"
+
+private fun trackAutoUploadAction(action: UploadAction, status: String, isPage: Boolean) {
tracker.track(
if (isPage) Stat.AUTO_UPLOAD_PAGE_INVOKED else Stat.AUTO_UPLOAD_POST_INVOKED,
mapOf(

@@ -0,0 +1,60 @@
package org.wordpress.android.ui.uploads

import androidx.lifecycle.Lifecycle
import androidx.lifecycle.MutableLiveData
import androidx.lifecycle.ProcessLifecycleOwner
import org.mockito.kotlin.any
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import org.wordpress.android.fluxc.model.PostModel
import org.wordpress.android.fluxc.model.SiteModel
import org.wordpress.android.fluxc.model.post.PostStatus
import org.wordpress.android.fluxc.store.UploadStore
import org.wordpress.android.ui.posts.PostUtilsWrapper
import org.wordpress.android.util.DateTimeUtils
import org.wordpress.android.util.NetworkUtilsWrapper
import org.wordpress.android.viewmodel.helpers.ConnectionStatus
import java.util.Date

internal object UploadFixtures {
private var postIdIndex = 0

private fun makePostTitleFromId() = postIdIndex.toString().padStart(2, '0')

fun resetTestPostIdIndex() { postIdIndex = 0 }

fun createMockedNetworkUtilsWrapper() = mock<NetworkUtilsWrapper> { on { isNetworkAvailable() } doReturn true }

fun createConnectionStatusLiveData(initialValue: ConnectionStatus?): MutableLiveData<ConnectionStatus> {
return MutableLiveData<ConnectionStatus>().apply { value = initialValue }
}

fun createMockedPostUtilsWrapper() = mock<PostUtilsWrapper> {
on { isPublishable(any()) } doReturn true
on { isPostInConflictWithRemote(any()) } doReturn false
}

fun createMockedUploadStore(numberOfAutoUploadAttempts: Int) = mock<UploadStore> {
on { getNumberOfPostAutoUploadAttempts(any()) } doReturn numberOfAutoUploadAttempts
}

fun createMockedUploadServiceFacade() = mock<UploadServiceFacade> {
on { isPostUploadingOrQueued(any()) } doReturn false
}

fun createMockedProcessLifecycleOwner(lifecycle: Lifecycle = mock()) = mock<ProcessLifecycleOwner> {
on { this.lifecycle } doReturn lifecycle
}

fun createLocallyChangedPostModel(postStatus: PostStatus = PostStatus.DRAFT, page: Boolean = false) =
PostModel().apply {
setId(++postIdIndex)
setTitle(makePostTitleFromId())
setStatus(postStatus.toString())
setIsLocallyChanged(true)
setDateLocallyChanged(DateTimeUtils.iso8601FromTimestamp(Date().time / 1000))
setIsPage(page)
}

fun createSiteModel(isWpCom: Boolean = true) = SiteModel().apply { setIsWPCom(isWpCom) }
}