Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Unterminated arrays caused by multi client instances with same name and api key #61

Merged
merged 6 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion android/src/main/java/com/amplitude/android/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ open class Amplitude(
override fun reset(): Amplitude {
this.setUserId(null)
amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
idContainer.identityManager.editIdentity().setDeviceId(null).commit()
androidContextPlugin.initializeDeviceId(configuration as Configuration)
}
Expand All @@ -76,13 +77,15 @@ open class Amplitude(

fun onEnterForeground(timestamp: Long) {
amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
startNewSessionIfNeeded(timestamp)
inForeground = true
}
}

fun onExitForeground(timestamp: Long) {
fun onExitForeground() {
amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
inForeground = false
if ((configuration as Configuration).flushEventsOnClose) {
flush()
Expand Down Expand Up @@ -125,6 +128,7 @@ open class Amplitude(
sessionId = timestamp
previousSessionId = timestamp
amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
storage.write(Storage.Constants.PREVIOUS_SESSION_ID, timestamp.toString())
}
}
Expand Down Expand Up @@ -156,6 +160,7 @@ open class Amplitude(
}
lastEventTime = timestamp
amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
storage.write(Storage.Constants.LAST_EVENT_TIME, timestamp.toString())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class AndroidLifecyclePlugin : Application.ActivityLifecycleCallbacks, Plugin {
}

override fun onActivityPaused(activity: Activity) {
(amplitude as com.amplitude.android.Amplitude).onExitForeground(getCurrentTimeMillis())
(amplitude as com.amplitude.android.Amplitude).onExitForeground()
}

override fun onActivityStopped(activity: Activity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import com.amplitude.core.utilities.ResponseHandler
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import org.json.JSONArray
import java.io.BufferedReader
import java.io.File

class AndroidStorage(
Expand Down Expand Up @@ -61,11 +60,8 @@ class AndroidStorage(
return eventsFile.read()
}

override fun getEventsString(content: Any): String {
val bufferedReader: BufferedReader = File(content as String).bufferedReader()
bufferedReader.use {
return it.readText()
}
override suspend fun getEventsString(content: Any): String {
return eventsFile.getEventString(content as String)
}

override fun getResponseHandler(
Expand All @@ -92,10 +88,7 @@ class AndroidStorage(
}

override fun getEventCallback(insertId: String): EventCallBack? {
if (eventCallbacksMap.contains(insertId)) {
return eventCallbacksMap.get(insertId)
}
return null
return eventCallbacksMap[insertId]
}

override fun removeEventCallback(insertId: String) {
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/com/amplitude/core/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ open class Amplitude internal constructor(
*/
fun setDeviceId(deviceId: String): Amplitude {
amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
idContainer.identityManager.editIdentity().setDeviceId(deviceId).commit()
}
return this
Expand Down Expand Up @@ -317,6 +318,7 @@ open class Amplitude internal constructor(
logger.info("Skip event for opt out config.")
}
amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
timeline.process(event)
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/com/amplitude/core/Storage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ interface Storage {

fun readEventsContent(): List<Any>

fun getEventsString(content: Any): String
suspend fun getEventsString(content: Any): String

fun getResponseHandler(eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, dispatcher: CoroutineDispatcher, events: Any, eventsString: String): ResponseHandler
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import kotlinx.coroutines.sync.withLock
import org.json.JSONArray
import java.io.File
import java.io.FileOutputStream
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

class EventsFileManager(
private val directory: File,
Expand All @@ -15,39 +17,39 @@ class EventsFileManager(
) {
init {
createDirectory(directory)
registerShutdownHook()
}

private val fileIndexKey = "amplitude.events.file.index.$apiKey"

private var os: FileOutputStream? = null

private var curFile: File? = null

private val mutex = Mutex()

companion object {
const val MAX_FILE_SIZE = 975_000 // 975KB
val writeMutex = Mutex()
val readMutex = Mutex()
bohan-amplitude marked this conversation as resolved.
Show resolved Hide resolved
val filePathSet: MutableSet<String> = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
val curFile: MutableMap<String, File> = ConcurrentHashMap<String, File>()
}

/**
* closes existing file, if at capacity
* opens a new file, if current file is full or uncreated
* stores the event
*/
suspend fun storeEvent(event: String) = mutex.withLock {
suspend fun storeEvent(event: String) = writeMutex.withLock {
var file = currentFile()
if (!file.exists()) {
// create it
file.createNewFile()
}

// check if file is at capacity
if (file.length() > MAX_FILE_SIZE) {
finish()
while (file.length() > MAX_FILE_SIZE) {
finish(file)
// update index
file = currentFile()
file.createNewFile()
if (!file.exists()) {
// create it
file.createNewFile()
}
}

var contents = ""
Expand All @@ -69,7 +71,7 @@ class EventsFileManager(
* Returns a comma-separated list of file paths that are not yet uploaded
*/
fun read(): List<String> {
// we need to filter out .temp file, since its operating on the writing thread
// we need to filter out .temp file, since it's operating on the writing thread
val fileList = directory.listFiles { _, name ->
name.contains(apiKey) && !name.endsWith(".tmp")
} ?: emptyArray()
Expand All @@ -82,6 +84,7 @@ class EventsFileManager(
* deletes the file at filePath
*/
fun remove(filePath: String): Boolean {
filePathSet.remove(filePath)
return File(filePath).delete()
}

Expand All @@ -95,8 +98,11 @@ class EventsFileManager(
* closes current file, and increase the index
* so next write go to a new file
*/
suspend fun rollover() = mutex.withLock {
finish()
suspend fun rollover() = writeMutex.withLock {
val file = currentFile()
if (file.exists() && file.length() > 0) {
finish(file)
}
}

/**
Expand All @@ -117,62 +123,64 @@ class EventsFileManager(
this.remove(filePath)
}

private fun finish() {
val file = currentFile()
if (!file.exists()) {
// if tmp file doesnt exist then we dont need to do anything
suspend fun getEventString(filePath: String): String = readMutex.withLock {
// Block one time of file reads if another task has read the content of this file
if (filePathSet.contains(filePath)) {
filePathSet.remove(filePath)
return ""
}
filePathSet.add(filePath)
File(filePath).bufferedReader().use {
return it.readText()
}
}

private fun finish(file: File?) {
if (file == null || !file.exists() || file.length() == 0L) {
// if tmp file doesn't exist or empty then we don't need to do anything
return
}
// close events array and batch object
val contents = """]"""
writeToFile(contents.toByteArray(), file)
file.renameTo(File(directory, file.nameWithoutExtension))
os?.close()
incrementFileIndex()
reset()
}

// return the current tmp file
private fun currentFile(): File {
curFile = curFile ?: run {
val index = kvs.getLong(fileIndexKey, 0)
File(directory, "$apiKey-$index.tmp")
}
val file = curFile[apiKey] ?: run {
// check leftover tmp file
val fileList = directory.listFiles { _, name ->
name.contains(apiKey) && name.endsWith(".tmp")
} ?: emptyArray()

return curFile!!
fileList.getOrNull(0)
}
val index = kvs.getLong(fileIndexKey, 0)
curFile[apiKey] = file ?: File(directory, "$apiKey-$index.tmp")
return curFile[apiKey]!!
}

// write to underlying file
private fun writeToFile(content: ByteArray, file: File) {
os = os ?: FileOutputStream(file, true)
os?.run {
write(content)
flush()
FileOutputStream(file, true).use {
it.write(content)
it.flush()
}
}

private fun writeToFile(content: String, file: File) {
file.createNewFile()
val fileOS = FileOutputStream(file, true)
fileOS.run {
write(content.toByteArray())
flush()
FileOutputStream(file).use {
it.write(content.toByteArray())
it.flush()
}
file.renameTo(File(directory, file.nameWithoutExtension))
fileOS.close()
}

private fun reset() {
os = null
curFile = null
}

private fun registerShutdownHook() {
// close the stream if the app shuts down
Runtime.getRuntime().addShutdownHook(object : Thread() {
override fun run() {
os?.close()
}
})
curFile.remove(apiKey)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.json.JSONArray
import org.json.JSONException

class FileResponseHandler(
private val storage: EventsFileStorage,
Expand All @@ -19,15 +20,29 @@ class FileResponseHandler(
) : ResponseHandler {

override fun handleSuccessResponse(successResponse: SuccessResponse) {
val events = JSONArray(eventsString).toEvents()
val events: List<BaseEvent>
try {
events = JSONArray(eventsString).toEvents()
} catch (e: JSONException) {
storage.removeFile(eventFilePath)
removeCallbackByInsertId(eventsString)
throw e
}
triggerEventsCallback(events, HttpStatus.SUCCESS.code, "Event sent success.")
scope.launch(dispatcher) {
storage.removeFile(eventFilePath)
}
}

override fun handleBadRequestResponse(badRequestResponse: BadRequestResponse) {
val events = JSONArray(eventsString).toEvents()
val events: List<BaseEvent>
try {
events = JSONArray(eventsString).toEvents()
} catch (e: JSONException) {
storage.removeFile(eventFilePath)
removeCallbackByInsertId(eventsString)
throw e
}
if (events.size == 1) {
triggerEventsCallback(events, HttpStatus.BAD_REQUEST.code, badRequestResponse.error)
storage.removeFile(eventFilePath)
Expand All @@ -53,7 +68,14 @@ class FileResponseHandler(
}

override fun handlePayloadTooLargeResponse(payloadTooLargeResponse: PayloadTooLargeResponse) {
val rawEvents = JSONArray(eventsString)
val rawEvents: JSONArray
try {
rawEvents = JSONArray(eventsString)
} catch (e: JSONException) {
storage.removeFile(eventFilePath)
removeCallbackByInsertId(eventsString)
throw e
}
if (rawEvents.length() == 1) {
val events = rawEvents.toEvents()
triggerEventsCallback(events, HttpStatus.PAYLOAD_TOO_LARGE.code, payloadTooLargeResponse.error)
Expand Down Expand Up @@ -93,4 +115,11 @@ class FileResponseHandler(
}
}
}

private fun removeCallbackByInsertId(eventsString: String) {
val regx = """"insert_id":"(.{36})",""".toRegex()
regx.findAll(eventsString).forEach {
storage.removeEventCallback(it.groupValues[1])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class FileStorage(
return eventsFile.read()
}

override fun getEventsString(content: Any): String {
override suspend fun getEventsString(content: Any): String {
// content is filePath String
val bufferedReader: BufferedReader = File(content as String).bufferedReader()
bufferedReader.use {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class InMemoryStorage(
return listOf(eventsToSend)
}

override fun getEventsString(content: Any): String {
override suspend fun getEventsString(content: Any): String {
// content is list of BaseEvent
return JSONUtil.eventsToString(content as List<BaseEvent>)
}
Expand Down