diff --git a/Sources/Amplitude/Utilities/EventPipeline.swift b/Sources/Amplitude/Utilities/EventPipeline.swift index 0639bf1..becc731 100644 --- a/Sources/Amplitude/Utilities/EventPipeline.swift +++ b/Sources/Amplitude/Utilities/EventPipeline.swift @@ -17,11 +17,8 @@ public class EventPipeline { internal var flushTimer: QueueTimer? private let uploadsQueue = DispatchQueue(label: "uploadsQueue.amplitude.com") - internal struct UploadTaskInfo { - let events: String - let task: URLSessionDataTask - } - private var uploads = [URL: UploadTaskInfo]() + private var flushCompletions: [() -> Void] = [] + private var currentUpload: URLSessionTask? init(amplitude: Amplitude) { storage = amplitude.storage @@ -60,33 +57,58 @@ public class EventPipeline { eventCount = 0 guard let storage = self.storage else { return } storage.rollover() - guard let eventFiles: [URL] = storage.read(key: StorageKey.EVENTS) else { return } - for eventFile in eventFiles { - uploadsQueue.sync { - guard uploads[eventFile] == nil else { - logger?.log(message: "Existing upload in progress, skipping...") - return - } - guard let eventsString = storage.getEventsString(eventBlock: eventFile), - !eventsString.isEmpty else { + + uploadsQueue.async { [self] in + if let completion { + flushCompletions.append(completion) + } + self.sendNextEventFile() + } + } + + private func sendNextEventFile() { + guard currentUpload == nil else { + logger?.log(message: "Existing upload in progress, skipping...") + return + } + + guard let storage = storage, + let eventFiles: [URL] = storage.read(key: StorageKey.EVENTS), + let nextEventFile = eventFiles.first else { + flushCompletions.forEach { $0() } + flushCompletions.removeAll() + logger?.debug(message: "No event files to upload") + return + } + + guard configuration.offline != true else { + logger?.debug(message: "Skipping flush while offline.") + return + } + + guard let eventsString = storage.getEventsString(eventBlock: nextEventFile), + !eventsString.isEmpty else { + logger?.log(message: "Could not read events file: \(nextEventFile)") + return + } + + currentUpload = httpClient.upload(events: eventsString) { [self] result in + let responseHandler = storage.getResponseHandler( + configuration: self.configuration, + eventPipeline: self, + eventBlock: nextEventFile, + eventsString: eventsString + ) + responseHandler.handle(result: result) + // Don't send the next event file if we're being deallocated + self.uploadsQueue.async { [weak self] in + guard let self = self else { return } - let uploadTask = httpClient.upload(events: eventsString) { [self] result in - let responseHandler = storage.getResponseHandler( - configuration: self.configuration, - eventPipeline: self, - eventBlock: eventFile, - eventsString: eventsString - ) - responseHandler.handle(result: result) - self.completeUpload(for: eventFile) - } - if let uploadTask { - uploads[eventFile] = UploadTaskInfo(events: eventsString, task: uploadTask) - } + self.currentUpload = nil + self.sendNextEventFile() } } - completion?() } func start() { @@ -106,12 +128,3 @@ public class EventPipeline { return count != 0 ? count : 1 } } - -extension EventPipeline { - - func completeUpload(for eventBlock: URL) { - uploadsQueue.sync { - uploads[eventBlock] = nil - } - } -} diff --git a/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift b/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift index 7fa6b81..ca2a184 100644 --- a/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift +++ b/Tests/AmplitudeTests/Utilities/EventPipelineTests.swift @@ -116,6 +116,35 @@ final class EventPipelineTests: XCTestCase { XCTAssertEqual(uploadedEvents![0].eventType, "testEvent") } + func testOneUploadAtATime() { + let testEvent = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent") + try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent) + pipeline?.storage?.rollover() + + let testEvent2 = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent2") + try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent) + pipeline.storage?.rollover() + + let httpResponseExpectation1 = expectation(description: "httpresponse1") + let httpResponseExpectation2 = expectation(description: "httpresponse2") + httpClient.uploadExpectations = [httpResponseExpectation1, httpResponseExpectation2] + + httpResponseExpectation2.isInverted = true + + let flushExpectation = expectation(description: "flush") + pipeline.flush { + flushExpectation.fulfill() + } + + wait(for: [httpResponseExpectation1], timeout: 1) + + httpResponseExpectation2.isInverted = false + + wait(for: [httpResponseExpectation2, flushExpectation], timeout: 1) + + XCTAssertEqual(httpClient.uploadCount, 2) + } + func testInvalidEventUpload() { let invalidResponseData = "{\"events_with_invalid_fields\": {\"user_id\": [0]}}".data(using: .utf8)!