Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ENR can now be updated #1875

Merged
merged 1 commit into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -541,12 +541,14 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,

proc startApp*(app: App): Future[AppResult[void]] {.async.} =
if app.wakuDiscv5.isSome():
let res = app.wakuDiscv5.get().start()
let wakuDiscv5 = app.wakuDiscv5.get()

let res = wakuDiscv5.start()
if res.isErr():
return err("failed to start waku discovery v5: " & $res.error)

asyncSpawn app.wakuDiscv5.get().searchLoop(app.node.peerManager, some(app.record))
asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager, some(app.record))
asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue)

return await startNode(
app.node,
Expand Down
82 changes: 78 additions & 4 deletions tests/test_waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import
stew/results,
stew/shims/net,
chronos,
chronicles,
testutils/unittests,
libp2p/crypto/crypto as libp2p_keys,
eth/keys as eth_keys
import
../../waku/waku_core/topics,
../../waku/waku_enr,
../../waku/waku_discv5,
./testlib/common,
Expand Down Expand Up @@ -282,7 +284,7 @@ procSuite "Waku Discovery v5":
let gibberish = @["aedyttydcb/uioasduyio", "jhdfsjhlsdfjhk/sadjhk", "khfsd/hjfdsgjh/dfs"]
let empty: seq[string] = @[]

let relayShards = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)])
let relayShards = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)]).expect("Valid Shards")

## When

Expand Down Expand Up @@ -314,7 +316,7 @@ procSuite "Waku Discovery v5":
shardCluster: uint16 = 21
shardIndices: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16]

let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()
Expand All @@ -332,7 +334,7 @@ procSuite "Waku Discovery v5":
shardCluster: uint16 = 22
shardIndices: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16]

let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()
Expand All @@ -350,7 +352,7 @@ procSuite "Waku Discovery v5":
shardCluster: uint16 = 22
shardIndices: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16]

let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()
Expand All @@ -377,4 +379,76 @@ procSuite "Waku Discovery v5":
predicateCluster22(recordCluster22Indices1) == true
predicateCluster22(recordCluster22Indices2) == false

asyncTest "update ENR from subscriptions":
## Given
let
shard1 = "/waku/2/rs/0/1"
shard2 = "/waku/2/rs/0/2"
shard3 = "/waku/2/rs/0/3"
privKey = generateSecp256k1Key()
bindIp = "0.0.0.0"
extIp = "127.0.0.1"
tcpPort = 61500u16
udpPort = 9000u16

let record = newTestEnrRecord(
privKey = privKey,
extIp = extIp,
tcpPort = tcpPort,
udpPort = udpPort,
)

let node = newTestDiscv5(
privKey = privKey,
bindIp = bindIp,
tcpPort = tcpPort,
udpPort = udpPort,
record = record
)

let res = node.start()
assert res.isOk(), res.error

let queue = newAsyncEventQueue[SubscriptionEvent](0)

## When
asyncSpawn node.subscriptionsListener(queue)

## Then
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))

await sleepAsync(1.seconds)

check:
node.protocol.localNode.record.containsShard(shard1) == true
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true

queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))

await sleepAsync(1.seconds)

check:
node.protocol.localNode.record.containsShard(shard1) == true
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true

queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard3))

await sleepAsync(1.seconds)

check:
node.protocol.localNode.record.containsShard(shard1) == false
node.protocol.localNode.record.containsShard(shard2) == false
node.protocol.localNode.record.containsShard(shard3) == false

## Cleanup
await node.stop()


24 changes: 14 additions & 10 deletions tests/test_waku_enr.nim
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,10 @@ suite "Waku ENR - Relay static sharding":
shardIndex: uint16 = 1024

## When
expect Defect:
discard RelayShards.init(shardCluster, shardIndex)
let res = RelayShards.init(shardCluster, shardIndex)

## Then
assert res.isErr(), $res.get()

test "new relay shards field with single invalid index in list":
## Given
Expand All @@ -272,8 +274,10 @@ suite "Waku ENR - Relay static sharding":
shardIndices: seq[uint16] = @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16, 1024u16]

## When
expect Defect:
discard RelayShards.init(shardCluster, shardIndices)
let res = RelayShards.init(shardCluster, shardIndices)

## Then
assert res.isErr(), $res.get()

test "new relay shards field with single valid index":
## Given
Expand All @@ -284,7 +288,7 @@ suite "Waku ENR - Relay static sharding":
let topic = NsPubsubTopic.staticSharding(shardCluster, shardIndex)

## When
let shards = RelayShards.init(shardCluster, shardIndex)
let shards = RelayShards.init(shardCluster, shardIndex).expect("Valid Shards")

## Then
check:
Expand All @@ -310,7 +314,7 @@ suite "Waku ENR - Relay static sharding":
shardIndices: seq[uint16] = @[1u16, 2u16, 2u16, 3u16, 3u16, 3u16]

