diff --git a/tests/test_waku_metadata.nim b/tests/test_waku_metadata.nim index 687ef73444..e96edb3034 100644 --- a/tests/test_waku_metadata.nim +++ b/tests/test_waku_metadata.nim @@ -15,6 +15,7 @@ import eth/p2p/discoveryv5/enr import ../../waku/waku_node, + ../../waku/waku_core/topics, ../../waku/node/peer_manager, ../../waku/waku_discv5, ../../waku/waku_metadata, @@ -23,8 +24,6 @@ import procSuite "Waku Metadata Protocol": - - # TODO: Add tests with shards when ready asyncTest "request() returns the supported metadata of the peer": let clusterId = 10.uint32 let @@ -34,6 +33,9 @@ procSuite "Waku Metadata Protocol": # Start nodes await allFutures([node1.start(), node2.start()]) + node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/7")) + node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/6")) + # Create connection let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec) require: @@ -48,3 +50,5 @@ procSuite "Waku Metadata Protocol": check: response1.get().clusterId.get() == clusterId + response1.get().shards == @[uint32(6), uint32(7)] + \ No newline at end of file diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index fc512b1786..f649813478 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -155,7 +155,7 @@ proc new*(T: type WakuNode, ) # mount metadata protocol - let metadata = WakuMetadata.new(netConfig.clusterId) + let metadata = WakuMetadata.new(netConfig.clusterId, queue) node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec)) node.wakuMetadata = metadata peerManager.wakuMetadata = metadata @@ -1127,6 +1127,8 @@ proc start*(node: WakuNode) {.async.} = node.started = true + node.wakuMetadata.start() + info "Node started successfully" proc stop*(node: WakuNode) {.async.} = diff --git a/waku/waku_metadata/protocol.nim b/waku/waku_metadata/protocol.nim index a8b5ae227b..eefcf5b829 100644 --- a/waku/waku_metadata/protocol.nim +++ b/waku/waku_metadata/protocol.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/[options, sequtils, random], + std/[options, sequtils, random, sets], stew/results, chronicles, chronos, @@ -27,57 +27,56 @@ const RpcResponseMaxBytes* = 1024 type WakuMetadata* = ref object of LPProtocol clusterId*: uint32 - shards*: seq[uint32] + shards*: HashSet[uint32] + topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent] proc respond(m: WakuMetadata, conn: Connection): Future[Result[void, string]] {.async, gcsafe.} = - try: - await conn.writeLP(WakuMetadataResponse( - clusterId: some(m.clusterId), - shards: m.shards - ).encode().buffer) - except CatchableError as exc: - return err(exc.msg) + let response = WakuMetadataResponse( + clusterId: some(m.clusterId), + shards: toSeq(m.shards) + ) + + let res = catch: await conn.writeLP(response.encode().buffer) + if res.isErr(): + return err(res.error.msg) return ok() proc request*(m: WakuMetadata, conn: Connection): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} = - var buffer: seq[byte] - var error: string - try: - await conn.writeLP(WakuMetadataRequest( - clusterId: some(m.clusterId), - shards: m.shards, - ).encode().buffer) - buffer = await conn.readLp(RpcResponseMaxBytes) - except CatchableError as exc: - error = $exc.msg - finally: - # close, no more data is expected - await conn.closeWithEof() + let request = WakuMetadataRequest(clusterId: some(m.clusterId), shards: toSeq(m.shards)) + + let writeRes = catch: await conn.writeLP(request.encode().buffer) + let readRes = catch: await conn.readLp(RpcResponseMaxBytes) + + # close no watter what + let closeRes = catch: await conn.closeWithEof() + if closeRes.isErr(): + return err("close failed: " & closeRes.error.msg) - if error.len > 0: - return err("write/read failed: " & error) + if writeRes.isErr(): + return err("write failed: " & writeRes.error.msg) - let decodedBuff = WakuMetadataResponse.decode(buffer) - if decodedBuff.isErr(): - return err("decode failed: " & $decodedBuff.error) + let buffer = + if readRes.isErr(): + return err("read failed: " & readRes.error.msg) + else: readRes.get() - echo decodedBuff.get().clusterId - return ok(decodedBuff.get()) + let response = WakuMetadataResponse.decode(buffer).valueOr: + return err("decode failed: " & $error) + + return ok(response) proc initProtocolHandler*(m: WakuMetadata) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - var buffer: seq[byte] - try: - buffer = await conn.readLp(RpcResponseMaxBytes) - except CatchableError as exc: + let res = catch: await conn.readLp(RpcResponseMaxBytes) + let buffer = res.valueOr: + error "Connection reading error", error=error.msg return - let decBuf = WakuMetadataResponse.decode(buffer) - if decBuf.isErr(): + let response = WakuMetadataResponse.decode(buffer).valueOr: + error "Response decoding error", error=error return - let response = decBuf.get() debug "Received WakuMetadata request", remoteClusterId=response.clusterId, remoteShards=response.shards, @@ -92,12 +91,50 @@ proc initProtocolHandler*(m: WakuMetadata) = m.handler = handle m.codec = WakuMetadataCodec -proc new*(T: type WakuMetadata, clusterId: uint32): T = - let m = WakuMetadata( - clusterId: clusterId, - # TODO: must be updated real time - shards: @[], - ) - m.initProtocolHandler() +proc new*(T: type WakuMetadata, + clusterId: uint32, + queue: AsyncEventQueue[SubscriptionEvent], + ): T = + let wm = WakuMetadata(clusterId: clusterId, topicSubscriptionQueue: queue) + + wm.initProtocolHandler() + info "Created WakuMetadata protocol", clusterId=clusterId - return m + + return wm + +proc subscriptionsListener(wm: WakuMetadata) {.async.} = + ## Listen for pubsub topics subscriptions changes + + let key = wm.topicSubscriptionQueue.register() + + while wm.started: + let events = await wm.topicSubscriptionQueue.waitEvents(key) + + for event in events: + let parsedTopic = NsPubsubTopic.parse(event.topic).valueOr: + continue + + if parsedTopic.kind != NsPubsubTopicKind.StaticSharding: + continue + + if parsedTopic.clusterId != wm.clusterId: + continue + + case event.kind: + of PubsubSub: + wm.shards.incl(parsedTopic.shardId) + of PubsubUnsub: + wm.shards.excl(parsedTopic.shardId) + else: + continue + + wm.topicSubscriptionQueue.unregister(key) + +proc start*(wm: WakuMetadata) = + wm.started = true + + asyncSpawn wm.subscriptionsListener() + +proc stop*(wm: WakuMetadata) = + wm.started = false \ No newline at end of file