Skip to content

Commit

Permalink
feat: metadata protocol shard subscription (#2149)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Oct 30, 2023
1 parent 250e8b9 commit bcf8e96
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 47 deletions.
8 changes: 6 additions & 2 deletions tests/test_waku_metadata.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import
eth/p2p/discoveryv5/enr
import
../../waku/waku_node,
../../waku/waku_core/topics,
../../waku/node/peer_manager,
../../waku/waku_discv5,
../../waku/waku_metadata,
Expand All @@ -23,8 +24,6 @@ import


procSuite "Waku Metadata Protocol":

# TODO: Add tests with shards when ready
asyncTest "request() returns the supported metadata of the peer":
let clusterId = 10.uint32
let
Expand All @@ -34,6 +33,9 @@ procSuite "Waku Metadata Protocol":
# Start nodes
await allFutures([node1.start(), node2.start()])

node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/7"))
node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/6"))

# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec)
require:
Expand All @@ -48,3 +50,5 @@ procSuite "Waku Metadata Protocol":

check:
response1.get().clusterId.get() == clusterId
response1.get().shards == @[uint32(6), uint32(7)]

4 changes: 3 additions & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ proc new*(T: type WakuNode,
)

# mount metadata protocol
let metadata = WakuMetadata.new(netConfig.clusterId)
let metadata = WakuMetadata.new(netConfig.clusterId, queue)
node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec))
node.wakuMetadata = metadata
peerManager.wakuMetadata = metadata
Expand Down Expand Up @@ -1127,6 +1127,8 @@ proc start*(node: WakuNode) {.async.} =

node.started = true

node.wakuMetadata.start()

info "Node started successfully"

proc stop*(node: WakuNode) {.async.} =
Expand Down
125 changes: 81 additions & 44 deletions waku/waku_metadata/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ else:
{.push raises: [].}

import
std/[options, sequtils, random],
std/[options, sequtils, random, sets],
stew/results,
chronicles,
chronos,
Expand All @@ -27,57 +27,56 @@ const RpcResponseMaxBytes* = 1024
type
WakuMetadata* = ref object of LPProtocol
clusterId*: uint32
shards*: seq[uint32]
shards*: HashSet[uint32]
topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]

proc respond(m: WakuMetadata, conn: Connection): Future[Result[void, string]] {.async, gcsafe.} =
try:
await conn.writeLP(WakuMetadataResponse(
clusterId: some(m.clusterId),
shards: m.shards
).encode().buffer)
except CatchableError as exc:
return err(exc.msg)
let response = WakuMetadataResponse(
clusterId: some(m.clusterId),
shards: toSeq(m.shards)
)

let res = catch: await conn.writeLP(response.encode().buffer)
if res.isErr():
return err(res.error.msg)

return ok()

proc request*(m: WakuMetadata, conn: Connection): Future[Result[WakuMetadataResponse, string]] {.async, gcsafe.} =
var buffer: seq[byte]
var error: string
try:
await conn.writeLP(WakuMetadataRequest(
clusterId: some(m.clusterId),
shards: m.shards,
).encode().buffer)
buffer = await conn.readLp(RpcResponseMaxBytes)
except CatchableError as exc:
error = $exc.msg
finally:
# close, no more data is expected
await conn.closeWithEof()
let request = WakuMetadataRequest(clusterId: some(m.clusterId), shards: toSeq(m.shards))

let writeRes = catch: await conn.writeLP(request.encode().buffer)
let readRes = catch: await conn.readLp(RpcResponseMaxBytes)

# close no watter what
let closeRes = catch: await conn.closeWithEof()
if closeRes.isErr():
return err("close failed: " & closeRes.error.msg)

if error.len > 0:
return err("write/read failed: " & error)
if writeRes.isErr():
return err("write failed: " & writeRes.error.msg)

let decodedBuff = WakuMetadataResponse.decode(buffer)
if decodedBuff.isErr():
return err("decode failed: " & $decodedBuff.error)
let buffer =
if readRes.isErr():
return err("read failed: " & readRes.error.msg)
else: readRes.get()

echo decodedBuff.get().clusterId
return ok(decodedBuff.get())
let response = WakuMetadataResponse.decode(buffer).valueOr:
return err("decode failed: " & $error)

return ok(response)

proc initProtocolHandler*(m: WakuMetadata) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var buffer: seq[byte]
try:
buffer = await conn.readLp(RpcResponseMaxBytes)
except CatchableError as exc:
let res = catch: await conn.readLp(RpcResponseMaxBytes)
let buffer = res.valueOr:
error "Connection reading error", error=error.msg
return

let decBuf = WakuMetadataResponse.decode(buffer)
if decBuf.isErr():
let response = WakuMetadataResponse.decode(buffer).valueOr:
error "Response decoding error", error=error
return

let response = decBuf.get()
debug "Received WakuMetadata request",
remoteClusterId=response.clusterId,
remoteShards=response.shards,
Expand All @@ -92,12 +91,50 @@ proc initProtocolHandler*(m: WakuMetadata) =
m.handler = handle
m.codec = WakuMetadataCodec

proc new*(T: type WakuMetadata, clusterId: uint32): T =
let m = WakuMetadata(
clusterId: clusterId,
# TODO: must be updated real time
shards: @[],
)
m.initProtocolHandler()
proc new*(T: type WakuMetadata,
clusterId: uint32,
queue: AsyncEventQueue[SubscriptionEvent],
): T =
let wm = WakuMetadata(clusterId: clusterId, topicSubscriptionQueue: queue)

wm.initProtocolHandler()

info "Created WakuMetadata protocol", clusterId=clusterId
return m

return wm

proc subscriptionsListener(wm: WakuMetadata) {.async.} =
## Listen for pubsub topics subscriptions changes

let key = wm.topicSubscriptionQueue.register()

while wm.started:
let events = await wm.topicSubscriptionQueue.waitEvents(key)

for event in events:
let parsedTopic = NsPubsubTopic.parse(event.topic).valueOr:
continue

if parsedTopic.kind != NsPubsubTopicKind.StaticSharding:
continue

if parsedTopic.clusterId != wm.clusterId:
continue

case event.kind:
of PubsubSub:
wm.shards.incl(parsedTopic.shardId)
of PubsubUnsub:
wm.shards.excl(parsedTopic.shardId)
else:
continue

wm.topicSubscriptionQueue.unregister(key)

proc start*(wm: WakuMetadata) =
wm.started = true

asyncSpawn wm.subscriptionsListener()

proc stop*(wm: WakuMetadata) =
wm.started = false

0 comments on commit bcf8e96

Please sign in to comment.