Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribeStatus(callback) function to Document #190

Merged
merged 4 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions Sources/Core/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@ public class Client {
return
}

for item in self.attachmentMap {
try self.stopWatchLoop(item.key)
}

let deactivateRequest = DeactivateClientRequest.with { $0.clientID = clientID }

let deactivateResponse = await self.rpcClient.deactivateClient(request: deactivateRequest)
Expand All @@ -219,6 +215,11 @@ public class Client {
throw YorkieError.rpcError(message: deactivateResponse.error.debugDescription)
}

for (key, attachment) in self.attachmentMap {
attachment.doc.applyStatus(.detached)
try self.detachInternal(key)
}

self.status = .deactivated

Logger.info("Client(\(self.key) deactivated.")
Expand Down Expand Up @@ -336,9 +337,7 @@ public class Client {
doc.applyStatus(.detached)
}

try self.stopWatchLoop(doc.getKey())

self.attachmentMap.removeValue(forKey: doc.getKey())
try self.detachInternal(doc.getKey())

Logger.info("[DD] c:\"\(self.key)\" detaches d:\"\(doc.getKey())\"")

Expand Down Expand Up @@ -683,6 +682,16 @@ public class Client {
}
}

private func detachInternal(_ docKey: DocumentKey) throws {
guard self.attachmentMap[docKey] != nil else {
return
}

try self.stopWatchLoop(docKey)

self.attachmentMap.removeValue(forKey: docKey)
}

