Skip to content

Commit

Permalink
Merge pull request #2222 from DataDog/xgouchet/RUM-5977/custom_upload…
Browse files Browse the repository at this point in the history
…_scheduler_strategy

RUM-5977 create UploadSchedulerStrategy interface and default implementation
  • Loading branch information
xgouchet authored Aug 30, 2024
2 parents 4b09b22 + ad6e8f1 commit 045a231
Show file tree
Hide file tree
Showing 13 changed files with 427 additions and 539 deletions.
2 changes: 2 additions & 0 deletions dd-sdk-android-core/api/apiSurface
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enum com.datadog.android.core.configuration.UploadFrequency
- FREQUENT
- AVERAGE
- RARE
interface com.datadog.android.core.configuration.UploadSchedulerStrategy
fun getMsDelayUntilNextUpload(String, Int, Int?, Throwable?): Long
interface com.datadog.android.core.constraints.DataConstraints
fun <T: Any?> validateAttributes(Map<String, T>, String? = null, String? = null, Set<String> = emptySet()): MutableMap<String, T>
fun validateTags(List<String>): List<String>
Expand Down
4 changes: 4 additions & 0 deletions dd-sdk-android-core/api/dd-sdk-android-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,10 @@ public final class com/datadog/android/core/configuration/UploadFrequency : java
public static fun values ()[Lcom/datadog/android/core/configuration/UploadFrequency;
}

public abstract interface class com/datadog/android/core/configuration/UploadSchedulerStrategy {
public abstract fun getMsDelayUntilNextUpload (Ljava/lang/String;ILjava/lang/Integer;Ljava/lang/Throwable;)J
}

