Skip to content

Commit

Permalink
feat: Fix retain cycles in Amplitude so instance will not leak memory (
Browse files Browse the repository at this point in the history
  • Loading branch information
crleona authored May 9, 2024
1 parent 83d24f7 commit 492d1fd
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 75 deletions.
60 changes: 32 additions & 28 deletions Sources/Amplitude/Migration/RemnantDataMigration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ class RemnantDataMigration {
private static let PREVIOUS_SESSION_TIME_KEY = "previous_session_time"
private static let PREVIOUS_SESSION_ID_KEY = "previous_session_id"

private let amplitude: Amplitude
private let storage: LegacyDatabaseStorage
private let logger: (any Logger)?
private let storage: Storage
private let identifyStorage: Storage
private let legacyStorage: LegacyDatabaseStorage

init(_ amplitude: Amplitude) {
self.amplitude = amplitude
self.storage = LegacyDatabaseStorage.getStorage(amplitude.configuration.instanceName, amplitude.logger)
logger = amplitude.logger
storage = amplitude.storage
identifyStorage = amplitude.identifyStorage
legacyStorage = LegacyDatabaseStorage.getStorage(amplitude.configuration.instanceName, amplitude.logger)
}

func execute() {
let firstRunSinceUpgrade = amplitude.storage.read(key: StorageKey.LAST_EVENT_TIME) == nil
let firstRunSinceUpgrade = storage.read(key: StorageKey.LAST_EVENT_TIME) == nil

moveDeviceAndUserId()
moveSessionData()
Expand All @@ -31,52 +35,52 @@ class RemnantDataMigration {
}

if maxEventId > 0 {
let currentLastEventId: Int64? = amplitude.storage.read(key: StorageKey.LAST_EVENT_ID)
let currentLastEventId: Int64? = storage.read(key: StorageKey.LAST_EVENT_ID)
if currentLastEventId == nil || currentLastEventId! <= 0 {
try? amplitude.storage.write(key: StorageKey.LAST_EVENT_ID, value: maxEventId)
try? storage.write(key: StorageKey.LAST_EVENT_ID, value: maxEventId)
}
}
}

private func moveDeviceAndUserId() {
let currentDeviceId: String? = amplitude.storage.read(key: StorageKey.DEVICE_ID)
let currentDeviceId: String? = storage.read(key: StorageKey.DEVICE_ID)
if currentDeviceId == nil || currentDeviceId! == "" {
if let deviceId = storage.getValue(RemnantDataMigration.DEVICE_ID_KEY) {
try? amplitude.storage.write(key: StorageKey.DEVICE_ID, value: deviceId)
if let deviceId = legacyStorage.getValue(RemnantDataMigration.DEVICE_ID_KEY) {
try? storage.write(key: StorageKey.DEVICE_ID, value: deviceId)
}
}

let currentUserId: String? = amplitude.storage.read(key: StorageKey.USER_ID)
let currentUserId: String? = storage.read(key: StorageKey.USER_ID)
if currentUserId == nil || currentUserId == "" {
if let userId = storage.getValue(RemnantDataMigration.USER_ID_KEY) {
try? amplitude.storage.write(key: StorageKey.USER_ID, value: userId)
if let userId = legacyStorage.getValue(RemnantDataMigration.USER_ID_KEY) {
try? storage.write(key: StorageKey.USER_ID, value: userId)
}
}
}

private func moveSessionData() {
let currentSessionId: Int64? = amplitude.storage.read(key: StorageKey.PREVIOUS_SESSION_ID)
let currentLastEventTime: Int64? = amplitude.storage.read(key: StorageKey.LAST_EVENT_TIME)
let currentSessionId: Int64? = storage.read(key: StorageKey.PREVIOUS_SESSION_ID)
let currentLastEventTime: Int64? = storage.read(key: StorageKey.LAST_EVENT_TIME)

let previousSessionId = storage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY)
let lastEventTime = storage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY)
let previousSessionId = legacyStorage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY)
let lastEventTime = legacyStorage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY)

if (currentSessionId == nil || currentSessionId! < 0) && previousSessionId != nil && previousSessionId! >= 0 {
try? amplitude.storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: previousSessionId)
storage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY)
try? storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: previousSessionId)
legacyStorage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY)
}

