Skip to content

Commit

Permalink
fix: revert recent session changes from 1.16.3 to 1.16.6 (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
justin-fiedler authored Mar 28, 2024
1 parent 16e7629 commit 8d7271f
Show file tree
Hide file tree
Showing 20 changed files with 285 additions and 844 deletions.
47 changes: 5 additions & 42 deletions android/src/main/java/com/amplitude/android/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import com.amplitude.android.plugins.AnalyticsConnectorPlugin
import com.amplitude.android.plugins.AndroidContextPlugin
import com.amplitude.android.plugins.AndroidLifecyclePlugin
import com.amplitude.android.plugins.AndroidNetworkConnectivityCheckerPlugin
import com.amplitude.android.utilities.Session
import com.amplitude.android.utilities.SystemTime
import com.amplitude.core.Amplitude
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.plugins.AmplitudeDestination
Expand All @@ -25,22 +23,17 @@ open class Amplitude(
internal var inForeground = false
private lateinit var androidContextPlugin: AndroidContextPlugin

private var _initialSessionId = Session.EMPTY_SESSION_ID
val sessionId: Long
get() {
return try {
(timeline as Timeline).sessionId
} catch (e: Throwable) {
_initialSessionId
}
return (timeline as Timeline).sessionId
}

init {
registerShutdownHook()
}

override fun createTimeline(): Timeline {
return Timeline().also { it.amplitude = this }
return Timeline(configuration.sessionId).also { it.amplitude = this }
}

override fun createIdentityConfiguration(): IdentityConfiguration {
Expand All @@ -56,33 +49,12 @@ open class Amplitude(
)
}

private suspend fun loadInitialSessionId(startTime: Long) {
// Load in existing session info
val session = Session(configuration as Configuration, storage, store)

// disconnect from storages
session.storage = null
session.state = null

// Get the next session id without changing storage values
session.startNewSessionIfNeeded(startTime, configuration.sessionId)

// Set the initial sessionId before plugins are setup
_initialSessionId = session.sessionId
}

override suspend fun buildInternal(identityConfiguration: IdentityConfiguration) {
val startTime = SystemTime.getCurrentTimeMillis()

// Set the initial sessionId before plugins are setup
loadInitialSessionId(startTime)

// Migrations
ApiKeyStorageMigration(this).execute()
if ((configuration as Configuration).migrateLegacyData) {

if ((this.configuration as Configuration).migrateLegacyData) {
RemnantDataMigration(this).execute()
}

this.createIdentityContainer(identityConfiguration)

if (this.configuration.offline != AndroidNetworkConnectivityCheckerPlugin.Disabled) {
Expand All @@ -101,16 +73,7 @@ open class Amplitude(
add(AnalyticsConnectorPlugin())
add(AmplitudeDestination())

// Add user plugins from config
val plugins = configuration.plugins
if (plugins != null) {
for (plugin in plugins) {
add(plugin)
}
}

// Start session before adding plugins
(timeline as Timeline).start(startTime)
(timeline as Timeline).start()
}

/**
Expand Down
3 changes: 0 additions & 3 deletions android/src/main/java/com/amplitude/android/Configuration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import com.amplitude.core.ServerZone
import com.amplitude.core.StorageProvider
import com.amplitude.core.events.IngestionMetadata
import com.amplitude.core.events.Plan
import com.amplitude.core.platform.Plugin
import com.amplitude.id.FileIdentityStorageProvider
import com.amplitude.id.IdentityStorageProvider

Expand Down Expand Up @@ -50,7 +49,6 @@ open class Configuration @JvmOverloads constructor(
override var offline: Boolean? = false,
override var deviceId: String? = null,
override var sessionId: Long? = null,
override var plugins: List<Plugin>? = null,
) : Configuration(
apiKey,
flushQueueSize,
Expand All @@ -74,7 +72,6 @@ open class Configuration @JvmOverloads constructor(
offline,
deviceId,
sessionId,
plugins
) {
companion object {
const val MIN_TIME_BETWEEN_SESSIONS_MILLIS: Long = 300000
Expand Down
162 changes: 98 additions & 64 deletions android/src/main/java/com/amplitude/android/Timeline.kt
Original file line number Diff line number Diff line change
@@ -1,59 +1,42 @@
package com.amplitude.android

import com.amplitude.android.utilities.Session
import com.amplitude.android.utilities.SystemTime
import com.amplitude.core.Storage
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.Timeline
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicLong

class Timeline : Timeline() {
companion object {
const val DEFAULT_LAST_EVENT_ID = 0L
}

class Timeline(
private val initialSessionId: Long? = null,
) : Timeline() {
private val eventMessageChannel: Channel<EventQueueMessage> = Channel(Channel.UNLIMITED)
internal lateinit var session: Session

private val _lastEventId = AtomicLong(DEFAULT_LAST_EVENT_ID)

internal var lastEventId: Long = DEFAULT_LAST_EVENT_ID
get() = _lastEventId.get()

internal var sessionId: Long = Session.EMPTY_SESSION_ID
get() = if (session == null) Session.EMPTY_SESSION_ID else session.sessionId

internal suspend fun start(timestamp: Long? = null) {
this.session = Session(
amplitude.configuration as Configuration,
amplitude.storage,
amplitude.store
)

val sessionEvents = session.startNewSessionIfNeeded(
timestamp ?: SystemTime.getCurrentTimeMillis(),
amplitude.configuration.sessionId
)
private val _sessionId = AtomicLong(initialSessionId ?: -1L)
val sessionId: Long
get() {
return _sessionId.get()
}

loadLastEventId()
internal var lastEventId: Long = 0
var lastEventTime: Long = -1L

internal fun start() {
amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) {
// Wait until build (including possible legacy data migration) is finished.
amplitude.isBuilt.await()

for (message in eventMessageChannel) {
processEventMessage(message)
if (initialSessionId == null) {
_sessionId.set(
amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLongOrNull()
?: -1
)
}
}
lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull() ?: 0
lastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull() ?: -1

if (!amplitude.configuration.optOut) {
runBlocking {
sessionEvents?.forEach {
processImmediately(it)
}
for (message in eventMessageChannel) {
processEventMessage(message)
}
}
}
Expand All @@ -64,58 +47,66 @@ class Timeline : Timeline() {

override fun process(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = SystemTime.getCurrentTimeMillis()
incomingEvent.timestamp = System.currentTimeMillis()
}

eventMessageChannel.trySend(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

private suspend fun processImmediately(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = SystemTime.getCurrentTimeMillis()
}

processEventMessage(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

private suspend fun processEventMessage(message: EventQueueMessage) {
val event = message.event
var sessionEvents: Iterable<BaseEvent>? = null
val eventTimestamp = event.timestamp!!
val eventSessionId = event.sessionId
var skipEvent = false

if (event.eventType == Amplitude.DUMMY_ENTER_FOREGROUND_EVENT) {
if (event.eventType == Amplitude.START_SESSION_EVENT) {
setSessionId(eventSessionId ?: eventTimestamp)
refreshSessionTime(eventTimestamp)
} else if (event.eventType == Amplitude.END_SESSION_EVENT) {
// do nothing
} else if (event.eventType == Amplitude.DUMMY_ENTER_FOREGROUND_EVENT) {
skipEvent = true
sessionEvents = session.startNewSessionIfNeeded(eventTimestamp)
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else if (event.eventType == Amplitude.DUMMY_EXIT_FOREGROUND_EVENT) {
skipEvent = true
session.refreshSessionTime(eventTimestamp)
refreshSessionTime(eventTimestamp)
} else {
if (!message.inForeground) {
sessionEvents = session.startNewSessionIfNeeded(eventTimestamp)
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else {
session.refreshSessionTime(eventTimestamp)
refreshSessionTime(eventTimestamp)
}
}

if (!skipEvent && event.sessionId == null) {
event.sessionId = session.sessionId
event.sessionId = sessionId
}

val savedLastEventId = lastEventId

sessionEvents?.let {
it.forEach { e ->
e.eventId ?: let {
e.eventId = getAndSetNextEventId()
val newEventId = lastEventId + 1
e.eventId = newEventId
lastEventId = newEventId
}
}
}

if (!skipEvent) {
event.eventId ?: let {
event.eventId = getAndSetNextEventId()
val newEventId = lastEventId + 1
event.eventId = newEventId
lastEventId = newEventId
}
}

if (lastEventId > savedLastEventId) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
}

sessionEvents?.let {
it.forEach { e ->
super.process(e)
Expand All @@ -127,21 +118,64 @@ class Timeline : Timeline() {
}
}

private fun loadLastEventId() {
val lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull()
?: DEFAULT_LAST_EVENT_ID
_lastEventId.set(lastEventId)
private suspend fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? {
if (inSession() && isWithinMinTimeBetweenSessions(timestamp)) {
refreshSessionTime(timestamp)
return null
}
return startNewSession(timestamp)
}

private suspend fun setSessionId(timestamp: Long) {
_sessionId.set(timestamp)
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString())
}

private suspend fun writeLastEventId(lastEventId: Long) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
private suspend fun startNewSession(timestamp: Long): Iterable<BaseEvent> {
val sessionEvents = mutableListOf<BaseEvent>()
val configuration = amplitude.configuration as Configuration
// If any trackingSessionEvents is false (default value is true), means it is manually set
@Suppress("DEPRECATION")
val trackingSessionEvents = configuration.trackingSessionEvents && configuration.defaultTracking.sessions

// end previous session
if (trackingSessionEvents && inSession()) {
val sessionEndEvent = BaseEvent()
sessionEndEvent.eventType = Amplitude.END_SESSION_EVENT
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null
sessionEndEvent.sessionId = sessionId
sessionEvents.add(sessionEndEvent)
}

// start new session
setSessionId(timestamp)
refreshSessionTime(timestamp)
if (trackingSessionEvents) {
val sessionStartEvent = BaseEvent()
sessionStartEvent.eventType = Amplitude.START_SESSION_EVENT
sessionStartEvent.timestamp = timestamp
sessionStartEvent.sessionId = sessionId
sessionEvents.add(sessionStartEvent)
}

return sessionEvents
}

private suspend fun getAndSetNextEventId(): Long {
val nextEventId = _lastEventId.incrementAndGet()
writeLastEventId(nextEventId)
private suspend fun refreshSessionTime(timestamp: Long) {
if (!inSession()) {
return
}
lastEventTime = timestamp
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
}

private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean {
val sessionLimit: Long = (amplitude.configuration as Configuration).minTimeBetweenSessionsMillis
return timestamp - lastEventTime < sessionLimit
}

return nextEventId
private fun inSession(): Boolean {
return sessionId >= 0
}
}

Expand Down
Loading

0 comments on commit 8d7271f

Please sign in to comment.