## When
let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

## Then
check:
Expand Down Expand Up @@ -344,7 +348,7 @@ suite "Waku ENR - Relay static sharding":
shardCluster: uint16 = 22
shardIndices: seq[uint16] = @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16]

let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

## When
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
Expand All @@ -370,7 +374,7 @@ suite "Waku ENR - Relay static sharding":
enrSeqNum = 1u64
enrPrivKey = generatesecp256k1key()

let shards = RelayShards.init(33, toSeq(0u16 ..< 64u16))
let shards = RelayShards.init(33, toSeq(0u16 ..< 64u16)).expect("Valid Shards")

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()
Expand Down Expand Up @@ -398,8 +402,8 @@ suite "Waku ENR - Relay static sharding":
enrPrivKey = generatesecp256k1key()

let
shardsIndicesList = RelayShards.init(22, @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16])
shardsBitVector = RelayShards.init(33, @[13u16, 24u16, 37u16, 61u16, 98u16, 159u16])
shardsIndicesList = RelayShards.init(22, @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16]).expect("Valid Shards")
shardsBitVector = RelayShards.init(33, @[13u16, 24u16, 37u16, 61u16, 98u16, 159u16]).expect("Valid Shards")


var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
Expand Down
9 changes: 5 additions & 4 deletions waku/node/jsonrpc/relay/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
debug "post_waku_v2_relay_v1_subscriptions"

# Subscribe to all requested topics
for topic in topics:
if cache.isSubscribed(topic):
continue
let newTopics = topics.filterIt(not cache.isSubscribed(it))

for topic in newTopics:
cache.subscribe(topic)
node.subscribe(topic, topicHandler)

Expand All @@ -70,7 +69,9 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
debug "delete_waku_v2_relay_v1_subscriptions"

# Unsubscribe all handlers from requested topics
for topic in topics:
let subscribedTopics = topics.filterIt(cache.isSubscribed(it))

for topic in subscribedTopics:
node.unsubscribe(topic)
cache.unsubscribe(topic)

Expand Down
15 changes: 7 additions & 8 deletions waku/node/rest/relay/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuN

let req: RelayPostSubscriptionsRequest = reqResult.get()

for topic in req:
if cache.isSubscribed(string(topic)):
# Only subscribe to topics for which we have no subscribed topic handlers yet
continue
# Only subscribe to topics for which we have no subscribed topic handlers yet
let newTopics = req.filterIt(not cache.isSubscribed(it))

cache.subscribe(string(topic))
node.subscribe(string(topic), cache.messageHandler())
for topic in newTopics:
cache.subscribe(topic)
node.subscribe(topic, cache.messageHandler())

return RestApiResponse.ok()

Expand All @@ -88,8 +87,8 @@ proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: Wak

# Unsubscribe all handlers from requested topics
for topic in req:
node.unsubscribe(string(topic))
cache.unsubscribe(string(topic))
node.unsubscribe(topic)
cache.unsubscribe(topic)

# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
Expand Down
7 changes: 7 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type
rendezvous*: RendezVous
announcedAddresses* : seq[MultiAddress]
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]

proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
Expand Down Expand Up @@ -141,12 +142,15 @@ proc new*(T: type WakuNode,

info "Initializing networking", addrs= $netConfig.announcedAddresses

let queue = newAsyncEventQueue[SubscriptionEvent](30)

return WakuNode(
peerManager: peerManager,
switch: switch,
rng: rng,
enr: enr,
announcedAddresses: netConfig.announcedAddresses,
topicSubscriptionQueue: queue
)

proc peerInfo*(node: WakuNode): PeerInfo =
Expand Down Expand Up @@ -229,6 +233,7 @@ proc subscribe*(node: WakuNode, topic: PubsubTopic) =

debug "subscribe", pubsubTopic= topic

node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic))
node.registerRelayDefaultHandler(topic)

proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =
Expand All @@ -240,6 +245,7 @@ proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =

debug "subscribe", pubsubTopic= topic

node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic))
node.registerRelayDefaultHandler(topic)
node.wakuRelay.subscribe(topic, handler)

Expand All @@ -252,6 +258,7 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic) =

info "unsubscribe", pubsubTopic=topic

node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: topic))
node.wakuRelay.unsubscribe(topic)


Expand Down
9 changes: 9 additions & 0 deletions waku/waku_core/topics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,12 @@ export
content_topic,
pubsub_topic,
sharding

type
SubscriptionKind* = enum ContentSub, ContentUnsub, PubsubSub, PubsubUnsub
SubscriptionEvent* = object
case kind*: SubscriptionKind
of PubsubSub: pubsubSub*: string
of ContentSub: contentSub*: string
of PubsubUnsub: pubsubUnsub*: string
of ContentUnsub: contentUnsub*: string
Loading