From 4a44832bad9199c9c282e58bcd603f7d223a962f Mon Sep 17 00:00:00 2001 From: Chris Leonavicius Date: Thu, 9 May 2024 11:01:58 -0700 Subject: [PATCH] feat: Fix retain cycles in Amplitude so instance will not leak memory --- .../Migration/RemnantDataMigration.swift | 60 ++++++++++--------- .../Plugins/AnalyticsConnectorPlugin.swift | 17 ++++-- .../NetworkConnectivityCheckerPlugin.swift | 13 ++-- Sources/Amplitude/Sessions.swift | 27 +++++---- .../Utilities/DefaultEventUtils.swift | 2 +- .../Amplitude/Utilities/EventPipeline.swift | 39 ++++++------ Tests/AmplitudeTests/AmplitudeTests.swift | 45 ++++++++++++++ .../Utilities/EventPipelineTests.swift | 10 ++-- 8 files changed, 138 insertions(+), 75 deletions(-) diff --git a/Sources/Amplitude/Migration/RemnantDataMigration.swift b/Sources/Amplitude/Migration/RemnantDataMigration.swift index f2aa0945..6de6208d 100644 --- a/Sources/Amplitude/Migration/RemnantDataMigration.swift +++ b/Sources/Amplitude/Migration/RemnantDataMigration.swift @@ -6,16 +6,20 @@ class RemnantDataMigration { private static let PREVIOUS_SESSION_TIME_KEY = "previous_session_time" private static let PREVIOUS_SESSION_ID_KEY = "previous_session_id" - private let amplitude: Amplitude - private let storage: LegacyDatabaseStorage + private let logger: (any Logger)? + private let storage: Storage + private let identifyStorage: Storage + private let legacyStorage: LegacyDatabaseStorage init(_ amplitude: Amplitude) { - self.amplitude = amplitude - self.storage = LegacyDatabaseStorage.getStorage(amplitude.configuration.instanceName, amplitude.logger) + logger = amplitude.logger + storage = amplitude.storage + identifyStorage = amplitude.identifyStorage + legacyStorage = LegacyDatabaseStorage.getStorage(amplitude.configuration.instanceName, amplitude.logger) } func execute() { - let firstRunSinceUpgrade = amplitude.storage.read(key: StorageKey.LAST_EVENT_TIME) == nil + let firstRunSinceUpgrade = storage.read(key: StorageKey.LAST_EVENT_TIME) == nil moveDeviceAndUserId() moveSessionData() @@ -31,52 +35,52 @@ class RemnantDataMigration { } if maxEventId > 0 { - let currentLastEventId: Int64? = amplitude.storage.read(key: StorageKey.LAST_EVENT_ID) + let currentLastEventId: Int64? = storage.read(key: StorageKey.LAST_EVENT_ID) if currentLastEventId == nil || currentLastEventId! <= 0 { - try? amplitude.storage.write(key: StorageKey.LAST_EVENT_ID, value: maxEventId) + try? storage.write(key: StorageKey.LAST_EVENT_ID, value: maxEventId) } } } private func moveDeviceAndUserId() { - let currentDeviceId: String? = amplitude.storage.read(key: StorageKey.DEVICE_ID) + let currentDeviceId: String? = storage.read(key: StorageKey.DEVICE_ID) if currentDeviceId == nil || currentDeviceId! == "" { - if let deviceId = storage.getValue(RemnantDataMigration.DEVICE_ID_KEY) { - try? amplitude.storage.write(key: StorageKey.DEVICE_ID, value: deviceId) + if let deviceId = legacyStorage.getValue(RemnantDataMigration.DEVICE_ID_KEY) { + try? storage.write(key: StorageKey.DEVICE_ID, value: deviceId) } } - let currentUserId: String? = amplitude.storage.read(key: StorageKey.USER_ID) + let currentUserId: String? = storage.read(key: StorageKey.USER_ID) if currentUserId == nil || currentUserId == "" { - if let userId = storage.getValue(RemnantDataMigration.USER_ID_KEY) { - try? amplitude.storage.write(key: StorageKey.USER_ID, value: userId) + if let userId = legacyStorage.getValue(RemnantDataMigration.USER_ID_KEY) { + try? storage.write(key: StorageKey.USER_ID, value: userId) } } } private func moveSessionData() { - let currentSessionId: Int64? = amplitude.storage.read(key: StorageKey.PREVIOUS_SESSION_ID) - let currentLastEventTime: Int64? = amplitude.storage.read(key: StorageKey.LAST_EVENT_TIME) + let currentSessionId: Int64? = storage.read(key: StorageKey.PREVIOUS_SESSION_ID) + let currentLastEventTime: Int64? = storage.read(key: StorageKey.LAST_EVENT_TIME) - let previousSessionId = storage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY) - let lastEventTime = storage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY) + let previousSessionId = legacyStorage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY) + let lastEventTime = legacyStorage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY) if (currentSessionId == nil || currentSessionId! < 0) && previousSessionId != nil && previousSessionId! >= 0 { - try? amplitude.storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: previousSessionId) - storage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY) + try? storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: previousSessionId) + legacyStorage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY) } if (currentLastEventTime == nil || currentLastEventTime! < 0) && lastEventTime != nil && lastEventTime! >= 0 { - try? amplitude.storage.write(key: StorageKey.LAST_EVENT_TIME, value: lastEventTime) - storage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY) + try? storage.write(key: StorageKey.LAST_EVENT_TIME, value: lastEventTime) + legacyStorage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY) } } private func moveEvents() -> Int64 { var maxEventId: Int64 = -1 - let remnantEvents = storage.readEvents() + let remnantEvents = legacyStorage.readEvents() remnantEvents.forEach { event in - let eventId = moveEvent(event, amplitude.storage, storage.removeEvent) + let eventId = moveEvent(event, storage, legacyStorage.removeEvent) if maxEventId < eventId { maxEventId = eventId } @@ -86,9 +90,9 @@ class RemnantDataMigration { private func moveIdentifies() -> Int64 { var maxEventId: Int64 = -1 - let remnantEvents = storage.readIdentifies() + let remnantEvents = legacyStorage.readIdentifies() remnantEvents.forEach { event in - let eventId = moveEvent(event, amplitude.storage, storage.removeIdentify) + let eventId = moveEvent(event, storage, legacyStorage.removeIdentify) if maxEventId < eventId { maxEventId = eventId } @@ -97,9 +101,9 @@ class RemnantDataMigration { } private func moveInterceptedIdentifies() { - let remnantEvents = storage.readInterceptedIdentifies() + let remnantEvents = legacyStorage.readInterceptedIdentifies() remnantEvents.forEach { event in - _ = moveEvent(event, amplitude.identifyStorage, storage.removeInterceptedIdentify) + _ = moveEvent(event, identifyStorage, legacyStorage.removeInterceptedIdentify) } } @@ -113,7 +117,7 @@ class RemnantDataMigration { removeFromSource(rowId!) return rowId! } catch { - amplitude.logger?.error(message: "event migration failed: \(error)") + logger?.error(message: "event migration failed: \(error)") return -1 } } diff --git a/Sources/Amplitude/Plugins/AnalyticsConnectorPlugin.swift b/Sources/Amplitude/Plugins/AnalyticsConnectorPlugin.swift index 6ef97475..bf4ca5dc 100644 --- a/Sources/Amplitude/Plugins/AnalyticsConnectorPlugin.swift +++ b/Sources/Amplitude/Plugins/AnalyticsConnectorPlugin.swift @@ -8,12 +8,17 @@ class AnalyticsConnectorPlugin: BeforePlugin { override func setup(amplitude: Amplitude) { super.setup(amplitude: amplitude) connector = AnalyticsConnector.getInstance(amplitude.configuration.instanceName) - connector!.eventBridge.setEventReceiver { event in - amplitude.track(event: BaseEvent( - eventType: event.eventType, - eventProperties: event.eventProperties as? [String: Any], - userProperties: event.userProperties as? [String: Any] - )) + let logger = amplitude.logger + connector!.eventBridge.setEventReceiver { [weak amplitude, logger] event in + if let amplitude = amplitude { + amplitude.track(event: BaseEvent( + eventType: event.eventType, + eventProperties: event.eventProperties as? [String: Any], + userProperties: event.userProperties as? [String: Any] + )) + } else { + logger?.error(message: "Amplitude instance has been deallocated, please maintain a strong reference to track events from Experiment") + } } } diff --git a/Sources/Amplitude/Plugins/NetworkConnectivityCheckerPlugin.swift b/Sources/Amplitude/Plugins/NetworkConnectivityCheckerPlugin.swift index 23dc7830..4e6cd3ea 100644 --- a/Sources/Amplitude/Plugins/NetworkConnectivityCheckerPlugin.swift +++ b/Sources/Amplitude/Plugins/NetworkConnectivityCheckerPlugin.swift @@ -54,14 +54,19 @@ open class NetworkConnectivityCheckerPlugin: BeforePlugin { amplitude.logger?.debug(message: "Installing NetworkConnectivityCheckerPlugin, offline feature should be supported.") pathCreation.start(queue: amplitude.trackingQueue) + let logger = amplitude.logger pathUpdateCancellable = pathCreation.networkPathPublisher? - .sink(receiveValue: { [weak self] networkPath in + .sink(receiveValue: { [weak amplitude, logger] networkPath in + guard let amplitude = amplitude else { + logger?.debug(message: "Received network connectivity updated when amplitude instance has been deallocated") + return + } let isOffline = !(networkPath.status == .satisfied) - if self?.amplitude?.configuration.offline == isOffline { + if amplitude.configuration.offline == isOffline { return } - self?.amplitude?.logger?.debug(message: "Network connectivity changed to \(isOffline ? "offline" : "online").") - self?.amplitude?.configuration.offline = isOffline + amplitude.logger?.debug(message: "Network connectivity changed to \(isOffline ? "offline" : "online").") + amplitude.configuration.offline = isOffline if !isOffline { amplitude.flush() } diff --git a/Sources/Amplitude/Sessions.swift b/Sources/Amplitude/Sessions.swift index 77fe9c84..18875bd2 100644 --- a/Sources/Amplitude/Sessions.swift +++ b/Sources/Amplitude/Sessions.swift @@ -1,17 +1,18 @@ import Foundation public class Sessions { - private let amplitude: Amplitude - + private let configuration: Configuration + private let storage: Storage + private let logger: (any Logger)? private var _sessionId: Int64 = -1 private(set) var sessionId: Int64 { get { _sessionId } set { _sessionId = newValue do { - try amplitude.storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: _sessionId) + try storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: _sessionId) } catch { - amplitude.logger?.warn(message: "Can't write PREVIOUS_SESSION_ID to storage: \(error)") + logger?.warn(message: "Can't write PREVIOUS_SESSION_ID to storage: \(error)") } } } @@ -22,9 +23,9 @@ public class Sessions { set { _lastEventId = newValue do { - try amplitude.storage.write(key: StorageKey.LAST_EVENT_ID, value: _lastEventId) + try storage.write(key: StorageKey.LAST_EVENT_ID, value: _lastEventId) } catch { - amplitude.logger?.warn(message: "Can't write LAST_EVENT_ID to storage: \(error)") + logger?.warn(message: "Can't write LAST_EVENT_ID to storage: \(error)") } } } @@ -35,15 +36,17 @@ public class Sessions { set { _lastEventTime = newValue do { - try amplitude.storage.write(key: StorageKey.LAST_EVENT_TIME, value: _lastEventTime) + try storage.write(key: StorageKey.LAST_EVENT_TIME, value: _lastEventTime) } catch { - amplitude.logger?.warn(message: "Can't write LAST_EVENT_TIME to storage: \(error)") + logger?.warn(message: "Can't write LAST_EVENT_TIME to storage: \(error)") } } } init(amplitude: Amplitude) { - self.amplitude = amplitude + configuration = amplitude.configuration + storage = amplitude.storage + logger = amplitude.logger self._sessionId = amplitude.storage.read(key: .PREVIOUS_SESSION_ID) ?? -1 self._lastEventId = amplitude.storage.read(key: .LAST_EVENT_ID) ?? 0 self._lastEventTime = amplitude.storage.read(key: .LAST_EVENT_TIME) ?? -1 @@ -101,7 +104,7 @@ public class Sessions { private func isWithinMinTimeBetweenSessions(timestamp: Int64) -> Bool { let timeDelta = timestamp - self.lastEventTime - return timeDelta < amplitude.configuration.minTimeBetweenSessionsMillis + return timeDelta < configuration.minTimeBetweenSessionsMillis } public func startNewSessionIfNeeded(timestamp: Int64, inForeground: Bool) -> [BaseEvent]? { @@ -116,7 +119,7 @@ public class Sessions { public func startNewSession(timestamp: Int64) -> [BaseEvent] { var sessionEvents: [BaseEvent] = Array() - let trackingSessionEvents = amplitude.configuration.defaultTracking.sessions + let trackingSessionEvents = configuration.defaultTracking.sessions // end previous session if trackingSessionEvents && self.sessionId >= 0 { @@ -145,7 +148,7 @@ public class Sessions { public func endCurrentSession() -> [BaseEvent] { var sessionEvents: [BaseEvent] = Array() - let trackingSessionEvents = amplitude.configuration.defaultTracking.sessions + let trackingSessionEvents = configuration.defaultTracking.sessions if trackingSessionEvents && self.sessionId >= 0 { let sessionEndEvent = BaseEvent( diff --git a/Sources/Amplitude/Utilities/DefaultEventUtils.swift b/Sources/Amplitude/Utilities/DefaultEventUtils.swift index 4491e95f..807dd96e 100644 --- a/Sources/Amplitude/Utilities/DefaultEventUtils.swift +++ b/Sources/Amplitude/Utilities/DefaultEventUtils.swift @@ -1,7 +1,7 @@ import Foundation public class DefaultEventUtils { - private var amplitude: Amplitude? + private weak var amplitude: Amplitude? public init(amplitude: Amplitude) { self.amplitude = amplitude diff --git a/Sources/Amplitude/Utilities/EventPipeline.swift b/Sources/Amplitude/Utilities/EventPipeline.swift index 092bd76b..0639bf1c 100644 --- a/Sources/Amplitude/Utilities/EventPipeline.swift +++ b/Sources/Amplitude/Utilities/EventPipeline.swift @@ -8,9 +8,11 @@ import Foundation public class EventPipeline { - let amplitude: Amplitude var httpClient: HttpClient - var storage: Storage? { amplitude.storage } + let storage: Storage? + let logger: (any Logger)? + let configuration: Configuration + @Atomic internal var eventCount: Int = 0 internal var flushTimer: QueueTimer? private let uploadsQueue = DispatchQueue(label: "uploadsQueue.amplitude.com") @@ -22,11 +24,13 @@ public class EventPipeline { private var uploads = [URL: UploadTaskInfo]() init(amplitude: Amplitude) { - self.amplitude = amplitude - self.httpClient = HttpClient(configuration: amplitude.configuration, - diagnostics: amplitude.configuration.diagonostics, - callbackQueue: amplitude.trackingQueue) - self.flushTimer = QueueTimer(interval: getFlushInterval(), queue: amplitude.trackingQueue) { [weak self] in + storage = amplitude.storage + logger = amplitude.logger + configuration = amplitude.configuration + httpClient = HttpClient(configuration: amplitude.configuration, + diagnostics: amplitude.configuration.diagonostics, + callbackQueue: amplitude.trackingQueue) + flushTimer = QueueTimer(interval: getFlushInterval(), queue: amplitude.trackingQueue) { [weak self] in self?.flush() } } @@ -42,17 +46,17 @@ public class EventPipeline { } completion?() } catch { - amplitude.logger?.error(message: "Error when storing event: \(error.localizedDescription)") + logger?.error(message: "Error when storing event: \(error.localizedDescription)") } } func flush(completion: (() -> Void)? = nil) { - if self.amplitude.configuration.offline == true { - self.amplitude.logger?.debug(message: "Skipping flush while offline.") + if configuration.offline == true { + logger?.debug(message: "Skipping flush while offline.") return } - amplitude.logger?.log(message: "Start flushing \(eventCount) events") + logger?.log(message: "Start flushing \(eventCount) events") eventCount = 0 guard let storage = self.storage else { return } storage.rollover() @@ -60,19 +64,16 @@ public class EventPipeline { for eventFile in eventFiles { uploadsQueue.sync { guard uploads[eventFile] == nil else { - amplitude.logger?.log(message: "Existing upload in progress, skipping...") + logger?.log(message: "Existing upload in progress, skipping...") return } guard let eventsString = storage.getEventsString(eventBlock: eventFile), !eventsString.isEmpty else { return } - let uploadTask = httpClient.upload(events: eventsString) { [weak self] result in - guard let self else { - return - } + let uploadTask = httpClient.upload(events: eventsString) { [self] result in let responseHandler = storage.getResponseHandler( - configuration: self.amplitude.configuration, + configuration: self.configuration, eventPipeline: self, eventBlock: eventFile, eventsString: eventsString @@ -97,11 +98,11 @@ public class EventPipeline { } private func getFlushInterval() -> TimeInterval { - return TimeInterval.milliseconds(amplitude.configuration.flushIntervalMillis) + return TimeInterval.milliseconds(configuration.flushIntervalMillis) } private func getFlushCount() -> Int { - let count = amplitude.configuration.flushQueueSize + let count = configuration.flushQueueSize return count != 0 ? count : 1 } } diff --git a/Tests/AmplitudeTests/AmplitudeTests.swift b/Tests/AmplitudeTests/AmplitudeTests.swift index 0c776ae7..1038891c 100644 --- a/Tests/AmplitudeTests/AmplitudeTests.swift +++ b/Tests/AmplitudeTests/AmplitudeTests.swift @@ -726,6 +726,51 @@ final class AmplitudeTests: XCTestCase { XCTAssertEqual(allEventIds, Set(eventCollector.events.map(\.eventType))) } + func testDealloc() { + class TestAmplitude: Amplitude { + + static let expectedEventType = "Test" + + class TestStorage: InMemoryStorage { + + private let expectation: XCTestExpectation + + init(expectation: XCTestExpectation) { + self.expectation = expectation + } + + override func write(key: StorageKey, value: Any?) { + if key == .EVENTS, let event = value as? BaseEvent, event.eventType == TestAmplitude.expectedEventType { + expectation.fulfill() + } + } + } + + private let deallocExpectation: XCTestExpectation + + init(deallocExpectation: XCTestExpectation, trackExpectation: XCTestExpectation) { + self.deallocExpectation = deallocExpectation + super.init(configuration: Configuration(apiKey: "test-api-key", + storageProvider: TestStorage(expectation: trackExpectation))) + } + + deinit { + deallocExpectation.fulfill() + } + } + + let deallocExpectation = XCTestExpectation(description: "Amplitude object deallocates") + let trackExpectation = XCTestExpectation(description: "Event persisted to storage") + + autoreleasepool { + let amplitude = TestAmplitude(deallocExpectation: deallocExpectation, + trackExpectation: trackExpectation) + amplitude.track(eventType: TestAmplitude.expectedEventType) + wait(for: [trackExpectation], timeout: 10.0) + } + wait(for: [deallocExpectation], timeout: 10.0) + } + func getDictionary(_ props: [String: Any?]) -> NSDictionary { return NSDictionary(dictionary: props as [AnyHashable: Any]) } diff --git a/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift b/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift index 06fa82f5..7fa6b81d 100644 --- a/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift +++ b/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift @@ -43,7 +43,7 @@ final class EventPipelineTests: XCTestCase { } func testInit() { - XCTAssertEqual(pipeline.amplitude.configuration.apiKey, configuration.apiKey) + XCTAssertEqual(pipeline.configuration.apiKey, configuration.apiKey) } func testPutEvent() { @@ -77,18 +77,18 @@ final class EventPipelineTests: XCTestCase { } func testFlushWhenOffline() { - pipeline.amplitude.configuration.offline = false + pipeline.configuration.offline = false let testEvent = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent") try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent) XCTAssertEqual(httpClient.uploadCount, 0) - XCTAssertEqual(pipeline.amplitude.configuration.offline, false) + XCTAssertEqual(pipeline.configuration.offline, false) - pipeline.amplitude.configuration.offline = true + pipeline.configuration.offline = true pipeline.flush() - XCTAssertEqual(pipeline.amplitude.configuration.offline, true) + XCTAssertEqual(pipeline.configuration.offline, true) XCTAssertEqual(httpClient.uploadCount, 0, "There should be no uploads when offline") }