Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: change storage to use sync queue, fix retry issues #14

Merged
merged 1 commit into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/Amplitude/EventBridge.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// File.swift
// EventBridge.swift
//
//
// Created by Marvin Liu on 10/27/22.
Expand Down
16 changes: 8 additions & 8 deletions Sources/Amplitude/Storages/InMemoryStorage.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// File.swift
// InMemoryStorage.swift
//
//
// Created by Marvin Liu on 10/28/22.
Expand All @@ -10,31 +10,31 @@ import Foundation
class InMemoryStorage: Storage {
typealias EventBlock = URL

func write(key: StorageKey, value: Any?) async {
func write(key: StorageKey, value: Any?) {

}

func read<T>(key: StorageKey) async -> T? {
func read<T>(key: StorageKey) -> T? {
return nil
}

func reset() async {
func reset() {

}

func rollover() async {
func rollover() {

}

func getEventsString(eventBlock: EventBlock) async -> String? {
func getEventsString(eventBlock: EventBlock) -> String? {
return nil
}

func remove(eventBlock: EventBlock) async {
func remove(eventBlock: EventBlock) {

}

func splitBlock(eventBlock: EventBlock, events: [BaseEvent]) async {
func splitBlock(eventBlock: EventBlock, events: [BaseEvent]) {

}

Expand Down
130 changes: 74 additions & 56 deletions Sources/Amplitude/Storages/PersistentStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import Foundation

actor PersistentStorage: Storage {
class PersistentStorage: Storage {
typealias EventBlock = URL

let storagePrefix: String
Expand All @@ -16,40 +16,46 @@ actor PersistentStorage: Storage {
private var outputStream: OutputFileStream?
internal weak var amplitude: Amplitude?

let syncQueue = DispatchQueue(label: "syncPersistentStorage.amplitude.com")

init(apiKey: String = "") {
self.storagePrefix = "\(PersistentStorage.DEFAULT_STORAGE_PREFIX)-\(apiKey)"
self.userDefaults = UserDefaults(suiteName: "\(PersistentStorage.AMP_STORAGE_PREFIX).\(storagePrefix)")
self.fileManager = FileManager.default
}

func write(key: StorageKey, value: Any?) async throws {
switch key {
case .EVENTS:
if let event = value as? BaseEvent {
let eventStoreFile = getCurrentFile()
self.storeEvent(toFile: eventStoreFile, event: event)
}
default:
if isBasicType(value: value) {
userDefaults?.set(value, forKey: key.rawValue)
} else {
throw Exception.unsupportedType
func write(key: StorageKey, value: Any?) throws {
try syncQueue.sync {
switch key {
case .EVENTS:
if let event = value as? BaseEvent {
let eventStoreFile = getCurrentFile()
self.storeEvent(toFile: eventStoreFile, event: event)
}
default:
if isBasicType(value: value) {
userDefaults?.set(value, forKey: key.rawValue)
} else {
throw Exception.unsupportedType
}
}
}
}

func read<T>(key: StorageKey) async -> T? {
var result: T?
switch key {
case .EVENTS:
result = getEventFiles() as? T
default:
result = userDefaults?.object(forKey: key.rawValue) as? T
func read<T>(key: StorageKey) -> T? {
syncQueue.sync {
var result: T?
switch key {
case .EVENTS:
result = getEventFiles() as? T
default:
result = userDefaults?.object(forKey: key.rawValue) as? T
}
return result
}
return result
}

func getEventsString(eventBlock: EventBlock) async -> String? {
func getEventsString(eventBlock: EventBlock) -> String? {
var content: String?
do {
content = try String(contentsOf: eventBlock, encoding: .utf8)
Expand All @@ -59,25 +65,33 @@ actor PersistentStorage: Storage {
return content
}

func remove(eventBlock: EventBlock) async {
do {
try fileManager!.removeItem(atPath: eventBlock.path)
} catch {
amplitude?.logger?.error(message: error.localizedDescription)
func remove(eventBlock: EventBlock) {
syncQueue.sync {
do {
try fileManager!.removeItem(atPath: eventBlock.path)
} catch {
amplitude?.logger?.error(message: error.localizedDescription)
}
}
}

func splitBlock(eventBlock: EventBlock, events: [BaseEvent]) async {
let total = events.count
let half = total / 2
let leftSplit = Array(events[0..<half])
let rightSplit = Array(events[half..<total])
storeEventsInNewFile(toFile: eventBlock.appendingPathComponent("-1"), events: leftSplit)
storeEventsInNewFile(toFile: eventBlock.appendingPathComponent("-2"), events: rightSplit)
await remove(eventBlock: eventBlock)
func splitBlock(eventBlock: EventBlock, events: [BaseEvent]) {
syncQueue.sync {
let total = events.count
let half = total / 2
let leftSplit = Array(events[0..<half])
let rightSplit = Array(events[half..<total])
storeEventsInNewFile(toFile: eventBlock.appendFileNameSuffix(suffix: "-1"), events: leftSplit)
storeEventsInNewFile(toFile: eventBlock.appendFileNameSuffix(suffix: "-2"), events: rightSplit)
do {
try fileManager!.removeItem(atPath: eventBlock.path)
} catch {
amplitude?.logger?.error(message: error.localizedDescription)
}
}
}

nonisolated func getResponseHandler(
func getResponseHandler(
configuration: Configuration,
eventPipeline: EventPipeline,
eventBlock: EventBlock,
Expand All @@ -92,27 +106,31 @@ actor PersistentStorage: Storage {
)
}

func reset() async {
let urls = getEventFiles(includeUnfinished: true)
let keys = userDefaults?.dictionaryRepresentation().keys
keys?.forEach { key in
userDefaults?.removeObject(forKey: key)
}
for url in urls {
try? fileManager!.removeItem(atPath: url.path)
func reset() {
syncQueue.sync {
let urls = getEventFiles(includeUnfinished: true)
let keys = userDefaults?.dictionaryRepresentation().keys
keys?.forEach { key in
userDefaults?.removeObject(forKey: key)
}
for url in urls {
try? fileManager!.removeItem(atPath: url.path)
}
}
}

func rollover() async {
let currentFile = getCurrentFile()
if fileManager?.fileExists(atPath: currentFile.path) == false {
return
}
if let attributes = try? fileManager?.attributesOfItem(atPath: currentFile.path),
let fileSize = attributes[FileAttributeKey.size] as? UInt64,
fileSize >= 0
{
finish(file: currentFile)
func rollover() {
syncQueue.sync {
let currentFile = getCurrentFile()
if fileManager?.fileExists(atPath: currentFile.path) == false {
return
}
if let attributes = try? fileManager?.attributesOfItem(atPath: currentFile.path),
let fileSize = attributes[FileAttributeKey.size] as? UInt64,
fileSize >= 0
{
finish(file: currentFile)
}
}
}

Expand Down Expand Up @@ -251,7 +269,7 @@ extension PersistentStorage {
private func storeEventsInNewFile(toFile file: URL, events: [BaseEvent]) {
let storeFile = file

guard fileManager?.fileExists(atPath: storeFile.path) == true else {
guard fileManager?.fileExists(atPath: storeFile.path) != true else {
yuhao900914 marked this conversation as resolved.
Show resolved Hide resolved
return
}

Expand Down Expand Up @@ -301,10 +319,10 @@ extension PersistentStorage {
let fileEnding = "]"
do {
try outputStream.write(fileEnding)
try outputStream.close()
} catch {
amplitude?.logger?.error(message: error.localizedDescription)
}
outputStream.close()
self.outputStream = nil

let fileWithoutTemp = file.deletingPathExtension()
Expand Down
20 changes: 10 additions & 10 deletions Sources/Amplitude/Types.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public typealias EventCallBack = (BaseEvent, Int, String) -> Void
// It cannot be dynamically associated with this protocol.
// https://github.com/apple/swift/issues/62219#issuecomment-1326531801
public protocol Storage {
func write(key: StorageKey, value: Any?) async throws
func read<T>(key: StorageKey) async -> T?
func getEventsString(eventBlock: URL) async -> String?
func remove(eventBlock: URL) async
func splitBlock(eventBlock: URL, events: [BaseEvent]) async
func rollover() async
func reset() async
func write(key: StorageKey, value: Any?) throws
func read<T>(key: StorageKey) -> T?
func getEventsString(eventBlock: URL) -> String?
func remove(eventBlock: URL)
func splitBlock(eventBlock: URL, events: [BaseEvent])
func rollover()
func reset()
func getResponseHandler(
configuration: Configuration,
eventPipeline: EventPipeline,
Expand Down Expand Up @@ -94,9 +94,9 @@ extension Plugin {

public protocol ResponseHandler {
func handle(result: Result<Int, Error>)
func handleSuccessResponse(code: Int) async
func handleBadRequestResponse(data: [String: Any]) async
func handlePayloadTooLargeResponse(data: [String: Any]) async
func handleSuccessResponse(code: Int)
func handleBadRequestResponse(data: [String: Any])
func handlePayloadTooLargeResponse(data: [String: Any])
func handleTooManyRequestsResponse(data: [String: Any])
func handleTimeoutResponse(data: [String: Any])
func handleFailedResponse(data: [String: Any])
Expand Down
36 changes: 18 additions & 18 deletions Sources/Amplitude/Utilities/EventPipeline.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// File.swift
// EventPipeline.swift
//
//
// Created by Marvin Liu on 10/28/22.
Expand Down Expand Up @@ -35,29 +35,28 @@ public class EventPipeline {
func put(event: BaseEvent, completion: (() -> Void)? = nil) {
guard let storage = self.storage else { return }
event.attempts += 1
Task {
do {
try await storage.write(key: StorageKey.EVENTS, value: event)
eventCount += 1
if eventCount >= getFlushCount() {
flush()
}
completion?()
} catch {
amplitude.logger?.error(message: "Error when storing event: \(error.localizedDescription)")
do {
try storage.write(key: StorageKey.EVENTS, value: event)
eventCount += 1
if eventCount >= getFlushCount() {
flush()
}
completion?()
} catch {
amplitude.logger?.error(message: "Error when storing event: \(error.localizedDescription)")
}
}

func flush(completion: (() -> Void)? = nil) {
Task {
guard let storage = self.storage else { return }
await storage.rollover()
guard let eventFiles: [URL]? = await storage.read(key: StorageKey.EVENTS) else { return }
amplitude.logger?.log(message: "Start flushing \(eventCount) events")
eventCount = 0
amplitude.logger?.log(message: "Start flushing \(eventCount) events")
eventCount = 0
guard let storage = self.storage else { return }
storage.rollover()
guard let eventFiles: [URL]? = storage.read(key: StorageKey.EVENTS) else { return }
cleanupUploads()
yuhao900914 marked this conversation as resolved.
Show resolved Hide resolved
if pendingUploads == 0 {
for eventFile in eventFiles! {
guard let eventsString = await storage.getEventsString(eventBlock: eventFile) else {
guard let eventsString = storage.getEventsString(eventBlock: eventFile) else {
continue
}
if eventsString.isEmpty {
Expand All @@ -71,6 +70,7 @@ public class EventPipeline {
eventsString: eventsString
)
responseHandler.handle(result: result)
self?.cleanupUploads()
}
if let upload = uploadTask {
add(uploadTask: UploadTaskInfo(events: eventsString, task: upload))
Expand Down
Loading