Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Fix retain cycles in Amplitude so instance will not leak memory #161

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions Sources/Amplitude/Migration/RemnantDataMigration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ class RemnantDataMigration {
private static let PREVIOUS_SESSION_TIME_KEY = "previous_session_time"
private static let PREVIOUS_SESSION_ID_KEY = "previous_session_id"

private let amplitude: Amplitude
private let storage: LegacyDatabaseStorage
private let logger: (any Logger)?
private let storage: Storage
private let identifyStorage: Storage
private let legacyStorage: LegacyDatabaseStorage

init(_ amplitude: Amplitude) {
self.amplitude = amplitude
self.storage = LegacyDatabaseStorage.getStorage(amplitude.configuration.instanceName, amplitude.logger)
logger = amplitude.logger
storage = amplitude.storage
identifyStorage = amplitude.identifyStorage
legacyStorage = LegacyDatabaseStorage.getStorage(amplitude.configuration.instanceName, amplitude.logger)
}

func execute() {
let firstRunSinceUpgrade = amplitude.storage.read(key: StorageKey.LAST_EVENT_TIME) == nil
let firstRunSinceUpgrade = storage.read(key: StorageKey.LAST_EVENT_TIME) == nil

moveDeviceAndUserId()
moveSessionData()
Expand All @@ -31,52 +35,52 @@ class RemnantDataMigration {
}

if maxEventId > 0 {
let currentLastEventId: Int64? = amplitude.storage.read(key: StorageKey.LAST_EVENT_ID)
let currentLastEventId: Int64? = storage.read(key: StorageKey.LAST_EVENT_ID)
if currentLastEventId == nil || currentLastEventId! <= 0 {
try? amplitude.storage.write(key: StorageKey.LAST_EVENT_ID, value: maxEventId)
try? storage.write(key: StorageKey.LAST_EVENT_ID, value: maxEventId)
}
}
}

private func moveDeviceAndUserId() {
let currentDeviceId: String? = amplitude.storage.read(key: StorageKey.DEVICE_ID)
let currentDeviceId: String? = storage.read(key: StorageKey.DEVICE_ID)
if currentDeviceId == nil || currentDeviceId! == "" {
if let deviceId = storage.getValue(RemnantDataMigration.DEVICE_ID_KEY) {
try? amplitude.storage.write(key: StorageKey.DEVICE_ID, value: deviceId)
if let deviceId = legacyStorage.getValue(RemnantDataMigration.DEVICE_ID_KEY) {
try? storage.write(key: StorageKey.DEVICE_ID, value: deviceId)
}
}

let currentUserId: String? = amplitude.storage.read(key: StorageKey.USER_ID)
let currentUserId: String? = storage.read(key: StorageKey.USER_ID)
if currentUserId == nil || currentUserId == "" {
if let userId = storage.getValue(RemnantDataMigration.USER_ID_KEY) {
try? amplitude.storage.write(key: StorageKey.USER_ID, value: userId)
if let userId = legacyStorage.getValue(RemnantDataMigration.USER_ID_KEY) {
try? storage.write(key: StorageKey.USER_ID, value: userId)
}
}
}

private func moveSessionData() {
let currentSessionId: Int64? = amplitude.storage.read(key: StorageKey.PREVIOUS_SESSION_ID)
let currentLastEventTime: Int64? = amplitude.storage.read(key: StorageKey.LAST_EVENT_TIME)
let currentSessionId: Int64? = storage.read(key: StorageKey.PREVIOUS_SESSION_ID)
let currentLastEventTime: Int64? = storage.read(key: StorageKey.LAST_EVENT_TIME)

let previousSessionId = storage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY)
let lastEventTime = storage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY)
let previousSessionId = legacyStorage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY)
let lastEventTime = legacyStorage.getLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY)

if (currentSessionId == nil || currentSessionId! < 0) && previousSessionId != nil && previousSessionId! >= 0 {
try? amplitude.storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: previousSessionId)
storage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY)
try? storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: previousSessionId)
legacyStorage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_ID_KEY)
}