public abstract interface class com/datadog/android/core/constraints/DataConstraints {
public abstract fun validateAttributes (Ljava/util/Map;Ljava/lang/String;Ljava/lang/String;Ljava/util/Set;)Ljava/util/Map;
public abstract fun validateTags (Ljava/util/List;)Ljava/util/List;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2016-Present Datadog, Inc.
*/

package com.datadog.android.core.configuration

import com.datadog.android.api.feature.Feature

/**
* Defines the strategy used to schedule the waiting period between batch uploads.
*/
interface UploadSchedulerStrategy {

/**
* Should return the delay in milliseconds to wait until the next upload attempt
* is performed.
* @param featureName the name of the feature for which a new upload will be scheduled. Known feature names are
* listed in the [Feature.Companion] object.
* @param uploadAttempts the number of requests that were attempted during the last upload batch. Will be zero if
* the device is not ready (e.g.: when offline or with low battery) or no data is ready to be sent.
* If multiple batches can be uploaded, the attempts will stop at the first failure.
* @param lastStatusCode the HTTP status code of the last request (if available). A successful upload will have a
* status code 202 (Accepted). When null, it means that the network request didn't fully complete.
* @param throwable the exception thrown during the upload process (if any).
*/
fun getMsDelayUntilNextUpload(
featureName: String,
uploadAttempts: Int,
lastStatusCode: Int?,
throwable: Throwable?
): Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import com.datadog.android.api.net.RequestFactory
import com.datadog.android.api.storage.EventBatchWriter
import com.datadog.android.api.storage.FeatureStorageConfiguration
import com.datadog.android.api.storage.datastore.DataStoreHandler
import com.datadog.android.core.configuration.UploadSchedulerStrategy
import com.datadog.android.core.internal.configuration.DataUploadConfiguration
import com.datadog.android.core.internal.data.upload.DataFlusher
import com.datadog.android.core.internal.data.upload.DataOkHttpUploader
import com.datadog.android.core.internal.data.upload.DataUploadScheduler
import com.datadog.android.core.internal.data.upload.DataUploader
import com.datadog.android.core.internal.data.upload.DefaultUploadSchedulerStrategy
import com.datadog.android.core.internal.data.upload.NoOpDataUploader
import com.datadog.android.core.internal.data.upload.NoOpUploadScheduler
import com.datadog.android.core.internal.data.upload.UploadScheduler
Expand Down Expand Up @@ -88,27 +90,28 @@ internal class SdkFeature(
return
}

var dataUploadConfiguration: DataUploadConfiguration? = null
if (wrappedFeature is StorageBackedFeature) {
val uploadFrequency = coreFeature.uploadFrequency
val batchProcessingLevel = coreFeature.batchProcessingLevel
dataUploadConfiguration = DataUploadConfiguration(

val dataUploadConfiguration = DataUploadConfiguration(
uploadFrequency,
batchProcessingLevel.maxBatchesPerUploadJob
)
val uploadSchedulerStrategy = DefaultUploadSchedulerStrategy(dataUploadConfiguration)
storage = prepareStorage(
dataUploadConfiguration,
wrappedFeature,
context,
instanceId,
coreFeature.persistenceStrategyFactory
)
}

wrappedFeature.onInitialize(context)
wrappedFeature.onInitialize(context)

if (wrappedFeature is StorageBackedFeature && dataUploadConfiguration != null) {
setupUploader(wrappedFeature, dataUploadConfiguration)
setupUploader(wrappedFeature, uploadSchedulerStrategy, dataUploadConfiguration.maxBatchesPerUploadJob)
} else {
wrappedFeature.onInitialize(context)
}

if (wrappedFeature is TrackingConsentProviderCallback) {
Expand Down Expand Up @@ -229,7 +232,7 @@ internal class SdkFeature(
// region Internal

private fun setupMetricsDispatcher(
dataUploadConfiguration: DataUploadConfiguration,
dataUploadConfiguration: DataUploadConfiguration?,
filePersistenceConfig: FilePersistenceConfig,
context: Context
) {
Expand All @@ -251,7 +254,8 @@ internal class SdkFeature(

private fun setupUploader(
feature: StorageBackedFeature,
uploadConfiguration: DataUploadConfiguration
uploadSchedulerStrategy: UploadSchedulerStrategy,
maxBatchesPerJob: Int
) {
uploadScheduler = if (coreFeature.isMainProcess) {
uploader = createUploader(feature.requestFactory)
Expand All @@ -262,7 +266,8 @@ internal class SdkFeature(
coreFeature.contextProvider,
coreFeature.networkInfoProvider,
coreFeature.systemInfoProvider,
uploadConfiguration,
uploadSchedulerStrategy,
maxBatchesPerJob,
coreFeature.uploadExecutorService,
internalLogger
)
Expand All @@ -274,7 +279,7 @@ internal class SdkFeature(
// region Feature setup

private fun prepareStorage(
dataUploadConfiguration: DataUploadConfiguration,
dataUploadConfiguration: DataUploadConfiguration?,
wrappedFeature: StorageBackedFeature,
context: Context,
instanceId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import com.datadog.android.api.InternalLogger
import com.datadog.android.api.context.DatadogContext
import com.datadog.android.api.context.NetworkInfo
import com.datadog.android.api.storage.RawBatchEvent
import com.datadog.android.core.configuration.UploadSchedulerStrategy
import com.datadog.android.core.internal.ContextProvider
import com.datadog.android.core.internal.configuration.DataUploadConfiguration
import com.datadog.android.core.internal.metrics.RemovalReason
import com.datadog.android.core.internal.net.info.NetworkInfoProvider
import com.datadog.android.core.internal.persistence.BatchId
Expand All @@ -21,9 +21,6 @@ import com.datadog.android.core.internal.system.SystemInfoProvider
import com.datadog.android.core.internal.utils.scheduleSafe
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit
import kotlin.math.max
import kotlin.math.min
import kotlin.math.roundToLong

internal class DataUploadRunnable(
private val featureName: String,
Expand All @@ -33,53 +30,44 @@ internal class DataUploadRunnable(
private val contextProvider: ContextProvider,
private val networkInfoProvider: NetworkInfoProvider,
private val systemInfoProvider: SystemInfoProvider,
uploadConfiguration: DataUploadConfiguration,
internal val uploadSchedulerStrategy: UploadSchedulerStrategy,
internal val maxBatchesPerJob: Int,
private val internalLogger: InternalLogger
) : UploadRunnable {

internal var currentDelayIntervalMs = uploadConfiguration.defaultDelayMs
internal val minDelayMs = uploadConfiguration.minDelayMs
internal val maxDelayMs = uploadConfiguration.maxDelayMs
internal val maxBatchesPerJob = uploadConfiguration.maxBatchesPerUploadJob

// region Runnable

@WorkerThread
override fun run() {
var uploadAttempts = 0
var lastBatchUploadStatus: UploadStatus? = null
if (isNetworkAvailable() && isSystemReady()) {
val context = contextProvider.context
var batchConsumerAvailableAttempts = maxBatchesPerJob
var lastBatchUploadStatus: UploadStatus?
do {
batchConsumerAvailableAttempts--
lastBatchUploadStatus = handleNextBatch(context)
} while (batchConsumerAvailableAttempts > 0 &&
lastBatchUploadStatus is UploadStatus.Success
if (lastBatchUploadStatus != null) {
uploadAttempts++
}
} while (
batchConsumerAvailableAttempts > 0 && lastBatchUploadStatus is UploadStatus.Success
)
if (lastBatchUploadStatus != null) {
handleBatchConsumingJobFrequency(lastBatchUploadStatus)
} else {
// there was no batch left or there was a problem reading the next batch
// in the storage so we increase the interval
increaseInterval(lastBatchUploadStatus)
}
}

scheduleNextUpload()
val delayMs = uploadSchedulerStrategy.getMsDelayUntilNextUpload(
featureName,
uploadAttempts,
lastBatchUploadStatus?.code,
lastBatchUploadStatus?.throwable
)
scheduleNextUpload(delayMs)
}

// endregion

// region Internal

private fun handleBatchConsumingJobFrequency(lastBatchUploadStatus: UploadStatus) {
if (lastBatchUploadStatus.shouldRetry) {
increaseInterval(lastBatchUploadStatus)
} else {
decreaseInterval()
}
}

@WorkerThread
@Suppress("UnsafeThirdPartyFunctionCall") // called inside a dedicated executor
private fun handleNextBatch(context: DatadogContext): UploadStatus? {
Expand Down Expand Up @@ -109,11 +97,11 @@ internal class DataUploadRunnable(
return hasEnoughPower && !systemInfo.powerSaveMode
}

private fun scheduleNextUpload() {
private fun scheduleNextUpload(delayMs: Long) {
threadPoolExecutor.remove(this)
threadPoolExecutor.scheduleSafe(
"$featureName: data upload",
currentDelayIntervalMs,
delayMs,
TimeUnit.MILLISECONDS,
internalLogger,
this
Expand All @@ -137,37 +125,9 @@ internal class DataUploadRunnable(
return status
}

@Suppress("UnsafeThirdPartyFunctionCall") // rounded Double isn't NaN
private fun decreaseInterval() {
currentDelayIntervalMs = max(
minDelayMs,
@Suppress("UnsafeThirdPartyFunctionCall") // not a NaN
(currentDelayIntervalMs * DECREASE_PERCENT).roundToLong()
)
}

@Suppress("UnsafeThirdPartyFunctionCall") // rounded Double isn't NaN
private fun increaseInterval(status: UploadStatus?) {
currentDelayIntervalMs = if (status is UploadStatus.DNSError) {
// A DNS error will likely not be a fluke, so we use a longer delay to avoid infinite looping
// and prevent battery draining
maxDelayMs * DNS_DELAY_MULTIPLIER
} else {
min(
maxDelayMs,
@Suppress("UnsafeThirdPartyFunctionCall") // not a NaN
(currentDelayIntervalMs * INCREASE_PERCENT).roundToLong()
)
}
}

// endregion

companion object {
internal const val LOW_BATTERY_THRESHOLD = 10
const val DECREASE_PERCENT = 0.90
const val INCREASE_PERCENT = 1.10

internal const val DNS_DELAY_MULTIPLIER = 100
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
package com.datadog.android.core.internal.data.upload

import com.datadog.android.api.InternalLogger
import com.datadog.android.core.configuration.UploadSchedulerStrategy
import com.datadog.android.core.internal.ContextProvider
import com.datadog.android.core.internal.configuration.DataUploadConfiguration
import com.datadog.android.core.internal.net.info.NetworkInfoProvider
import com.datadog.android.core.internal.persistence.Storage
import com.datadog.android.core.internal.system.SystemInfoProvider
Expand All @@ -22,20 +22,22 @@ internal class DataUploadScheduler(
contextProvider: ContextProvider,
networkInfoProvider: NetworkInfoProvider,
systemInfoProvider: SystemInfoProvider,
uploadConfiguration: DataUploadConfiguration,
uploadSchedulerStrategy: UploadSchedulerStrategy,
maxBatchesPerJob: Int,
private val scheduledThreadPoolExecutor: ScheduledThreadPoolExecutor,
private val internalLogger: InternalLogger
) : UploadScheduler {

internal val runnable = DataUploadRunnable(
featureName,
scheduledThreadPoolExecutor,
storage,
dataUploader,
contextProvider,
networkInfoProvider,
systemInfoProvider,
uploadConfiguration,
featureName = featureName,
threadPoolExecutor = scheduledThreadPoolExecutor,
storage = storage,
dataUploader = dataUploader,
contextProvider = contextProvider,
networkInfoProvider = networkInfoProvider,
systemInfoProvider = systemInfoProvider,
uploadSchedulerStrategy = uploadSchedulerStrategy,
maxBatchesPerJob = maxBatchesPerJob,
internalLogger = internalLogger
)

Expand Down
Loading

0 comments on commit 045a231

Please sign in to comment.