Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add error handling and retry #13

Merged
merged 18 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions android/src/main/java/com/amplitude/android/Configuration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package com.amplitude.android
import android.content.Context
import com.amplitude.android.utilities.AndroidLoggerProvider
import com.amplitude.core.Configuration
import com.amplitude.core.EventCallBack
import com.amplitude.core.LoggerProvider
import com.amplitude.core.StorageProvider
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.utilities.FileStorageProvider

class Configuration(
Expand All @@ -18,7 +18,7 @@ class Configuration(
storageProvider: StorageProvider = FileStorageProvider(),
loggerProvider: LoggerProvider = AndroidLoggerProvider(),
minIdLength: Int? = null,
callback: ((BaseEvent) -> Unit)? = null,
callback: EventCallBack? = null,
useAdvertisingIdForDeviceId: Boolean = false,
useAppSetIdForDeviceId: Boolean = false,
enableCoppaControl: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.amplitude.android.utilities

import com.amplitude.core.Amplitude
import com.amplitude.core.Configuration
import com.amplitude.core.Storage
import com.amplitude.core.StorageProvider
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.EventPipeline
import com.amplitude.core.utilities.ResponseHandler
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope

class AndroidStorage(
val amplitude: Amplitude
Expand All @@ -24,7 +29,23 @@ class AndroidStorage(
TODO("Not yet implemented")
}

override fun getEvents(): List<String> {
override fun readEventsContent(): List<Any> {
TODO("Not yet implemented")
}

override fun getEventsString(content: Any): String {
TODO("Not yet implemented")
}

override fun getResponseHandler(
storage: Storage,
eventPipeline: EventPipeline,
configuration: Configuration,
scope: CoroutineScope,
dispatcher: CoroutineDispatcher,
events: Any,
eventsString: String
): ResponseHandler {
TODO("Not yet implemented")
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/com/amplitude/core/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ open class Amplitude internal constructor(
val amplitudeScope: CoroutineScope = CoroutineScope(SupervisorJob()),
val amplitudeDispatcher: CoroutineDispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher(),
val networkIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(),
val storageIODispatcher: CoroutineDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
val storageIODispatcher: CoroutineDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher(),
val retryDispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
) {

internal val timeline: Timeline
val storage: Storage
val logger: Logger
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/com/amplitude/core/Configuration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import com.amplitude.core.events.BaseEvent
import com.amplitude.core.utilities.ConsoleLoggerProvider
import com.amplitude.core.utilities.InMemoryStorageProvider

typealias EventCallBack = (BaseEvent, status: Int, message: String) -> Unit

open class Configuration(
val apiKey: String,
val flushQueueSize: Int = FLUSH_QUEUE_SIZE,
Expand All @@ -13,12 +15,14 @@ open class Configuration(
val storageProvider: StorageProvider = InMemoryStorageProvider(),
val loggerProvider: LoggerProvider = ConsoleLoggerProvider(),
val minIdLength: Int? = null,
val callback: ((BaseEvent) -> Unit)? = null
val callback: EventCallBack? = null,
val flushMaxRetries: Int = FLUSH_MAX_RETRIES
) {

companion object {
const val FLUSH_QUEUE_SIZE = 30
const val FLUSH_INTERVAL_MILLIS = 30 * 1000 // 30s
const val FLUSH_MAX_RETRIES = 5
const val DEFAULT_INSTANCE = "\$default_instance"
}

Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/com/amplitude/core/Storage.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.amplitude.core

import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.EventPipeline
import com.amplitude.core.utilities.ResponseHandler
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope

interface Storage {

Expand All @@ -20,7 +24,11 @@ interface Storage {

fun read(key: Constants): String?

fun getEvents(): List<String>
fun readEventsContent(): List<Any>

fun getEventsString(content: Any): String

fun getResponseHandler(storage: Storage, eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, dispatcher: CoroutineDispatcher, events: Any, eventsString: String): ResponseHandler
}

interface StorageProvider {
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/com/amplitude/core/events/EventOptions.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.amplitude.core.events

import com.amplitude.core.EventCallBack

open class EventOptions {
var userId: String? = null
var deviceId: String? = null
Expand Down Expand Up @@ -37,5 +39,6 @@ open class EventOptions {
var productId: String? = null
var revenueType: String? = null
var extra: Map<String, Any>? = null
var callback: ((BaseEvent) -> Unit)? = null
var callback: EventCallBack? = null
internal var attempts: Int = 0
}
26 changes: 18 additions & 8 deletions core/src/main/java/com/amplitude/core/platform/EventPipeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import kotlinx.coroutines.withContext
import java.lang.Exception
import java.util.concurrent.atomic.AtomicInteger

internal class EventPipeline(
class EventPipeline(
private val amplitude: Amplitude
) {

Expand All @@ -35,6 +35,8 @@ internal class EventPipeline(
var scheduled: Boolean
private set

var flushSizeDivider: AtomicInteger = AtomicInteger(1)

companion object {
internal const val UPLOAD_SIG = "#!upload"
}
Expand All @@ -50,6 +52,7 @@ internal class EventPipeline(
}

fun put(event: BaseEvent) {
event.attempts += 1
writeChannel.trySend(WriteQueueMessage(WriteQueueMessageType.EVENT, event))
}

Expand All @@ -76,7 +79,9 @@ internal class EventPipeline(
if (!triggerFlush && message.event != null) try {
storage.writeEvent(message.event)
} catch (e: Exception) {
e.printStackTrace()
e.message?.let {
amplitude.logger.error(it)
}
}

// if flush condition met, generate paths
Expand All @@ -96,27 +101,32 @@ internal class EventPipeline(
storage.rollover()
}

val eventsData = storage.getEvents()
val eventsData = storage.readEventsContent()
for (events in eventsData) {
if (events.isEmpty()) continue
val eventsString = storage.getEventsString(events)
if (eventsString.isEmpty()) continue

try {
val connection = httpClient.upload()
connection.outputStream?.let {
connection.setEvents(events)
connection.setEvents(eventsString)
// Upload the payloads.
connection.close()
}
val responseHandler = storage.getResponseHandler(storage, this@EventPipeline, amplitude.configuration, scope, amplitude.retryDispatcher, events, eventsString)
responseHandler?.handle(connection.response)
} catch (e: Exception) {
e.printStackTrace()
e.message?.let {
amplitude.logger.error(it)
}
}
// @TODO: handle failures and retry
}
}
}

private fun getFlushCount(): Int {
return amplitude.configuration.flushQueueSize
val count = amplitude.configuration.flushQueueSize / flushSizeDivider.get()
return count.takeUnless { it == 0 } ?: 1
}

private fun getFlushIntervalInMillis(): Long {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.amplitude.core.Amplitude
import com.amplitude.core.Constants
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.Plugin
import java.util.UUID

class ContextPlugin : Plugin {
override val type: Plugin.Type = Plugin.Type.Before
Expand All @@ -14,6 +15,8 @@ class ContextPlugin : Plugin {
}

private fun applyContextData(event: BaseEvent) {
event.timestamp = System.currentTimeMillis()
event.insertId = UUID.randomUUID().toString()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to check if timestamp and insertId already set by user to avoid overriding their value

event.library = Constants.SDK_LIBRARY + "/" + Constants.SDK_VERSION
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.amplitude.id.utilities.KeyValueStore
import com.amplitude.id.utilities.createDirectory
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.json.JSONArray
import java.io.File
import java.io.FileOutputStream

Expand Down Expand Up @@ -101,6 +102,24 @@ class EventsFileManager(
finish()
}

/**
* Split one file to two smaller file
* This is used to handle payload too large error response
*/
fun splitFile(filePath: String, events: JSONArray) {
val originalFile = File(filePath)
if (!originalFile.exists()) {
return
}
val fileName = originalFile.name
val firstHalfFile = File(directory, "$fileName-1.tmp")
val secondHalfFile = File(directory, "$fileName-2.tmp")
val splitStrings = events.split()
writeToFile(splitStrings.first, firstHalfFile)
writeToFile(splitStrings.second, secondHalfFile)
this.remove(filePath)
}

private fun finish() {
val file = currentFile()
if (!file.exists()) {
Expand Down Expand Up @@ -135,6 +154,17 @@ class EventsFileManager(
}
}

private fun writeToFile(content: String, file: File) {
file.createNewFile()
val fileOS = FileOutputStream(file, true)
fileOS.run {
write(content.toByteArray())
flush()
}
file.renameTo(File(directory, file.nameWithoutExtension))
fileOS.close()
}

private fun reset() {
os = null
curFile = null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package com.amplitude.core.utilities

import com.amplitude.core.Configuration
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.EventPipeline
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.json.JSONArray

internal class FileResponseHandler(
private val storage: FileStorage,
private val eventPipeline: EventPipeline,
private val configuration: Configuration,
private val scope: CoroutineScope,
private val dispatcher: CoroutineDispatcher,
private val eventFilePath: String,
private val eventsString: String
) : ResponseHandler {

override fun handleSuccessResponse(successResponse: SuccessResponse) {
val events = JSONArray(eventsString).toEvents()
triggerEventsCallback(events, HttpStatus.SUCCESS.code, "Event sent success.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice for us to standarize the messaging here. We can add that to the doc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. We can sync up on this

scope.launch(dispatcher) {
storage.removeFile(eventFilePath)
}
}

override fun handleBadRequestResponse(badRequestResponse: BadRequestResponse) {
val events = JSONArray(eventsString).toEvents()
if (events.size == 1) {
triggerEventsCallback(events, HttpStatus.BAD_REQUEST.code, badRequestResponse.error)
storage.removeFile(eventFilePath)
return
}
val droppedIndices = badRequestResponse.getEventIndicesToDrop()
val eventsToDrop = mutableListOf<BaseEvent>()
val eventsToRetry = mutableListOf<BaseEvent>()
events.forEachIndexed { index, event ->
if (droppedIndices.contains(index) || badRequestResponse.isEventSilenced(event)) {
eventsToDrop.add(event)
} else {
eventsToRetry.add(event)
}
}
triggerEventsCallback(eventsToDrop, HttpStatus.BAD_REQUEST.code, badRequestResponse.error)
eventsToRetry.forEach {
eventPipeline.put(it)
}
scope.launch(dispatcher) {
storage.removeFile(eventFilePath)
}
}

override fun handlePayloadTooLargeResponse(payloadTooLargeResponse: PayloadTooLargeResponse) {
val rawEvents = JSONArray(eventsString)
if (rawEvents.length() == 1) {
val events = rawEvents.toEvents()
triggerEventsCallback(events, HttpStatus.PAYLOAD_TOO_LARGE.code, payloadTooLargeResponse.error)
scope.launch(dispatcher) {
storage.removeFile(eventFilePath)
}
return
}
// split file into two
scope.launch(dispatcher) {
storage.splitEventFile(eventFilePath, rawEvents)
}
}

override fun handleTooManyRequestsResponse(tooManyRequestsResponse: TooManyRequestsResponse) {
// wait for next time to pick it up
}

override fun handleTimeoutResponse(timeoutResponse: TimeoutResponse) {
// wait for next time to try again
}

override fun handleFailedResponse(failedResponse: FailedResponse) {
// wait for next time to try again
}

private fun triggerEventsCallback(events: List<BaseEvent>, status: Int, message: String) {
events.forEach { event ->
configuration.callback?.let {
it(event, status, message)
}
event.insertId?.let { insertId ->
storage.getEventCallback(insertId)?.let {
it(event, status, message)
storage.removeEventCallback(insertId)
}
}
}
}
}
Loading