From 299cfd5306d70c80b1bcae19f10c8204d2c2865b Mon Sep 17 00:00:00 2001 From: hiddenviewer Date: Fri, 25 Oct 2024 17:32:42 +0900 Subject: [PATCH 1/4] Add subscribeStatus(callback) function to Document --- Sources/Document/Document.swift | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Sources/Document/Document.swift b/Sources/Document/Document.swift index 2f4db13f..8a5aa62b 100644 --- a/Sources/Document/Document.swift +++ b/Sources/Document/Document.swift @@ -88,6 +88,7 @@ public class Document { private var subscribeCallbacks = [String: SubscribeCallback]() private var presenceSubscribeCallback = [String: SubscribeCallback]() private var connectionSubscribeCallback: SubscribeCallback? + private var statusSubscribeCallback: SubscribeCallback? private var syncSubscribeCallback: SubscribeCallback? /** @@ -194,6 +195,14 @@ public class Document { self.connectionSubscribeCallback = callback } + /** + * `subscribeStatus` registers a callback to subscribe to events on the document. + * The callback will be called when the document status changes. + */ + public func subscribeStatus(_ callback: @escaping SubscribeCallback) { + self.statusSubscribeCallback = callback + } + /** * `subscribePresence` registers a callback to subscribe to events on the document. * The callback will be called when the targetPath or any of its nested values change. @@ -639,6 +648,8 @@ public class Document { } } else if event.type == .connectionChanged { self.connectionSubscribeCallback?(event, self) + } else if event.type == .statusChanged { + self.statusSubscribeCallback?(event, self) } else if event.type == .syncStatusChanged { self.syncSubscribeCallback?(event, self) } else if event.type == .snapshot { From 3d1286297185388d6220b151e66529b109fd6d99 Mon Sep 17 00:00:00 2001 From: hiddenviewer Date: Fri, 25 Oct 2024 17:36:18 +0900 Subject: [PATCH 2/4] Trigger 'detached' event on Client.deactivate() --- Sources/Core/Client.swift | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/Sources/Core/Client.swift b/Sources/Core/Client.swift index 03c38349..33a3bb75 100644 --- a/Sources/Core/Client.swift +++ b/Sources/Core/Client.swift @@ -206,10 +206,6 @@ public class Client { return } - for item in self.attachmentMap { - try self.stopWatchLoop(item.key) - } - let deactivateRequest = DeactivateClientRequest.with { $0.clientID = clientID } let deactivateResponse = await self.rpcClient.deactivateClient(request: deactivateRequest) @@ -219,6 +215,11 @@ public class Client { throw YorkieError.rpcError(message: deactivateResponse.error.debugDescription) } + for (key, attachment) in self.attachmentMap { + attachment.doc.applyStatus(.detached) + try self.detachInternal(key) + } + self.status = .deactivated Logger.info("Client(\(self.key) deactivated.") @@ -336,9 +337,7 @@ public class Client { doc.applyStatus(.detached) } - try self.stopWatchLoop(doc.getKey()) - - self.attachmentMap.removeValue(forKey: doc.getKey()) + try self.detachInternal(doc.getKey()) Logger.info("[DD] c:\"\(self.key)\" detaches d:\"\(doc.getKey())\"") @@ -683,6 +682,16 @@ public class Client { } } + private func detachInternal(_ docKey: DocumentKey) throws { + guard self.attachmentMap[docKey] != nil else { + return + } + + try self.stopWatchLoop(docKey) + + self.attachmentMap.removeValue(forKey: docKey) + } + @discardableResult private func syncInternal(_ attachment: Attachment, _ syncMode: SyncMode) async throws -> Document { guard let clientID = self.id else { From 49c71ce40c3017a6c6a265e0addf728c6f9df0de Mon Sep 17 00:00:00 2001 From: hiddenviewer Date: Fri, 25 Oct 2024 17:40:48 +0900 Subject: [PATCH 3/4] Add test code for Document.subscribeStatus(callback) --- .../DocumentIntegrationTests.swift | 104 ++++++++++++++++++ Tests/Integration/IntegrationHelper.swift | 51 +++++++++ 2 files changed, 155 insertions(+) diff --git a/Tests/Integration/DocumentIntegrationTests.swift b/Tests/Integration/DocumentIntegrationTests.swift index 27b660d1..9e850312 100644 --- a/Tests/Integration/DocumentIntegrationTests.swift +++ b/Tests/Integration/DocumentIntegrationTests.swift @@ -470,4 +470,108 @@ final class DocumentIntegrationTests: XCTestCase { try await self.c1.deactivate() try await self.c2.deactivate() } + + func test_subscribe_document_status_changed_event() async throws { + let c1 = Client(rpcAddress) + let c2 = Client(rpcAddress) + try await c1.activate() + try await c2.activate() + + let docKey = "\(self.description)-\(Date().description)".toDocKey + let d1 = Document(key: docKey) + let d2 = Document(key: docKey) + + let eventCollectorD1 = EventCollector(doc: d1) + let eventCollectorD2 = EventCollector(doc: d2) + await eventCollectorD1.subscribeDocumentStatus() + await eventCollectorD2.subscribeDocumentStatus() + + // 1. When the client attaches a document, it receives an attached event. + try await c1.attach(d1) + try await c2.attach(d2) + + // 2. When c1 detaches a document, it receives a detached event. + try await c1.detach(d1) + + // 3. When c2 deactivates, it should also receive a detached event. + try await c2.deactivate() + + await eventCollectorD1.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD1.verifyNthValue(at: 2, isEqualTo: .detached) + + await eventCollectorD2.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD2.verifyNthValue(at: 2, isEqualTo: .detached) + + // 4. When other document is attached, it receives an attached event. + let docKey2 = "\(self.description)-\(Date().description)".toDocKey + let d3 = Document(key: docKey2) + let d4 = Document(key: docKey2) + let eventCollectorD3 = EventCollector(doc: d3) + let eventCollectorD4 = EventCollector(doc: d4) + await eventCollectorD3.subscribeDocumentStatus() + await eventCollectorD4.subscribeDocumentStatus() + + try await c1.attach(d3, [:], .manual) + + try await c2.activate() + try await c2.attach(d4, [:], .manual) + + // 5. When c1 removes a document, it receives a removed event. + try await c1.remove(d3) + + // 6. When c2 syncs, it should also receive a removed event. + try await c2.sync() + + await eventCollectorD3.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD3.verifyNthValue(at: 2, isEqualTo: .removed) + + await eventCollectorD4.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD4.verifyNthValue(at: 2, isEqualTo: .removed) + + // 7. If the document is in the removed state, a detached event should not occur when deactivating. + let eventCount3 = eventCollectorD3.count + let eventCount4 = eventCollectorD4.count + try await c1.deactivate() + try await c2.deactivate() + + try await Task.sleep(nanoseconds: 500_000_000) + + XCTAssertEqual(eventCount3, eventCollectorD3.count) + XCTAssertEqual(eventCount4, eventCollectorD4.count) + } + + func test_document_status_changes_to_detached_when_deactivating() async throws { + let c1 = Client(rpcAddress) + let c2 = Client(rpcAddress) + try await c1.activate() + try await c2.activate() + + let docKey = "\(self.description)-\(Date().description)".toDocKey + let d1 = Document(key: docKey) + let d2 = Document(key: docKey) + + let eventCollectorD1 = EventCollector(doc: d1) + let eventCollectorD2 = EventCollector(doc: d2) + await eventCollectorD1.subscribeDocumentStatus() + await eventCollectorD2.subscribeDocumentStatus() + + // 1. When the client attaches a document, it receives an attached event. + try await c1.attach(d1, [:], .manual) + try await c2.attach(d2, [:], .manual) + + await eventCollectorD1.verifyNthValue(at: 1, isEqualTo: .attached) + await eventCollectorD2.verifyNthValue(at: 1, isEqualTo: .attached) + + // 2. When c1 removes a document, it receives a removed event. + try await c1.remove(d1) + await eventCollectorD1.verifyNthValue(at: 2, isEqualTo: .removed) + + // 3. When c2 deactivates, it should also receive a removed event. + try await c2.deactivate() + // NOTE: For now, document status changes to `Detached` when deactivating. + // This behavior may change in the future. + await eventCollectorD2.verifyNthValue(at: 2, isEqualTo: .detached) + + try await c1.deactivate() + } } diff --git a/Tests/Integration/IntegrationHelper.swift b/Tests/Integration/IntegrationHelper.swift index 6d74a069..e2636886 100644 --- a/Tests/Integration/IntegrationHelper.swift +++ b/Tests/Integration/IntegrationHelper.swift @@ -153,3 +153,54 @@ func subscribeDocs(_ d1: Document, _ d2: Document, _ d1Expected: [any OperationI } } } + +class EventCollector { + let doc: Document + var values: [T] = [] + var count: Int { + return self.values.count + } + + init(doc: Document) { + self.doc = doc + } + + func add(event: T) { + self.values.append(event) + } + + func asyncStream() -> AsyncStream { + return AsyncStream { continuation in + for value in self.values { + continuation.yield(value) + } + continuation.finish() + } + } + + func verifyNthValue(at nth: Int, isEqualTo targetValue: T) async { + if nth > self.values.count { + XCTFail("Expected \(nth)th value: \(targetValue), but the stream ended before reaching the value") + } + + var counter = 0 + for await value in self.asyncStream() { + counter += 1 + + if counter == nth { + print("\(nth)th, value: \(value)") + XCTAssertTrue(value == targetValue, "Expected \(nth)th value: \(targetValue), actual value: \(value)") + break + } + } + } + + func subscribeDocumentStatus() async where T == DocumentStatus { + await self.doc.subscribeStatus { [weak self] event, _ in + guard let status = (event as? StatusChangedEvent)?.value.status else { + return + } + self?.add(event: status) + } + } +} From 19a8ea5160b5e406d58ed33f526c7119e1543750 Mon Sep 17 00:00:00 2001 From: hiddenviewer Date: Mon, 28 Oct 2024 21:49:10 +0900 Subject: [PATCH 4/4] Update EventCollector based on coderabbitai review feedback --- Tests/Integration/IntegrationHelper.swift | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/Tests/Integration/IntegrationHelper.swift b/Tests/Integration/IntegrationHelper.swift index e2636886..2498df80 100644 --- a/Tests/Integration/IntegrationHelper.swift +++ b/Tests/Integration/IntegrationHelper.swift @@ -155,8 +155,14 @@ func subscribeDocs(_ d1: Document, _ d2: Document, _ d1Expected: [any OperationI } class EventCollector { + private let queue = DispatchQueue(label: "com.yorkie.eventcollector", attributes: .concurrent) + private var _values: [T] = [] + let doc: Document - var values: [T] = [] + var values: [T] { + self.queue.sync { self._values } + } + var count: Int { return self.values.count } @@ -166,7 +172,9 @@ class EventCollector { } func add(event: T) { - self.values.append(event) + self.queue.async(flags: .barrier) { + self._values.append(event) + } } func asyncStream() -> AsyncStream { @@ -180,7 +188,8 @@ class EventCollector { func verifyNthValue(at nth: Int, isEqualTo targetValue: T) async { if nth > self.values.count { - XCTFail("Expected \(nth)th value: \(targetValue), but the stream ended before reaching the value") + XCTFail("Expected \(nth)th value: \(targetValue), but only received \(self.values.count) values") + return } var counter = 0 @@ -188,11 +197,12 @@ class EventCollector { counter += 1 if counter == nth { - print("\(nth)th, value: \(value)") XCTAssertTrue(value == targetValue, "Expected \(nth)th value: \(targetValue), actual value: \(value)") - break + return } } + + XCTFail("Stream ended before finding \(nth)th value") } func subscribeDocumentStatus() async where T == DocumentStatus {