if (currentLastEventTime == nil || currentLastEventTime! < 0) && lastEventTime != nil && lastEventTime! >= 0 {
try? amplitude.storage.write(key: StorageKey.LAST_EVENT_TIME, value: lastEventTime)
storage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY)
try? storage.write(key: StorageKey.LAST_EVENT_TIME, value: lastEventTime)
legacyStorage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY)
}
}

private func moveEvents() -> Int64 {
var maxEventId: Int64 = -1
let remnantEvents = storage.readEvents()
let remnantEvents = legacyStorage.readEvents()
remnantEvents.forEach { event in
let eventId = moveEvent(event, amplitude.storage, storage.removeEvent)
let eventId = moveEvent(event, storage, legacyStorage.removeEvent)
if maxEventId < eventId {
maxEventId = eventId
}
Expand All @@ -86,9 +90,9 @@ class RemnantDataMigration {

private func moveIdentifies() -> Int64 {
var maxEventId: Int64 = -1
let remnantEvents = storage.readIdentifies()
let remnantEvents = legacyStorage.readIdentifies()
remnantEvents.forEach { event in
let eventId = moveEvent(event, amplitude.storage, storage.removeIdentify)
let eventId = moveEvent(event, storage, legacyStorage.removeIdentify)
if maxEventId < eventId {
maxEventId = eventId
}
Expand All @@ -97,9 +101,9 @@ class RemnantDataMigration {
}

private func moveInterceptedIdentifies() {
let remnantEvents = storage.readInterceptedIdentifies()
let remnantEvents = legacyStorage.readInterceptedIdentifies()
remnantEvents.forEach { event in
_ = moveEvent(event, amplitude.identifyStorage, storage.removeInterceptedIdentify)
_ = moveEvent(event, identifyStorage, legacyStorage.removeInterceptedIdentify)
}
}

Expand All @@ -113,7 +117,7 @@ class RemnantDataMigration {
removeFromSource(rowId!)
return rowId!
} catch {
amplitude.logger?.error(message: "event migration failed: \(error)")
logger?.error(message: "event migration failed: \(error)")
return -1
}
}
Expand Down
17 changes: 11 additions & 6 deletions Sources/Amplitude/Plugins/AnalyticsConnectorPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ class AnalyticsConnectorPlugin: BeforePlugin {
override func setup(amplitude: Amplitude) {
super.setup(amplitude: amplitude)
connector = AnalyticsConnector.getInstance(amplitude.configuration.instanceName)
connector!.eventBridge.setEventReceiver { event in
amplitude.track(event: BaseEvent(
eventType: event.eventType,
eventProperties: event.eventProperties as? [String: Any],
userProperties: event.userProperties as? [String: Any]
))
let logger = amplitude.logger
connector!.eventBridge.setEventReceiver { [weak amplitude, logger] event in
if let amplitude = amplitude {
amplitude.track(event: BaseEvent(
eventType: event.eventType,
eventProperties: event.eventProperties as? [String: Any],
userProperties: event.userProperties as? [String: Any]
))
} else {
logger?.error(message: "Amplitude instance has been deallocated, please maintain a strong reference to track events from Experiment")
}
}
}

Expand Down
13 changes: 9 additions & 4 deletions Sources/Amplitude/Plugins/NetworkConnectivityCheckerPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,19 @@ open class NetworkConnectivityCheckerPlugin: BeforePlugin {
amplitude.logger?.debug(message: "Installing NetworkConnectivityCheckerPlugin, offline feature should be supported.")

pathCreation.start(queue: amplitude.trackingQueue)
let logger = amplitude.logger
pathUpdateCancellable = pathCreation.networkPathPublisher?
.sink(receiveValue: { [weak self] networkPath in
.sink(receiveValue: { [weak amplitude, logger] networkPath in
guard let amplitude = amplitude else {
logger?.debug(message: "Received network connectivity updated when amplitude instance has been deallocated")
return
}
let isOffline = !(networkPath.status == .satisfied)
if self?.amplitude?.configuration.offline == isOffline {
if amplitude.configuration.offline == isOffline {
return
}
self?.amplitude?.logger?.debug(message: "Network connectivity changed to \(isOffline ? "offline" : "online").")
self?.amplitude?.configuration.offline = isOffline
amplitude.logger?.debug(message: "Network connectivity changed to \(isOffline ? "offline" : "online").")
amplitude.configuration.offline = isOffline
if !isOffline {
amplitude.flush()
}
Expand Down
27 changes: 15 additions & 12 deletions Sources/Amplitude/Sessions.swift
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import Foundation

public class Sessions {
private let amplitude: Amplitude

private let configuration: Configuration
private let storage: Storage
private let logger: (any Logger)?
private var _sessionId: Int64 = -1
private(set) var sessionId: Int64 {
get { _sessionId }
set {
_sessionId = newValue
do {
try amplitude.storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: _sessionId)
try storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: _sessionId)
} catch {
amplitude.logger?.warn(message: "Can't write PREVIOUS_SESSION_ID to storage: \(error)")
logger?.warn(message: "Can't write PREVIOUS_SESSION_ID to storage: \(error)")
}
}
}
Expand All @@ -22,9 +23,9 @@ public class Sessions {
set {
_lastEventId = newValue
do {
try amplitude.storage.write(key: StorageKey.LAST_EVENT_ID, value: _lastEventId)
try storage.write(key: StorageKey.LAST_EVENT_ID, value: _lastEventId)
} catch {
amplitude.logger?.warn(message: "Can't write LAST_EVENT_ID to storage: \(error)")
logger?.warn(message: "Can't write LAST_EVENT_ID to storage: \(error)")
}
}
}
Expand All @@ -35,15 +36,17 @@ public class Sessions {
set {
_lastEventTime = newValue
do {
try amplitude.storage.write(key: StorageKey.LAST_EVENT_TIME, value: _lastEventTime)
try storage.write(key: StorageKey.LAST_EVENT_TIME, value: _lastEventTime)
} catch {
amplitude.logger?.warn(message: "Can't write LAST_EVENT_TIME to storage: \(error)")
logger?.warn(message: "Can't write LAST_EVENT_TIME to storage: \(error)")
}
}
}

init(amplitude: Amplitude) {
self.amplitude = amplitude
configuration = amplitude.configuration
storage = amplitude.storage
logger = amplitude.logger
self._sessionId = amplitude.storage.read(key: .PREVIOUS_SESSION_ID) ?? -1
self._lastEventId = amplitude.storage.read(key: .LAST_EVENT_ID) ?? 0
self._lastEventTime = amplitude.storage.read(key: .LAST_EVENT_TIME) ?? -1
Expand Down Expand Up @@ -101,7 +104,7 @@ public class Sessions {

private func isWithinMinTimeBetweenSessions(timestamp: Int64) -> Bool {
let timeDelta = timestamp - self.lastEventTime
return timeDelta < amplitude.configuration.minTimeBetweenSessionsMillis
return timeDelta < configuration.minTimeBetweenSessionsMillis
}

public func startNewSessionIfNeeded(timestamp: Int64, inForeground: Bool) -> [BaseEvent]? {
Expand All @@ -116,7 +119,7 @@ public class Sessions {

public func startNewSession(timestamp: Int64) -> [BaseEvent] {
var sessionEvents: [BaseEvent] = Array()
let trackingSessionEvents = amplitude.configuration.defaultTracking.sessions
let trackingSessionEvents = configuration.defaultTracking.sessions

// end previous session
if trackingSessionEvents && self.sessionId >= 0 {
Expand Down Expand Up @@ -145,7 +148,7 @@ public class Sessions {

public func endCurrentSession() -> [BaseEvent] {
var sessionEvents: [BaseEvent] = Array()
let trackingSessionEvents = amplitude.configuration.defaultTracking.sessions
let trackingSessionEvents = configuration.defaultTracking.sessions

if trackingSessionEvents && self.sessionId >= 0 {
let sessionEndEvent = BaseEvent(
Expand Down
2 changes: 1 addition & 1 deletion Sources/Amplitude/Utilities/DefaultEventUtils.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Foundation

public class DefaultEventUtils {
private var amplitude: Amplitude?
private weak var amplitude: Amplitude?

public init(amplitude: Amplitude) {
self.amplitude = amplitude
Expand Down
39 changes: 20 additions & 19 deletions Sources/Amplitude/Utilities/EventPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import Foundation

public class EventPipeline {
let amplitude: Amplitude
var httpClient: HttpClient
var storage: Storage? { amplitude.storage }
let storage: Storage?
let logger: (any Logger)?
let configuration: Configuration

@Atomic internal var eventCount: Int = 0
internal var flushTimer: QueueTimer?
private let uploadsQueue = DispatchQueue(label: "uploadsQueue.amplitude.com")
Expand All @@ -22,11 +24,13 @@ public class EventPipeline {
private var uploads = [URL: UploadTaskInfo]()

init(amplitude: Amplitude) {
self.amplitude = amplitude
self.httpClient = HttpClient(configuration: amplitude.configuration,
diagnostics: amplitude.configuration.diagonostics,
callbackQueue: amplitude.trackingQueue)
self.flushTimer = QueueTimer(interval: getFlushInterval(), queue: amplitude.trackingQueue) { [weak self] in
storage = amplitude.storage
logger = amplitude.logger
configuration = amplitude.configuration
httpClient = HttpClient(configuration: amplitude.configuration,
diagnostics: amplitude.configuration.diagonostics,
callbackQueue: amplitude.trackingQueue)
flushTimer = QueueTimer(interval: getFlushInterval(), queue: amplitude.trackingQueue) { [weak self] in
self?.flush()
}
}
Expand All @@ -42,37 +46,34 @@ public class EventPipeline {
}
completion?()
} catch {
amplitude.logger?.error(message: "Error when storing event: \(error.localizedDescription)")
logger?.error(message: "Error when storing event: \(error.localizedDescription)")
}
}

func flush(completion: (() -> Void)? = nil) {
if self.amplitude.configuration.offline == true {
self.amplitude.logger?.debug(message: "Skipping flush while offline.")
if configuration.offline == true {
logger?.debug(message: "Skipping flush while offline.")
return
}

amplitude.logger?.log(message: "Start flushing \(eventCount) events")
logger?.log(message: "Start flushing \(eventCount) events")
eventCount = 0
guard let storage = self.storage else { return }
storage.rollover()
guard let eventFiles: [URL] = storage.read(key: StorageKey.EVENTS) else { return }
for eventFile in eventFiles {
uploadsQueue.sync {
guard uploads[eventFile] == nil else {
amplitude.logger?.log(message: "Existing upload in progress, skipping...")
logger?.log(message: "Existing upload in progress, skipping...")
return
}
guard let eventsString = storage.getEventsString(eventBlock: eventFile),
!eventsString.isEmpty else {
return
}
let uploadTask = httpClient.upload(events: eventsString) { [weak self] result in
guard let self else {
return
}
let uploadTask = httpClient.upload(events: eventsString) { [self] result in
let responseHandler = storage.getResponseHandler(
configuration: self.amplitude.configuration,
configuration: self.configuration,
eventPipeline: self,
eventBlock: eventFile,
eventsString: eventsString
Expand All @@ -97,11 +98,11 @@ public class EventPipeline {
}

private func getFlushInterval() -> TimeInterval {
return TimeInterval.milliseconds(amplitude.configuration.flushIntervalMillis)
return TimeInterval.milliseconds(configuration.flushIntervalMillis)
}

private func getFlushCount() -> Int {
let count = amplitude.configuration.flushQueueSize
let count = configuration.flushQueueSize
return count != 0 ? count : 1
}
}
Expand Down
Loading

0 comments on commit 492d1fd

Please sign in to comment.