-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Stab to integrate current connector and extract identity storage #5
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
plugins { | ||
id 'org.jetbrains.kotlin.jvm' | ||
id 'java' | ||
} | ||
|
||
group 'com.amplitude' | ||
|
||
repositories { | ||
mavenCentral() | ||
mavenCentral() | ||
} | ||
|
||
dependencies { | ||
implementation "org.jetbrains.kotlin:kotlin-stdlib" | ||
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1' | ||
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1' | ||
} | ||
|
||
test { | ||
useJUnitPlatform() | ||
} | ||
|
||
apply plugin: 'idea' | ||
|
||
sourceSets.main.java.srcDirs = ['java'] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package com.amplitude | ||
|
||
class IMIdentityStorage: IdentityStorage { | ||
lateinit var identityStore: IdentityStore | ||
var userId: String? = null | ||
var deviceId: String? = null | ||
|
||
override fun setup(identityStore: IdentityStore) { | ||
this.identityStore = identityStore | ||
identityStore.addIdentityListener(IMIdentityListener(this)) | ||
load() | ||
} | ||
|
||
private fun load() { | ||
identityStore.setIdentity(Identity(userId, deviceId), IdentityUpdateType.Initialized) | ||
} | ||
|
||
override fun saveUserId(userId: String?) { | ||
this.userId = userId | ||
} | ||
|
||
override fun saveDeviceId(deviceId: String?) { | ||
this.deviceId = deviceId | ||
} | ||
} | ||
|
||
class IMIdentityStorageProvider: IdentityStorageProvider { | ||
override fun getIdentityStorage(): IdentityStorage { | ||
return IMIdentityStorage() | ||
} | ||
} | ||
|
||
class IMIdentityListener(private val identityStorage: IMIdentityStorage) : IdentityListener { | ||
|
||
override fun onUserIdChange(userId: String?) { | ||
identityStorage.saveUserId(userId) | ||
} | ||
|
||
override fun onDeviceIdChange(deviceId: String?) { | ||
identityStorage.saveDeviceId(deviceId) | ||
} | ||
|
||
override fun onIdentityChanged(identity: Identity, updateType: IdentityUpdateType) { | ||
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package com.amplitude | ||
|
||
class IdContainer private constructor(val identityStorageProvider: IdentityStorageProvider) { | ||
var identityStorage: IdentityStorage | ||
var identityStore: IdentityStore | ||
|
||
companion object { | ||
|
||
private val instancesLock = Any() | ||
private val instances = mutableMapOf<String, IdContainer>() | ||
|
||
@JvmStatic | ||
fun getInstance(apiKey: String, identityStorageProvider: IdentityStorageProvider): IdContainer { | ||
return synchronized(instancesLock) { | ||
instances.getOrPut(apiKey) { | ||
IdContainer(identityStorageProvider) | ||
} | ||
} | ||
} | ||
} | ||
|
||
init { | ||
identityStore = IdentityStoreImpl() | ||
identityStorage = identityStorageProvider.getIdentityStorage() | ||
identityStorage.setup(identityStore) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
package com.amplitude | ||
|
||
import java.util.concurrent.locks.ReentrantReadWriteLock | ||
import kotlin.concurrent.read | ||
import kotlin.concurrent.write | ||
|
||
internal const val ID_OP_SET = "\$set" | ||
internal const val ID_OP_UNSET = "\$unset" | ||
internal const val ID_OP_CLEAR_ALL = "\$clearAll" | ||
|
||
interface IdentityListener { | ||
|
||
fun onUserIdChange(userId: String?) | ||
|
||
fun onDeviceIdChange(deviceId: String?) | ||
|
||
fun onIdentityChanged(identity: Identity, updateType: IdentityUpdateType) | ||
} | ||
|
||
data class Identity( | ||
val userId: String? = null, | ||
val deviceId: String? = null, | ||
val userProperties: Map<String, Any?> = mapOf(), | ||
) | ||
|
||
enum class IdentityUpdateType { | ||
Initialized, Updated | ||
} | ||
|
||
interface IdentityStore { | ||
|
||
interface Editor { | ||
|
||
fun setUserId(userId: String?): Editor | ||
fun setDeviceId(deviceId: String?): Editor | ||
fun setUserProperties(userProperties: Map<String, Any?>): Editor | ||
fun updateUserProperties(actions: Map<String, Map<String, Any?>>): Editor | ||
fun commit() | ||
} | ||
|
||
fun editIdentity(): Editor | ||
fun setIdentity(identity: Identity, updateType: IdentityUpdateType = IdentityUpdateType.Updated) | ||
fun getIdentity(): Identity | ||
fun addIdentityListener(listener: IdentityListener) | ||
fun removeIdentityListener(listener: IdentityListener) | ||
fun isInitialized(): Boolean | ||
} | ||
|
||
internal class IdentityStoreImpl: IdentityStore { | ||
|
||
private val identityLock = ReentrantReadWriteLock(true) | ||
private var identity = Identity() | ||
|
||
private val listenersLock = Any() | ||
private val listeners: MutableSet<IdentityListener> = mutableSetOf() | ||
private var initialized: Boolean = false | ||
|
||
override fun editIdentity(): IdentityStore.Editor { | ||
val originalIdentity = getIdentity() | ||
return object : IdentityStore.Editor { | ||
|
||
private var userId: String? = originalIdentity.userId | ||
private var deviceId: String? = originalIdentity.deviceId | ||
private var userProperties: Map<String, Any?> = originalIdentity.userProperties | ||
|
||
override fun setUserId(userId: String?): IdentityStore.Editor { | ||
this.userId = userId | ||
return this | ||
} | ||
|
||
override fun setDeviceId(deviceId: String?): IdentityStore.Editor { | ||
this.deviceId = deviceId | ||
return this | ||
} | ||
|
||
override fun setUserProperties(userProperties: Map<String, Any?>): IdentityStore.Editor { | ||
this.userProperties = userProperties | ||
return this | ||
} | ||
|
||
override fun updateUserProperties(actions: Map<String, Map<String, Any?>>): IdentityStore.Editor { | ||
val actingProperties = this.userProperties.toMutableMap() | ||
for (actionEntry in actions.entries) { | ||
val action = actionEntry.key | ||
val properties = actionEntry.value | ||
when (action) { | ||
ID_OP_SET -> { | ||
actingProperties.putAll(properties) | ||
} | ||
ID_OP_UNSET -> { | ||
for (entry in properties.entries) { | ||
actingProperties.remove(entry.key) | ||
} | ||
} | ||
ID_OP_CLEAR_ALL -> { | ||
actingProperties.clear() | ||
} | ||
} | ||
} | ||
this.userProperties = actingProperties | ||
return this | ||
} | ||
|
||
override fun commit() { | ||
val newIdentity = Identity(userId, deviceId, userProperties) | ||
setIdentity(newIdentity) | ||
} | ||
} | ||
} | ||
|
||
override fun setIdentity(identity: Identity, updateType: IdentityUpdateType) { | ||
val originalIdentity = getIdentity() | ||
identityLock.write { | ||
this.identity = identity | ||
if (updateType == IdentityUpdateType.Initialized) { | ||
initialized = true | ||
} | ||
} | ||
if (identity != originalIdentity) { | ||
val safeListeners = synchronized(listenersLock) { | ||
listeners.toSet() | ||
} | ||
|
||
for (listener in safeListeners) { | ||
if (identity.userId != originalIdentity.userId) { | ||
listener.onUserIdChange(identity.userId) | ||
} | ||
if (identity.deviceId != originalIdentity.deviceId) { | ||
listener.onDeviceIdChange(identity.deviceId) | ||
} | ||
listener.onIdentityChanged(identity, updateType) | ||
} | ||
} | ||
} | ||
|
||
override fun getIdentity(): Identity { | ||
return identityLock.read { | ||
this.identity | ||
} | ||
} | ||
|
||
override fun addIdentityListener(listener: IdentityListener) { | ||
synchronized(listenersLock) { | ||
listeners.add(listener) | ||
} | ||
} | ||
|
||
override fun removeIdentityListener(listener: IdentityListener) { | ||
synchronized(listenersLock) { | ||
listeners.remove(listener) | ||
} | ||
} | ||
|
||
override fun isInitialized(): Boolean { | ||
return initialized | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package com.amplitude | ||
|
||
interface IdentityStorage { | ||
|
||
fun setup(identityStore: IdentityStore) | ||
|
||
fun saveUserId(userId: String?) | ||
|
||
fun saveDeviceId(deviceId: String?) | ||
} | ||
|
||
interface IdentityStorageProvider { | ||
fun getIdentityStorage(): IdentityStorage | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,12 @@ package com.amplitude | |
import com.amplitude.events.BaseEvent | ||
import com.amplitude.events.Identify | ||
import com.amplitude.events.Revenue | ||
import com.amplitude.platform.ObservePlugin | ||
import com.amplitude.platform.Plugin | ||
import com.amplitude.platform.Timeline | ||
import com.amplitude.platform.plugins.AmplitudeDestination | ||
import com.amplitude.platform.plugins.ContextPlugin | ||
import com.amplitude.utilities.AnalyticsIdentityListener | ||
import kotlinx.coroutines.* | ||
import java.util.concurrent.Executors | ||
|
||
|
@@ -22,12 +24,15 @@ open class Amplitude internal constructor( | |
internal val timeline: Timeline | ||
val storage: com.amplitude.Storage | ||
val logger: com.amplitude.Logger | ||
val idContainer: IdContainer | ||
|
||
init { | ||
require(configuration.isValid()) { "invalid configuration" } | ||
timeline = Timeline().also { it.amplitude = this } | ||
storage = configuration.storageProvider.getStorage(this) | ||
logger = configuration.loggerProvider.getLogger(this) | ||
idContainer = IdContainer.getInstance(configuration.apiKey, IMIdentityStorageProvider()) | ||
idContainer.identityStore.addIdentityListener(AnalyticsIdentityListener(store)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question, does this make amplitude client listen for changes happening outside? while this emits changes happening inside amplitude client? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, this connects to identity store, then we can have support plugins for changes happening outside |
||
build() | ||
} | ||
|
||
|
@@ -59,7 +64,7 @@ open class Amplitude internal constructor( | |
} | ||
|
||
fun identify(userId: String) { | ||
|
||
this.idContainer.identityStore.editIdentity().setUserId(userId).commit() | ||
} | ||
|
||
fun groupIdentify(identify: Identify) { | ||
|
@@ -86,12 +91,26 @@ open class Amplitude internal constructor( | |
} | ||
|
||
fun add(plugin: Plugin) : com.amplitude.Amplitude { | ||
this.timeline.add(plugin) | ||
when (plugin) { | ||
is ObservePlugin -> { | ||
this.store.add(plugin) | ||
} | ||
else -> { | ||
this.timeline.add(plugin) | ||
} | ||
} | ||
return this | ||
} | ||
|
||
fun remove(plugin: Plugin): com.amplitude.Amplitude { | ||
this.timeline.remove(plugin) | ||
when (plugin) { | ||
is ObservePlugin -> { | ||
this.store.remove(plugin) | ||
} | ||
else -> { | ||
this.timeline.remove(plugin) | ||
} | ||
} | ||
return this | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,30 @@ | ||
package com.amplitude | ||
|
||
import com.amplitude.platform.ObservePlugin | ||
|
||
class State { | ||
var userId: String? = null | ||
set(value: String?) { | ||
userId = value | ||
plugins.forEach { plugin -> | ||
plugin.onUserIdChanged(value) | ||
} | ||
} | ||
|
||
var deviceId: String? = null | ||
set(value: String?) { | ||
deviceId = value | ||
plugins.forEach { plugin -> | ||
plugin.onDeviceIdChanged(value) | ||
} | ||
} | ||
val plugins: MutableList<ObservePlugin> = mutableListOf() | ||
|
||
fun add(plugin: ObservePlugin) = synchronized(plugins) { | ||
plugins.add(plugin) | ||
} | ||
|
||
fun remove(plugin: ObservePlugin) = synchronized(plugins) { | ||
plugins.removeAll { it === plugin } | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious what this does and what happens if not all listeners are returned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the toSet is just to reduce the duplicate listeners