if (currentLastEventTime == nil || currentLastEventTime! < 0) && lastEventTime != nil && lastEventTime! >= 0 {
try? amplitude.storage.write(key: StorageKey.LAST_EVENT_TIME, value: lastEventTime)
storage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY)
try? storage.write(key: StorageKey.LAST_EVENT_TIME, value: lastEventTime)
legacyStorage.removeLongValue(RemnantDataMigration.PREVIOUS_SESSION_TIME_KEY)
}
}

private func moveEvents() -> Int64 {
var maxEventId: Int64 = -1
let remnantEvents = storage.readEvents()
let remnantEvents = legacyStorage.readEvents()
remnantEvents.forEach { event in
let eventId = moveEvent(event, amplitude.storage, storage.removeEvent)
let eventId = moveEvent(event, storage, legacyStorage.removeEvent)
if maxEventId < eventId {
maxEventId = eventId
}
Expand All @@ -86,9 +90,9 @@ class RemnantDataMigration {

private func moveIdentifies() -> Int64 {
var maxEventId: Int64 = -1
let remnantEvents = storage.readIdentifies()
let remnantEvents = legacyStorage.readIdentifies()
remnantEvents.forEach { event in
let eventId = moveEvent(event, amplitude.storage, storage.removeIdentify)
let eventId = moveEvent(event, storage, legacyStorage.removeIdentify)
if maxEventId < eventId {
maxEventId = eventId
}
Expand All @@ -97,9 +101,9 @@ class RemnantDataMigration {
}

private func moveInterceptedIdentifies() {
let remnantEvents = storage.readInterceptedIdentifies()
let remnantEvents = legacyStorage.readInterceptedIdentifies()
remnantEvents.forEach { event in
_ = moveEvent(event, amplitude.identifyStorage, storage.removeInterceptedIdentify)
_ = moveEvent(event, identifyStorage, legacyStorage.removeInterceptedIdentify)
}
}

Expand All @@ -113,7 +117,7 @@ class RemnantDataMigration {
removeFromSource(rowId!)
return rowId!
} catch {
amplitude.logger?.error(message: "event migration failed: \(error)")
logger?.error(message: "event migration failed: \(error)")
return -1
}
}
Expand Down
17 changes: 11 additions & 6 deletions Sources/Amplitude/Plugins/AnalyticsConnectorPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ class AnalyticsConnectorPlugin: BeforePlugin {
override func setup(amplitude: Amplitude) {
super.setup(amplitude: amplitude)
connector = AnalyticsConnector.getInstance(amplitude.configuration.instanceName)
connector!.eventBridge.setEventReceiver { event in
amplitude.track(event: BaseEvent(
eventType: event.eventType,
eventProperties: event.eventProperties as? [String: Any],
userProperties: event.userProperties as? [String: Any]
))
let logger = amplitude.logger
connector!.eventBridge.setEventReceiver { [weak amplitude, logger] event in
if let amplitude = amplitude {
amplitude.track(event: BaseEvent(
eventType: event.eventType,
eventProperties: event.eventProperties as? [String: Any],
userProperties: event.userProperties as? [String: Any]
))
} else {
logger?.error(message: "Amplitude instance has been deallocated, please maintain a strong reference to track events from Experiment")
}
}
}

Expand Down
13 changes: 9 additions & 4 deletions Sources/Amplitude/Plugins/NetworkConnectivityCheckerPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,19 @@ open class NetworkConnectivityCheckerPlugin: BeforePlugin {
amplitude.logger?.debug(message: "Installing NetworkConnectivityCheckerPlugin, offline feature should be supported.")

pathCreation.start(queue: amplitude.trackingQueue)
let logger = amplitude.logger
pathUpdateCancellable = pathCreation.networkPathPublisher?
.sink(receiveValue: { [weak self] networkPath in
.sink(receiveValue: { [weak amplitude, logger] networkPath in
guard let amplitude = amplitude else {
logger?.debug(message: "Received network connectivity updated when amplitude instance has been deallocated")
return
crleona marked this conversation as resolved.
Show resolved Hide resolved
}
let isOffline = !(networkPath.status == .satisfied)
if self?.amplitude?.configuration.offline == isOffline {
if amplitude.configuration.offline == isOffline {
return
}
self?.amplitude?.logger?.debug(message: "Network connectivity changed to \(isOffline ? "offline" : "online").")
self?.amplitude?.configuration.offline = isOffline
amplitude.logger?.debug(message: "Network connectivity changed to \(isOffline ? "offline" : "online").")
amplitude.configuration.offline = isOffline
if !isOffline {
amplitude.flush()
}
Expand Down
27 changes: 15 additions & 12 deletions Sources/Amplitude/Sessions.swift
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import Foundation

public class Sessions {
private let amplitude: Amplitude

private let configuration: Configuration
private let storage: Storage
private let logger: (any Logger)?
private var _sessionId: Int64 = -1
private(set) var sessionId: Int64 {
get { _sessionId }
set {
_sessionId = newValue
do {
try amplitude.storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: _sessionId)
try storage.write(key: StorageKey.PREVIOUS_SESSION_ID, value: _sessionId)
} catch {
amplitude.logger?.warn(message: "Can't write PREVIOUS_SESSION_ID to storage: \(error)")
logger?.warn(message: "Can't write PREVIOUS_SESSION_ID to storage: \(error)")
}
}
}
Expand All @@ -22,9 +23,9 @@ public class Sessions {
set {
_lastEventId = newValue
do {
try amplitude.storage.write(key: StorageKey.LAST_EVENT_ID, value: _lastEventId)
try storage.write(key: StorageKey.LAST_EVENT_ID, value: _lastEventId)
} catch {
amplitude.logger?.warn(message: "Can't write LAST_EVENT_ID to storage: \(error)")
logger?.warn(message: "Can't write LAST_EVENT_ID to storage: \(error)")
}
}
}
Expand All @@ -35,15 +36,17 @@ public class Sessions {
set {
_lastEventTime = newValue
do {
try amplitude.storage.write(key: StorageKey.LAST_EVENT_TIME, value: _lastEventTime)
try storage.write(key: StorageKey.LAST_EVENT_TIME, value: _lastEventTime)
} catch {
amplitude.logger?.warn(message: "Can't write LAST_EVENT_TIME to storage: \(error)")
logger?.warn(message: "Can't write LAST_EVENT_TIME to storage: \(error)")
}
}
}

init(amplitude: Amplitude) {
self.amplitude = amplitude
configuration = amplitude.configuration
storage = amplitude.storage
logger = amplitude.logger
self._sessionId = amplitude.storage.read(key: .PREVIOUS_SESSION_ID) ?? -1
self._lastEventId = amplitude.storage.read(key: .LAST_EVENT_ID) ?? 0
self._lastEventTime = amplitude.storage.read(key: .LAST_EVENT_TIME) ?? -1
Expand Down Expand Up @@ -101,7 +104,7 @@ public class Sessions {

private func isWithinMinTimeBetweenSessions(timestamp: Int64) -> Bool {
let timeDelta = timestamp - self.lastEventTime
return timeDelta < amplitude.configuration.minTimeBetweenSessionsMillis
return timeDelta < configuration.minTimeBetweenSessionsMillis
}

public func startNewSessionIfNeeded(timestamp: Int64, inForeground: Bool) -> [BaseEvent]? {
Expand All @@ -116,7 +119,7 @@ public class Sessions {

public func startNewSession(timestamp: Int64) -> [BaseEvent] {
var sessionEvents: [BaseEvent] = Array()
let trackingSessionEvents = amplitude.configuration.defaultTracking.sessions
let trackingSessionEvents = configuration.defaultTracking.sessions

// end previous session
if trackingSessionEvents && self.sessionId >= 0 {
Expand Down Expand Up @@ -145,7 +148,7 @@ public class Sessions {

public func endCurrentSession() -> [BaseEvent] {
var sessionEvents: [BaseEvent] = Array()
let trackingSessionEvents = amplitude.configuration.defaultTracking.sessions
let trackingSessionEvents = configuration.defaultTracking.sessions

if trackingSessionEvents && self.sessionId >= 0 {
let sessionEndEvent = BaseEvent(
Expand Down
2 changes: 1 addition & 1 deletion Sources/Amplitude/Utilities/DefaultEventUtils.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Foundation

public class DefaultEventUtils {
private var amplitude: Amplitude?
private weak var amplitude: Amplitude?

public init(amplitude: Amplitude) {
self.amplitude = amplitude
Expand Down
39 changes: 20 additions & 19 deletions Sources/Amplitude/Utilities/EventPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import Foundation

public class EventPipeline {
let amplitude: Amplitude
var httpClient: HttpClient
var storage: Storage? { amplitude.storage }
let storage: Storage?
let logger: (any Logger)?
let configuration: Configuration

@Atomic internal var eventCount: Int = 0
internal var flushTimer: QueueTimer?
private let uploadsQueue = DispatchQueue(label: "uploadsQueue.amplitude.com")
Expand All @@ -22,11 +24,13 @@ public class EventPipeline {
private var uploads = [URL: UploadTaskInfo]()

init(amplitude: Amplitude) {
self.amplitude = amplitude
self.httpClient = HttpClient(configuration: amplitude.configuration,
diagnostics: amplitude.configuration.diagonostics,
callbackQueue: amplitude.trackingQueue)
self.flushTimer = QueueTimer(interval: getFlushInterval(), queue: amplitude.trackingQueue) { [weak self] in
storage = amplitude.storage
logger = amplitude.logger
configuration = amplitude.configuration
httpClient = HttpClient(configuration: amplitude.configuration,
diagnostics: amplitude.configuration.diagonostics,
callbackQueue: amplitude.trackingQueue)
flushTimer = QueueTimer(interval: getFlushInterval(), queue: amplitude.trackingQueue) { [weak self] in
self?.flush()
}
}
Expand All @@ -42,37 +46,34 @@ public class EventPipeline {
}
completion?()
} catch {
amplitude.logger?.error(message: "Error when storing event: \(error.localizedDescription)")
logger?.error(message: "Error when storing event: \(error.localizedDescription)")
}
}

func flush(completion: (() -> Void)? = nil) {
if self.amplitude.configuration.offline == true {
self.amplitude.logger?.debug(message: "Skipping flush while offline.")
if configuration.offline == true {
logger?.debug(message: "Skipping flush while offline.")
return
}

amplitude.logger?.log(message: "Start flushing \(eventCount) events")
logger?.log(message: "Start flushing \(eventCount) events")
eventCount = 0
guard let storage = self.storage else { return }
storage.rollover()
guard let eventFiles: [URL] = storage.read(key: StorageKey.EVENTS) else { return }
for eventFile in eventFiles {
uploadsQueue.sync {
guard uploads[eventFile] == nil else {
amplitude.logger?.log(message: "Existing upload in progress, skipping...")
logger?.log(message: "Existing upload in progress, skipping...")
return
}
guard let eventsString = storage.getEventsString(eventBlock: eventFile),
!eventsString.isEmpty else {
return
}
let uploadTask = httpClient.upload(events: eventsString) { [weak self] result in
guard let self else {
return
}
let uploadTask = httpClient.upload(events: eventsString) { [self] result in
let responseHandler = storage.getResponseHandler(
configuration: self.amplitude.configuration,
configuration: self.configuration,
eventPipeline: self,
eventBlock: eventFile,
eventsString: eventsString
Expand All @@ -97,11 +98,11 @@ public class EventPipeline {
}

private func getFlushInterval() -> TimeInterval {
return TimeInterval.milliseconds(amplitude.configuration.flushIntervalMillis)
return TimeInterval.milliseconds(configuration.flushIntervalMillis)
}

private func getFlushCount() -> Int {
let count = amplitude.configuration.flushQueueSize
let count = configuration.flushQueueSize
return count != 0 ? count : 1
}
}
Expand Down
Loading
Loading