@discardableResult
private func syncInternal(_ attachment: Attachment, _ syncMode: SyncMode) async throws -> Document {
guard let clientID = self.id else {
Expand Down
11 changes: 11 additions & 0 deletions Sources/Document/Document.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class Document {
private var subscribeCallbacks = [String: SubscribeCallback]()
private var presenceSubscribeCallback = [String: SubscribeCallback]()
private var connectionSubscribeCallback: SubscribeCallback?
private var statusSubscribeCallback: SubscribeCallback?
private var syncSubscribeCallback: SubscribeCallback?

/**
Expand Down Expand Up @@ -194,6 +195,14 @@ public class Document {
self.connectionSubscribeCallback = callback
}

/**
* `subscribeStatus` registers a callback to subscribe to events on the document.
* The callback will be called when the document status changes.
*/
public func subscribeStatus(_ callback: @escaping SubscribeCallback) {
self.statusSubscribeCallback = callback
}

/**
* `subscribePresence` registers a callback to subscribe to events on the document.
* The callback will be called when the targetPath or any of its nested values change.
Expand Down Expand Up @@ -639,6 +648,8 @@ public class Document {
}
} else if event.type == .connectionChanged {
self.connectionSubscribeCallback?(event, self)
} else if event.type == .statusChanged {
self.statusSubscribeCallback?(event, self)
} else if event.type == .syncStatusChanged {
self.syncSubscribeCallback?(event, self)
} else if event.type == .snapshot {
Expand Down
104 changes: 104 additions & 0 deletions Tests/Integration/DocumentIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -470,4 +470,108 @@ final class DocumentIntegrationTests: XCTestCase {
try await self.c1.deactivate()
try await self.c2.deactivate()
}

func test_subscribe_document_status_changed_event() async throws {
let c1 = Client(rpcAddress)
let c2 = Client(rpcAddress)
try await c1.activate()
try await c2.activate()

let docKey = "\(self.description)-\(Date().description)".toDocKey
let d1 = Document(key: docKey)
let d2 = Document(key: docKey)

let eventCollectorD1 = EventCollector<DocumentStatus>(doc: d1)
let eventCollectorD2 = EventCollector<DocumentStatus>(doc: d2)
await eventCollectorD1.subscribeDocumentStatus()
await eventCollectorD2.subscribeDocumentStatus()

// 1. When the client attaches a document, it receives an attached event.
try await c1.attach(d1)
try await c2.attach(d2)

// 2. When c1 detaches a document, it receives a detached event.
try await c1.detach(d1)

// 3. When c2 deactivates, it should also receive a detached event.
try await c2.deactivate()

await eventCollectorD1.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD1.verifyNthValue(at: 2, isEqualTo: .detached)

await eventCollectorD2.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD2.verifyNthValue(at: 2, isEqualTo: .detached)

// 4. When other document is attached, it receives an attached event.
let docKey2 = "\(self.description)-\(Date().description)".toDocKey
let d3 = Document(key: docKey2)
let d4 = Document(key: docKey2)
let eventCollectorD3 = EventCollector<DocumentStatus>(doc: d3)
let eventCollectorD4 = EventCollector<DocumentStatus>(doc: d4)
await eventCollectorD3.subscribeDocumentStatus()
await eventCollectorD4.subscribeDocumentStatus()

try await c1.attach(d3, [:], .manual)

try await c2.activate()
try await c2.attach(d4, [:], .manual)

// 5. When c1 removes a document, it receives a removed event.
try await c1.remove(d3)

// 6. When c2 syncs, it should also receive a removed event.
try await c2.sync()

await eventCollectorD3.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD3.verifyNthValue(at: 2, isEqualTo: .removed)

await eventCollectorD4.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD4.verifyNthValue(at: 2, isEqualTo: .removed)

// 7. If the document is in the removed state, a detached event should not occur when deactivating.
let eventCount3 = eventCollectorD3.count
let eventCount4 = eventCollectorD4.count
try await c1.deactivate()
try await c2.deactivate()

try await Task.sleep(nanoseconds: 500_000_000)

XCTAssertEqual(eventCount3, eventCollectorD3.count)
XCTAssertEqual(eventCount4, eventCollectorD4.count)
}

func test_document_status_changes_to_detached_when_deactivating() async throws {
let c1 = Client(rpcAddress)
let c2 = Client(rpcAddress)
try await c1.activate()
try await c2.activate()

let docKey = "\(self.description)-\(Date().description)".toDocKey
let d1 = Document(key: docKey)
let d2 = Document(key: docKey)

let eventCollectorD1 = EventCollector<DocumentStatus>(doc: d1)
let eventCollectorD2 = EventCollector<DocumentStatus>(doc: d2)
await eventCollectorD1.subscribeDocumentStatus()
await eventCollectorD2.subscribeDocumentStatus()

// 1. When the client attaches a document, it receives an attached event.
try await c1.attach(d1, [:], .manual)
try await c2.attach(d2, [:], .manual)

await eventCollectorD1.verifyNthValue(at: 1, isEqualTo: .attached)
await eventCollectorD2.verifyNthValue(at: 1, isEqualTo: .attached)

// 2. When c1 removes a document, it receives a removed event.
try await c1.remove(d1)
await eventCollectorD1.verifyNthValue(at: 2, isEqualTo: .removed)

// 3. When c2 deactivates, it should also receive a removed event.
try await c2.deactivate()
// NOTE: For now, document status changes to `Detached` when deactivating.
// This behavior may change in the future.
await eventCollectorD2.verifyNthValue(at: 2, isEqualTo: .detached)

try await c1.deactivate()
}
}
61 changes: 61 additions & 0 deletions Tests/Integration/IntegrationHelper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,64 @@ func subscribeDocs(_ d1: Document, _ d2: Document, _ d1Expected: [any OperationI
}
}
}

class EventCollector<T: Equatable> {
private let queue = DispatchQueue(label: "com.yorkie.eventcollector", attributes: .concurrent)
private var _values: [T] = []

let doc: Document
var values: [T] {
self.queue.sync { self._values }
}

var count: Int {
return self.values.count
}

init(doc: Document) {
self.doc = doc
}

func add(event: T) {
self.queue.async(flags: .barrier) {
self._values.append(event)
}
}

func asyncStream() -> AsyncStream<T> {
return AsyncStream<T> { continuation in
for value in self.values {
continuation.yield(value)
}
continuation.finish()
}
}
Comment on lines +180 to +187
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider enhancing AsyncStream to support real-time updates.

The current implementation creates a one-shot stream that only yields existing values. Consider enhancing it to support real-time updates by maintaining a continuation that yields new values as they arrive.

 func asyncStream() -> AsyncStream<T> {
-    return AsyncStream<T> { continuation in
+    return AsyncStream<T> { [weak self] continuation in
+        guard let self = self else { return }
         for value in self.values {
             continuation.yield(value)
         }
-        continuation.finish()
+        // Store continuation to yield future values
+        self.continuation = continuation
     }
 }

Committable suggestion was skipped due to low confidence.


func verifyNthValue(at nth: Int, isEqualTo targetValue: T) async {
if nth > self.values.count {
XCTFail("Expected \(nth)th value: \(targetValue), but only received \(self.values.count) values")
return
}

var counter = 0
for await value in self.asyncStream() {
counter += 1

if counter == nth {
XCTAssertTrue(value == targetValue, "Expected \(nth)th value: \(targetValue), actual value: \(value)")
return
}
}

XCTFail("Stream ended before finding \(nth)th value")
}
Comment on lines +189 to +206
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve boundary check for better error handling.

The boundary check should use >= instead of > to handle the exact nth case consistently.

-    if nth > self.values.count {
+    if nth >= self.values.count {
         XCTFail("Expected \(nth)th value: \(targetValue), but only received \(self.values.count) values")
         return
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func verifyNthValue(at nth: Int, isEqualTo targetValue: T) async {
if nth > self.values.count {
XCTFail("Expected \(nth)th value: \(targetValue), but only received \(self.values.count) values")
return
}
var counter = 0
for await value in self.asyncStream() {
counter += 1
if counter == nth {
XCTAssertTrue(value == targetValue, "Expected \(nth)th value: \(targetValue), actual value: \(value)")
return
}
}
XCTFail("Stream ended before finding \(nth)th value")
}
func verifyNthValue(at nth: Int, isEqualTo targetValue: T) async {
if nth >= self.values.count {
XCTFail("Expected \(nth)th value: \(targetValue), but only received \(self.values.count) values")
return
}
var counter = 0
for await value in self.asyncStream() {
counter += 1
if counter == nth {
XCTAssertTrue(value == targetValue, "Expected \(nth)th value: \(targetValue), actual value: \(value)")
return
}
}
XCTFail("Stream ended before finding \(nth)th value")
}


func subscribeDocumentStatus() async where T == DocumentStatus {
await self.doc.subscribeStatus { [weak self] event, _ in
guard let status = (event as? StatusChangedEvent)?.value.status else {
return
}
self?.add(event: status)
}
}
Comment on lines +208 to +215
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling for subscription failures.

Consider handling potential errors from the subscription to prevent silent failures in tests.

-    func subscribeDocumentStatus() async where T == DocumentStatus {
+    func subscribeDocumentStatus() async throws where T == DocumentStatus {
-        await self.doc.subscribeStatus { [weak self] event, _ in
+        try await self.doc.subscribeStatus { [weak self] event, error in
+            if let error = error {
+                XCTFail("Status subscription failed: \(error)")
+                return
+            }
             guard let status = (event as? StatusChangedEvent)?.value.status else {
                 return
             }
             self?.add(event: status)
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func subscribeDocumentStatus() async where T == DocumentStatus {
await self.doc.subscribeStatus { [weak self] event, _ in
guard let status = (event as? StatusChangedEvent)?.value.status else {
return
}
self?.add(event: status)
}
}
func subscribeDocumentStatus() async throws where T == DocumentStatus {
try await self.doc.subscribeStatus { [weak self] event, error in
if let error = error {
XCTFail("Status subscription failed: \(error)")
return
}
guard let status = (event as? StatusChangedEvent)?.value.status else {
return
}
self?.add(event: status)
}
}

}
Loading