Skip to content

Commit

Permalink
fix: start tracking sessions at init for session replay (#186)
Browse files Browse the repository at this point in the history
* fix: start tracking sessions at init for session replay

* fix: support adding plugins to initial config

* chore: centralize time to use SystemTime for easier testing

* chore: test sessionId is the time of instantiation after isBuilt

* chore: added mockSystemTime() util for testing

* fix: add tests for ObservePlugin.onSessionIdChanged

* fix: override amplitude by default in ObservePlugin

* chore: remove unneeded session logic in Timeline.process
  • Loading branch information
justin-fiedler authored Mar 25, 2024
1 parent 5927f51 commit 7b69897
Show file tree
Hide file tree
Showing 21 changed files with 777 additions and 280 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/pull-request-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ jobs:
java-version: '11'
distribution: 'temurin'
cache: 'gradle'

- name: Build
run: ./gradlew build

- name: Unit Test
run: ./gradlew testDebugUnitTest

- name: Lint
run: ./gradlew ktlintCheck

- name: Upload build results
if: always()
uses: actions/upload-artifact@v4
Expand Down
20 changes: 16 additions & 4 deletions android/src/main/java/com/amplitude/android/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.amplitude.android.plugins.AnalyticsConnectorPlugin
import com.amplitude.android.plugins.AndroidContextPlugin
import com.amplitude.android.plugins.AndroidLifecyclePlugin
import com.amplitude.android.plugins.AndroidNetworkConnectivityCheckerPlugin
import com.amplitude.android.utilities.Session
import com.amplitude.core.Amplitude
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.plugins.AmplitudeDestination
Expand All @@ -25,15 +26,16 @@ open class Amplitude(

val sessionId: Long
get() {
return (timeline as Timeline).sessionId
return if (timeline == null) Session.EMPTY_SESSION_ID
else (timeline as Timeline).sessionId
}

init {
registerShutdownHook()
}

override fun createTimeline(): Timeline {
return Timeline(configuration.sessionId).also { it.amplitude = this }
return Timeline().also { it.amplitude = this }
}

override fun createIdentityConfiguration(): IdentityConfiguration {
Expand All @@ -50,11 +52,12 @@ open class Amplitude(
}

override suspend fun buildInternal(identityConfiguration: IdentityConfiguration) {
// Migrations
ApiKeyStorageMigration(this).execute()

if ((this.configuration as Configuration).migrateLegacyData) {
RemnantDataMigration(this).execute()
}

this.createIdentityContainer(identityConfiguration)

if (this.configuration.offline != AndroidNetworkConnectivityCheckerPlugin.Disabled) {
Expand All @@ -73,7 +76,16 @@ open class Amplitude(
add(AnalyticsConnectorPlugin())
add(AmplitudeDestination())

(timeline as Timeline).start()
// Add user plugins from config
val plugins = configuration.plugins
if (plugins != null) {
for (plugin in plugins) {
add(plugin)
}
}

val androidTimeline = timeline as Timeline
androidTimeline.start()
}

/**
Expand Down
3 changes: 3 additions & 0 deletions android/src/main/java/com/amplitude/android/Configuration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.amplitude.core.ServerZone
import com.amplitude.core.StorageProvider
import com.amplitude.core.events.IngestionMetadata
import com.amplitude.core.events.Plan
import com.amplitude.core.platform.Plugin
import com.amplitude.id.FileIdentityStorageProvider
import com.amplitude.id.IdentityStorageProvider

Expand Down Expand Up @@ -49,6 +50,7 @@ open class Configuration @JvmOverloads constructor(
override var offline: Boolean? = false,
override var deviceId: String? = null,
override var sessionId: Long? = null,
override var plugins: List<Plugin>? = null,
) : Configuration(
apiKey,
flushQueueSize,
Expand All @@ -72,6 +74,7 @@ open class Configuration @JvmOverloads constructor(
offline,
deviceId,
sessionId,
plugins
) {
companion object {
const val MIN_TIME_BETWEEN_SESSIONS_MILLIS: Long = 300000
Expand Down
160 changes: 62 additions & 98 deletions android/src/main/java/com/amplitude/android/Timeline.kt
Original file line number Diff line number Diff line change
@@ -1,44 +1,59 @@
package com.amplitude.android

import com.amplitude.android.utilities.Session
import com.amplitude.android.utilities.SystemTime
import com.amplitude.core.Storage
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.Timeline
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicLong

class Timeline(
private val initialSessionId: Long? = null,
) : Timeline() {
class Timeline : Timeline() {
companion object {
const val DEFAULT_LAST_EVENT_ID = 0L
}

private val eventMessageChannel: Channel<EventQueueMessage> = Channel(Channel.UNLIMITED)
internal lateinit var session: Session

private val _sessionId = AtomicLong(initialSessionId ?: -1L)
val sessionId: Long
get() {
return _sessionId.get()
}
private val _lastEventId = AtomicLong(DEFAULT_LAST_EVENT_ID)

internal var lastEventId: Long = DEFAULT_LAST_EVENT_ID
get() = _lastEventId.get()

internal var lastEventId: Long = 0
var lastEventTime: Long = -1L
internal var sessionId: Long = Session.EMPTY_SESSION_ID
get() = if (session == null) Session.EMPTY_SESSION_ID else session.sessionId

internal suspend fun start() {
this.session = Session(
amplitude.configuration as Configuration,
amplitude.storage,
amplitude.store
)

val sessionEvents = session.startNewSessionIfNeeded(
SystemTime.getCurrentTimeMillis(),
amplitude.configuration.sessionId
)

loadLastEventId()

internal fun start() {
amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) {
// Wait until build (including possible legacy data migration) is finished.
amplitude.isBuilt.await()

if (initialSessionId == null) {
_sessionId.set(
amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLongOrNull()
?: -1
)
}
lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull() ?: 0
lastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull() ?: -1

for (message in eventMessageChannel) {
processEventMessage(message)
}
}

runBlocking {
sessionEvents?.forEach {
processImmediately(it)
}
}
}

internal fun stop() {
Expand All @@ -47,66 +62,58 @@ class Timeline(

override fun process(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = System.currentTimeMillis()
incomingEvent.timestamp = SystemTime.getCurrentTimeMillis()
}

eventMessageChannel.trySend(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

private suspend fun processImmediately(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = SystemTime.getCurrentTimeMillis()
}

processEventMessage(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

private suspend fun processEventMessage(message: EventQueueMessage) {
val event = message.event
var sessionEvents: Iterable<BaseEvent>? = null
val eventTimestamp = event.timestamp!!
val eventSessionId = event.sessionId
var skipEvent = false

if (event.eventType == Amplitude.START_SESSION_EVENT) {
setSessionId(eventSessionId ?: eventTimestamp)
refreshSessionTime(eventTimestamp)
} else if (event.eventType == Amplitude.END_SESSION_EVENT) {
// do nothing
} else if (event.eventType == Amplitude.DUMMY_ENTER_FOREGROUND_EVENT) {
if (event.eventType == Amplitude.DUMMY_ENTER_FOREGROUND_EVENT) {
skipEvent = true
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
sessionEvents = session.startNewSessionIfNeeded(eventTimestamp)
} else if (event.eventType == Amplitude.DUMMY_EXIT_FOREGROUND_EVENT) {
skipEvent = true
refreshSessionTime(eventTimestamp)
session.refreshSessionTime(eventTimestamp)
} else {
if (!message.inForeground) {
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
sessionEvents = session.startNewSessionIfNeeded(eventTimestamp)
} else {
refreshSessionTime(eventTimestamp)
session.refreshSessionTime(eventTimestamp)
}
}

if (!skipEvent && event.sessionId == null) {
event.sessionId = sessionId
event.sessionId = session.sessionId
}

val savedLastEventId = lastEventId

sessionEvents?.let {
it.forEach { e ->
e.eventId ?: let {
val newEventId = lastEventId + 1
e.eventId = newEventId
lastEventId = newEventId
e.eventId = getAndSetNextEventId()
}
}
}

if (!skipEvent) {
event.eventId ?: let {
val newEventId = lastEventId + 1
event.eventId = newEventId
lastEventId = newEventId
event.eventId = getAndSetNextEventId()
}
}

if (lastEventId > savedLastEventId) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
}

sessionEvents?.let {
it.forEach { e ->
super.process(e)
Expand All @@ -118,64 +125,21 @@ class Timeline(
}
}

private suspend fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? {
if (inSession() && isWithinMinTimeBetweenSessions(timestamp)) {
refreshSessionTime(timestamp)
return null
}
return startNewSession(timestamp)
private fun loadLastEventId() {
val lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull()
?: DEFAULT_LAST_EVENT_ID
_lastEventId.set(lastEventId)
}

private suspend fun setSessionId(timestamp: Long) {
_sessionId.set(timestamp)
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString())
private suspend fun writeLastEventId(lastEventId: Long) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
}

private suspend fun startNewSession(timestamp: Long): Iterable<BaseEvent> {
val sessionEvents = mutableListOf<BaseEvent>()
val configuration = amplitude.configuration as Configuration
// If any trackingSessionEvents is false (default value is true), means it is manually set
@Suppress("DEPRECATION")
val trackingSessionEvents = configuration.trackingSessionEvents && configuration.defaultTracking.sessions

// end previous session
if (trackingSessionEvents && inSession()) {
val sessionEndEvent = BaseEvent()
sessionEndEvent.eventType = Amplitude.END_SESSION_EVENT
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null
sessionEndEvent.sessionId = sessionId
sessionEvents.add(sessionEndEvent)
}

// start new session
setSessionId(timestamp)
refreshSessionTime(timestamp)
if (trackingSessionEvents) {
val sessionStartEvent = BaseEvent()
sessionStartEvent.eventType = Amplitude.START_SESSION_EVENT
sessionStartEvent.timestamp = timestamp
sessionStartEvent.sessionId = sessionId
sessionEvents.add(sessionStartEvent)
}

return sessionEvents
}

private suspend fun refreshSessionTime(timestamp: Long) {
if (!inSession()) {
return
}
lastEventTime = timestamp
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
}

private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean {
val sessionLimit: Long = (amplitude.configuration as Configuration).minTimeBetweenSessionsMillis
return timestamp - lastEventTime < sessionLimit
}
private suspend fun getAndSetNextEventId(): Long {
val nextEventId = _lastEventId.incrementAndGet()
writeLastEventId(nextEventId)

private fun inSession(): Boolean {
return sessionId >= 0
return nextEventId
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ class RemnantDataMigration(
companion object {
const val DEVICE_ID_KEY = "device_id"
const val USER_ID_KEY = "user_id"
const val LAST_EVENT_TIME_KEY = "last_event_time"
const val LAST_EVENT_ID_KEY = "last_event_id"
const val PREVIOUS_SESSION_ID_KEY = "previous_session_id"
}

lateinit var databaseStorage: DatabaseStorage
Expand All @@ -33,8 +31,9 @@ class RemnantDataMigration(

val firstRunSinceUpgrade = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull() == null

// WARNING: We don't migrate session data as we want to reset on a new app install
moveDeviceAndUserId()
moveSessionData()
moveTimelineData()

if (firstRunSinceUpgrade) {
moveInterceptedIdentifies()
Expand Down Expand Up @@ -67,33 +66,19 @@ class RemnantDataMigration(
}
}

private suspend fun moveSessionData() {
private suspend fun moveTimelineData() {
try {
val currentSessionId = amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLongOrNull()
val currentLastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull()
val currentLastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull()

val previousSessionId = databaseStorage.getLongValue(PREVIOUS_SESSION_ID_KEY)
val lastEventTime = databaseStorage.getLongValue(LAST_EVENT_TIME_KEY)
val lastEventId = databaseStorage.getLongValue(LAST_EVENT_ID_KEY)

if (currentSessionId == null && previousSessionId != null) {
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, previousSessionId.toString())
databaseStorage.removeLongValue(PREVIOUS_SESSION_ID_KEY)
}

if (currentLastEventTime == null && lastEventTime != null) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
databaseStorage.removeLongValue(LAST_EVENT_TIME_KEY)
}

if (currentLastEventId == null && lastEventId != null) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
databaseStorage.removeLongValue(LAST_EVENT_ID_KEY)
}
} catch (e: Exception) {
LogcatLogger.logger.error(
"session data migration failed: ${e.message}"
"timeline data migration failed: ${e.message}"
)
}
}
Expand Down
Loading

0 comments on commit 7b69897

Please sign in to comment.