diff --git a/android/src/main/java/com/amplitude/android/Amplitude.kt b/android/src/main/java/com/amplitude/android/Amplitude.kt index 2153b46e..e3330136 100644 --- a/android/src/main/java/com/amplitude/android/Amplitude.kt +++ b/android/src/main/java/com/amplitude/android/Amplitude.kt @@ -68,6 +68,7 @@ open class Amplitude( override fun reset(): Amplitude { this.setUserId(null) amplitudeScope.launch(amplitudeDispatcher) { + isBuilt.await() idContainer.identityManager.editIdentity().setDeviceId(null).commit() androidContextPlugin.initializeDeviceId(configuration as Configuration) } @@ -76,13 +77,15 @@ open class Amplitude( fun onEnterForeground(timestamp: Long) { amplitudeScope.launch(amplitudeDispatcher) { + isBuilt.await() startNewSessionIfNeeded(timestamp) inForeground = true } } - fun onExitForeground(timestamp: Long) { + fun onExitForeground() { amplitudeScope.launch(amplitudeDispatcher) { + isBuilt.await() inForeground = false if ((configuration as Configuration).flushEventsOnClose) { flush() @@ -125,6 +128,7 @@ open class Amplitude( sessionId = timestamp previousSessionId = timestamp amplitudeScope.launch(amplitudeDispatcher) { + isBuilt.await() storage.write(Storage.Constants.PREVIOUS_SESSION_ID, timestamp.toString()) } } @@ -138,7 +142,7 @@ open class Amplitude( // start new session setSessionId(timestamp) refreshSessionTime(timestamp) - if ((configuration as Configuration).trackingSessionEvents) { + if (configuration.trackingSessionEvents) { sendSessionEvent(START_SESSION_EVENT) } } @@ -156,6 +160,7 @@ open class Amplitude( } lastEventTime = timestamp amplitudeScope.launch(amplitudeDispatcher) { + isBuilt.await() storage.write(Storage.Constants.LAST_EVENT_TIME, timestamp.toString()) } } diff --git a/android/src/main/java/com/amplitude/android/plugins/AndroidLifecyclePlugin.kt b/android/src/main/java/com/amplitude/android/plugins/AndroidLifecyclePlugin.kt index aa3861b3..1f985dd3 100644 --- a/android/src/main/java/com/amplitude/android/plugins/AndroidLifecyclePlugin.kt +++ b/android/src/main/java/com/amplitude/android/plugins/AndroidLifecyclePlugin.kt @@ -27,7 +27,7 @@ class AndroidLifecyclePlugin : Application.ActivityLifecycleCallbacks, Plugin { } override fun onActivityPaused(activity: Activity) { - (amplitude as com.amplitude.android.Amplitude).onExitForeground(getCurrentTimeMillis()) + (amplitude as com.amplitude.android.Amplitude).onExitForeground() } override fun onActivityStopped(activity: Activity) { diff --git a/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt b/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt index 81fb1a29..19d9c3f5 100644 --- a/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt +++ b/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt @@ -17,7 +17,6 @@ import com.amplitude.core.utilities.ResponseHandler import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import org.json.JSONArray -import java.io.BufferedReader import java.io.File class AndroidStorage( @@ -61,11 +60,8 @@ class AndroidStorage( return eventsFile.read() } - override fun getEventsString(content: Any): String { - val bufferedReader: BufferedReader = File(content as String).bufferedReader() - bufferedReader.use { - return it.readText() - } + override suspend fun getEventsString(content: Any): String { + return eventsFile.getEventString(content as String) } override fun getResponseHandler( @@ -92,10 +88,7 @@ class AndroidStorage( } override fun getEventCallback(insertId: String): EventCallBack? { - if (eventCallbacksMap.contains(insertId)) { - return eventCallbacksMap.get(insertId) - } - return null + return eventCallbacksMap[insertId] } override fun removeEventCallback(insertId: String) { diff --git a/core/src/main/java/com/amplitude/core/Amplitude.kt b/core/src/main/java/com/amplitude/core/Amplitude.kt index ad2ea72d..6d54451b 100644 --- a/core/src/main/java/com/amplitude/core/Amplitude.kt +++ b/core/src/main/java/com/amplitude/core/Amplitude.kt @@ -185,6 +185,7 @@ open class Amplitude internal constructor( */ fun setDeviceId(deviceId: String): Amplitude { amplitudeScope.launch(amplitudeDispatcher) { + isBuilt.await() idContainer.identityManager.editIdentity().setDeviceId(deviceId).commit() } return this @@ -317,6 +318,7 @@ open class Amplitude internal constructor( logger.info("Skip event for opt out config.") } amplitudeScope.launch(amplitudeDispatcher) { + isBuilt.await() timeline.process(event) } } diff --git a/core/src/main/java/com/amplitude/core/Storage.kt b/core/src/main/java/com/amplitude/core/Storage.kt index f990297d..74a70709 100644 --- a/core/src/main/java/com/amplitude/core/Storage.kt +++ b/core/src/main/java/com/amplitude/core/Storage.kt @@ -26,7 +26,7 @@ interface Storage { fun readEventsContent(): List - fun getEventsString(content: Any): String + suspend fun getEventsString(content: Any): String fun getResponseHandler(eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, dispatcher: CoroutineDispatcher, events: Any, eventsString: String): ResponseHandler } diff --git a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt index af41fde6..e8f34e21 100644 --- a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt +++ b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt @@ -7,6 +7,8 @@ import kotlinx.coroutines.sync.withLock import org.json.JSONArray import java.io.File import java.io.FileOutputStream +import java.util.Collections +import java.util.concurrent.ConcurrentHashMap class EventsFileManager( private val directory: File, @@ -15,19 +17,16 @@ class EventsFileManager( ) { init { createDirectory(directory) - registerShutdownHook() } private val fileIndexKey = "amplitude.events.file.index.$apiKey" - private var os: FileOutputStream? = null - - private var curFile: File? = null - - private val mutex = Mutex() - companion object { const val MAX_FILE_SIZE = 975_000 // 975KB + val writeMutex = Mutex() + val readMutex = Mutex() + val filePathSet: MutableSet = Collections.newSetFromMap(ConcurrentHashMap()) + val curFile: MutableMap = ConcurrentHashMap() } /** @@ -35,7 +34,7 @@ class EventsFileManager( * opens a new file, if current file is full or uncreated * stores the event */ - suspend fun storeEvent(event: String) = mutex.withLock { + suspend fun storeEvent(event: String) = writeMutex.withLock { var file = currentFile() if (!file.exists()) { // create it @@ -43,11 +42,14 @@ class EventsFileManager( } // check if file is at capacity - if (file.length() > MAX_FILE_SIZE) { - finish() + while (file.length() > MAX_FILE_SIZE) { + finish(file) // update index file = currentFile() - file.createNewFile() + if (!file.exists()) { + // create it + file.createNewFile() + } } var contents = "" @@ -69,7 +71,7 @@ class EventsFileManager( * Returns a comma-separated list of file paths that are not yet uploaded */ fun read(): List { - // we need to filter out .temp file, since its operating on the writing thread + // we need to filter out .temp file, since it's operating on the writing thread val fileList = directory.listFiles { _, name -> name.contains(apiKey) && !name.endsWith(".tmp") } ?: emptyArray() @@ -82,6 +84,7 @@ class EventsFileManager( * deletes the file at filePath */ fun remove(filePath: String): Boolean { + filePathSet.remove(filePath) return File(filePath).delete() } @@ -95,8 +98,11 @@ class EventsFileManager( * closes current file, and increase the index * so next write go to a new file */ - suspend fun rollover() = mutex.withLock { - finish() + suspend fun rollover() = writeMutex.withLock { + val file = currentFile() + if (file.exists() && file.length() > 0) { + finish(file) + } } /** @@ -117,62 +123,64 @@ class EventsFileManager( this.remove(filePath) } - private fun finish() { - val file = currentFile() - if (!file.exists()) { - // if tmp file doesnt exist then we dont need to do anything + suspend fun getEventString(filePath: String): String = readMutex.withLock { + // Block one time of file reads if another task has read the content of this file + if (filePathSet.contains(filePath)) { + filePathSet.remove(filePath) + return "" + } + filePathSet.add(filePath) + File(filePath).bufferedReader().use { + return it.readText() + } + } + + private fun finish(file: File?) { + if (file == null || !file.exists() || file.length() == 0L) { + // if tmp file doesn't exist or empty then we don't need to do anything return } // close events array and batch object val contents = """]""" writeToFile(contents.toByteArray(), file) file.renameTo(File(directory, file.nameWithoutExtension)) - os?.close() incrementFileIndex() reset() } // return the current tmp file private fun currentFile(): File { - curFile = curFile ?: run { - val index = kvs.getLong(fileIndexKey, 0) - File(directory, "$apiKey-$index.tmp") - } + val file = curFile[apiKey] ?: run { + // check leftover tmp file + val fileList = directory.listFiles { _, name -> + name.contains(apiKey) && name.endsWith(".tmp") + } ?: emptyArray() - return curFile!! + fileList.getOrNull(0) + } + val index = kvs.getLong(fileIndexKey, 0) + curFile[apiKey] = file ?: File(directory, "$apiKey-$index.tmp") + return curFile[apiKey]!! } // write to underlying file private fun writeToFile(content: ByteArray, file: File) { - os = os ?: FileOutputStream(file, true) - os?.run { - write(content) - flush() + FileOutputStream(file, true).use { + it.write(content) + it.flush() } } private fun writeToFile(content: String, file: File) { file.createNewFile() - val fileOS = FileOutputStream(file, true) - fileOS.run { - write(content.toByteArray()) - flush() + FileOutputStream(file).use { + it.write(content.toByteArray()) + it.flush() } file.renameTo(File(directory, file.nameWithoutExtension)) - fileOS.close() } private fun reset() { - os = null - curFile = null - } - - private fun registerShutdownHook() { - // close the stream if the app shuts down - Runtime.getRuntime().addShutdownHook(object : Thread() { - override fun run() { - os?.close() - } - }) + curFile.remove(apiKey) } } diff --git a/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt b/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt index ec61cb68..5dadb60f 100644 --- a/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt +++ b/core/src/main/java/com/amplitude/core/utilities/FileResponseHandler.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.json.JSONArray +import org.json.JSONException class FileResponseHandler( private val storage: EventsFileStorage, @@ -19,7 +20,14 @@ class FileResponseHandler( ) : ResponseHandler { override fun handleSuccessResponse(successResponse: SuccessResponse) { - val events = JSONArray(eventsString).toEvents() + val events: List + try { + events = JSONArray(eventsString).toEvents() + } catch (e: JSONException) { + storage.removeFile(eventFilePath) + removeCallbackByInsertId(eventsString) + throw e + } triggerEventsCallback(events, HttpStatus.SUCCESS.code, "Event sent success.") scope.launch(dispatcher) { storage.removeFile(eventFilePath) @@ -27,7 +35,14 @@ class FileResponseHandler( } override fun handleBadRequestResponse(badRequestResponse: BadRequestResponse) { - val events = JSONArray(eventsString).toEvents() + val events: List + try { + events = JSONArray(eventsString).toEvents() + } catch (e: JSONException) { + storage.removeFile(eventFilePath) + removeCallbackByInsertId(eventsString) + throw e + } if (events.size == 1) { triggerEventsCallback(events, HttpStatus.BAD_REQUEST.code, badRequestResponse.error) storage.removeFile(eventFilePath) @@ -53,7 +68,14 @@ class FileResponseHandler( } override fun handlePayloadTooLargeResponse(payloadTooLargeResponse: PayloadTooLargeResponse) { - val rawEvents = JSONArray(eventsString) + val rawEvents: JSONArray + try { + rawEvents = JSONArray(eventsString) + } catch (e: JSONException) { + storage.removeFile(eventFilePath) + removeCallbackByInsertId(eventsString) + throw e + } if (rawEvents.length() == 1) { val events = rawEvents.toEvents() triggerEventsCallback(events, HttpStatus.PAYLOAD_TOO_LARGE.code, payloadTooLargeResponse.error) @@ -93,4 +115,11 @@ class FileResponseHandler( } } } + + private fun removeCallbackByInsertId(eventsString: String) { + val regx = """"insert_id":"(.{36})",""".toRegex() + regx.findAll(eventsString).forEach { + storage.removeEventCallback(it.groupValues[1]) + } + } } diff --git a/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt b/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt index 3ad8b642..2b22782d 100644 --- a/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt +++ b/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt @@ -59,7 +59,7 @@ class FileStorage( return eventsFile.read() } - override fun getEventsString(content: Any): String { + override suspend fun getEventsString(content: Any): String { // content is filePath String val bufferedReader: BufferedReader = File(content as String).bufferedReader() bufferedReader.use { diff --git a/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt b/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt index 0adbb30f..999c671b 100644 --- a/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt +++ b/core/src/main/java/com/amplitude/core/utilities/InMemoryStorage.kt @@ -48,7 +48,7 @@ class InMemoryStorage( return listOf(eventsToSend) } - override fun getEventsString(content: Any): String { + override suspend fun getEventsString(content: Any): String { // content is list of BaseEvent return JSONUtil.eventsToString(content as List) }