Skip to content

Commit

Permalink
Merge pull request #2168 from OneSignal/read-your-write
Browse files Browse the repository at this point in the history
Read-Your-Write Consistency
  • Loading branch information
rgomezp authored Sep 30, 2024
2 parents 7e68c1d + 5873b16 commit 84d0d60
Show file tree
Hide file tree
Showing 25 changed files with 2,130 additions and 1,432 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.onesignal.common.consistency

import com.onesignal.common.consistency.enums.IamFetchRywTokenKey
import com.onesignal.common.consistency.models.ICondition
import com.onesignal.common.consistency.models.IConsistencyKeyEnum

/**
* Used for read your write consistency when fetching In-App Messages.
*
* Params:
* key : String - the index of the RYW token map
*/
class IamFetchReadyCondition(
private val key: String,
) : ICondition {
companion object {
const val ID = "IamFetchReadyCondition"
}

override val id: String
get() = ID

override fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean {
val tokenMap = indexedTokens[key] ?: return false
val userUpdateTokenSet = tokenMap[IamFetchRywTokenKey.USER] != null

/**
* We always update the session count so we know we will have a userUpdateToken. We don't
* necessarily make a subscriptionUpdate call on every session. The following logic
* doesn't consider tokenMap[IamFetchRywTokenKey.SUBSCRIPTION] for this reason. This doesn't
* mean it isn't considered if present when doing the token comparison.
*/
return userUpdateTokenSet
}

override fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String? {
val tokenMap = indexedTokens[key] ?: return null
// maxOrNull compares lexicographically
return listOfNotNull(tokenMap[IamFetchRywTokenKey.USER], tokenMap[IamFetchRywTokenKey.SUBSCRIPTION]).maxOrNull()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.onesignal.common.consistency.enums

import com.onesignal.common.consistency.models.IConsistencyKeyEnum

/**
* Each enum is a key that we use to keep track of read-your-write tokens.
* Although the enums are named with "UPDATE", they serve as keys for tokens from both PATCH & POST
*/
enum class IamFetchRywTokenKey : IConsistencyKeyEnum {
USER,
SUBSCRIPTION,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.onesignal.common.consistency.impl

import com.onesignal.common.consistency.models.ICondition
import com.onesignal.common.consistency.models.IConsistencyKeyEnum
import com.onesignal.common.consistency.models.IConsistencyManager
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
* Manages read-your-write tokens for more accurate segment membership
* calculation. Uses customizable conditions that block retrieval of the newest token until met.
*
* Usage:
* val consistencyManager = ConsistencyManager<MyEnum>()
* val updateConditionDeferred = consistencyManager.registerCondition(MyCustomCondition())
* val rywToken = updateConditionDeferred.await()
*/
class ConsistencyManager : IConsistencyManager {
private val mutex = Mutex()
private val indexedTokens: MutableMap<String, MutableMap<IConsistencyKeyEnum, String>> = mutableMapOf()
private val conditions: MutableList<Pair<ICondition, CompletableDeferred<String?>>> =
mutableListOf()

/**
* Set method to update the token based on the key.
* Params:
* id: String - the index of the token map (e.g. onesignalId)
* key: K - corresponds to the operation for which we have a read-your-write token
* value: String? - the token (read-your-write token)
*/
override suspend fun setRywToken(
id: String,
key: IConsistencyKeyEnum,
value: String,
) {
mutex.withLock {
val rywTokens = indexedTokens.getOrPut(id) { mutableMapOf() }
rywTokens[key] = value
checkConditionsAndComplete()
}
}

/**
* Register a condition with its corresponding deferred action. Returns a deferred condition.
*/
override suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?> {
mutex.withLock {
val deferred = CompletableDeferred<String?>()
val pair = Pair(condition, deferred)
conditions.add(pair)
checkConditionsAndComplete()
return deferred
}
}

override suspend fun resolveConditionsWithID(id: String) {
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()

for ((condition, deferred) in conditions) {
if (condition.id == id) {
if (!deferred.isCompleted) {
deferred.complete(null)
}
}
completedConditions.add(Pair(condition, deferred))
}

// Remove completed conditions from the list
conditions.removeAll(completedConditions)
}

/**
* IMPORTANT: calling code should be protected by mutex to avoid potential inconsistencies
*/
private fun checkConditionsAndComplete() {
val completedConditions = mutableListOf<Pair<ICondition, CompletableDeferred<String?>>>()

for ((condition, deferred) in conditions) {
if (condition.isMet(indexedTokens)) {
val newestToken = condition.getNewestToken(indexedTokens)
if (!deferred.isCompleted) {
deferred.complete(newestToken)
}
completedConditions.add(Pair(condition, deferred))
}
}

// Remove completed conditions from the list
conditions.removeAll(completedConditions)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.onesignal.common.consistency.models

interface ICondition {
/**
* Every implementation should define a unique ID & make available via a companion object for
* ease of use
*/
val id: String

/**
* Define a condition that "unblocks" execution
* e.g. we have token (A && B) || A
*/
fun isMet(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String>>): Boolean

/**
* Used to process tokens according to their format & return the newest token.
* e.g. numeric strings would be compared differently from JWT tokens
*/
fun getNewestToken(indexedTokens: Map<String, Map<IConsistencyKeyEnum, String?>>): String?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.onesignal.common.consistency.models

interface IConsistencyKeyEnum
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.onesignal.common.consistency.models

import kotlinx.coroutines.CompletableDeferred

interface IConsistencyManager {
/**
* Set method to update the RYW token based on the key.
* Params:
* id: String - the index of the RYW token map (e.g., onesignalId)
* key: IConsistencyKeyEnum - corresponds to the operation for which we have a read-your-write token
* value: String? - the read-your-write token
*/
suspend fun setRywToken(
id: String,
key: IConsistencyKeyEnum,
value: String,
)

/**
* Register a condition with its corresponding deferred action. Returns a deferred condition.
* Params:
* condition: ICondition - the condition to be registered
* Returns: CompletableDeferred<String?> - a deferred action that completes when the condition is met
*/
suspend fun registerCondition(condition: ICondition): CompletableDeferred<String?>

/**
* Resolve all conditions with a specific ID
*/
suspend fun resolveConditionsWithID(id: String)
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ internal class HttpClient(
}
}

if (headers?.rywToken != null) {
con.setRequestProperty("OneSignal-RYW-Token", headers.rywToken.toString())
}

if (headers?.retryCount != null) {
con.setRequestProperty("Onesignal-Retry-Count", headers.retryCount.toString())
}

if (headers?.sessionDuration != null) {
con.setRequestProperty("OneSignal-Session-Duration", headers.sessionDuration.toString())
}

// Network request is made from getResponseCode()
httpResponse = con.responseCode

Expand Down Expand Up @@ -299,9 +311,9 @@ internal class HttpClient(
* Reads the HTTP Retry-Limit from the response.
*/
private fun retryLimitFromResponse(con: HttpURLConnection): Int? {
val retryLimitStr = con.getHeaderField("Retry-Limit")
val retryLimitStr = con.getHeaderField("OneSignal-Retry-Limit")
return if (retryLimitStr != null) {
Logging.debug("HttpClient: Response Retry-After: $retryLimitStr")
Logging.debug("HttpClient: Response OneSignal-Retry-Limit: $retryLimitStr")
retryLimitStr.toIntOrNull()
} else {
null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
package com.onesignal.core.internal.http.impl

data class OptionalHeaders(
/**
* Used as an E-Tag
*/
val cacheKey: String? = null,
/**
* Used for read your write consistency
*/
val rywToken: String? = null,
/**
* Current retry count
*/
val retryCount: Int? = null,
/**
* Used to track delay between session start and request
*/
val sessionDuration: Long? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal class SessionListener(
}

override fun onSessionStarted() {
_operationRepo.enqueue(TrackSessionStartOperation(_configModelStore.model.appId, _identityModelStore.model.onesignalId))
_operationRepo.enqueue(TrackSessionStartOperation(_configModelStore.model.appId, _identityModelStore.model.onesignalId), true)
}

override fun onSessionActive() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.onesignal.user

import com.onesignal.common.consistency.impl.ConsistencyManager
import com.onesignal.common.consistency.models.IConsistencyManager
import com.onesignal.common.modules.IModule
import com.onesignal.common.services.ServiceBuilder
import com.onesignal.core.internal.operations.IOperationExecutor
Expand Down Expand Up @@ -34,6 +36,9 @@ import com.onesignal.user.internal.subscriptions.impl.SubscriptionManager

internal class UserModule : IModule {
override fun register(builder: ServiceBuilder) {
// Consistency
builder.register<ConsistencyManager>().provides<IConsistencyManager>()

// Properties
builder.register<PropertiesModelStore>().provides<PropertiesModelStore>()
builder.register<PropertiesModelStoreListener>().provides<IBootstrapService>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ interface ISubscriptionBackendService {
aliasLabel: String,
aliasValue: String,
subscription: SubscriptionObject,
): String?
): Pair<String, String?>?

/**
* Update an existing subscription with the properties provided.
Expand All @@ -34,7 +34,7 @@ interface ISubscriptionBackendService {
appId: String,
subscriptionId: String,
subscription: SubscriptionObject,
)
): String?

/**
* Delete an existing subscription.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ interface IUserBackendService {
properties: PropertiesObject,
refreshDeviceMetadata: Boolean,
propertyiesDelta: PropertiesDeltasObject,
)
): String?

/**
* Retrieve a user from the backend.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal class SubscriptionBackendService(
aliasLabel: String,
aliasValue: String,
subscription: SubscriptionObject,
): String? {
): Pair<String, String?>? {
val jsonSubscription = JSONConverter.convertToJSON(subscription)
jsonSubscription.remove("id")
val requestJSON = JSONObject().put("subscription", jsonSubscription)
Expand All @@ -33,14 +33,19 @@ internal class SubscriptionBackendService(
return null
}

return subscriptionJSON.getString("id")
var rywToken: String? = null
if (responseJSON.has("ryw_token")) {
rywToken = responseJSON.getString("ryw_token")
}

return Pair(subscriptionJSON.getString("id"), rywToken)
}

override suspend fun updateSubscription(
appId: String,
subscriptionId: String,
subscription: SubscriptionObject,
) {
): String? {
val requestJSON =
JSONObject()
.put("subscription", JSONConverter.convertToJSON(subscription))
Expand All @@ -50,6 +55,13 @@ internal class SubscriptionBackendService(
if (!response.isSuccess) {
throw BackendException(response.statusCode, response.payload, response.retryAfterSeconds)
}

val responseBody = JSONObject(response.payload)
return if (responseBody.has("ryw_token")) {
responseBody.getString("ryw_token")
} else {
null
}
}

override suspend fun deleteSubscription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ internal class UserBackendService(
properties: PropertiesObject,
refreshDeviceMetadata: Boolean,
propertyiesDelta: PropertiesDeltasObject,
) {
): String? {
val jsonObject =
JSONObject()
.put("refresh_device_metadata", refreshDeviceMetadata)
Expand All @@ -70,6 +70,13 @@ internal class UserBackendService(
if (!response.isSuccess) {
throw BackendException(response.statusCode, response.payload, response.retryAfterSeconds)
}

val responseBody = JSONObject(response.payload)
return if (responseBody.has("ryw_token")) {
responseBody.getString("ryw_token")
} else {
null
}
}

override suspend fun getUser(
Expand Down
Loading

0 comments on commit 84d0d60

Please sign in to comment.