Skip to content

Commit

Permalink
Merge 880d018 into 9085b1b
Browse files Browse the repository at this point in the history
  • Loading branch information
NagyZoltanPeter authored Sep 14, 2023
2 parents 9085b1b + 880d018 commit dbdc197
Show file tree
Hide file tree
Showing 32 changed files with 1,895 additions and 366 deletions.
18 changes: 12 additions & 6 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,12 @@ proc writeAndPrint(c: Chat) {.async.} =
echo "You are now known as " & c.nick

elif line.startsWith("/exit"):
if not c.node.wakuFilter.isNil():
if not c.node.wakuFilterLegacy.isNil():
echo "unsubscribing from content filters..."

await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic)
let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isSome():
await c.node.legacyFilterUnsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic, peer=peerOpt.get())

echo "quitting..."

Expand Down Expand Up @@ -464,14 +466,18 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if peerInfo.isOk():
await node.mountFilter()
await node.mountFilterClient()
node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec)
node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)

proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)

await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler)

await node.legacyFilterSubscribe(pubsubTopic=some(DefaultPubsubTopic),
contentTopics=chat.contentTopic,
filterHandler,
peerInfo.value)
# TODO: Here to support FilterV2 relevant subscription, but still
# Legacy Filter is concurrent to V2 untill legacy filter will be removed
else:
error "Filter not mounted. Couldn't parse conf.filternode",
error = peerInfo.error
Expand All @@ -485,7 +491,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
chat.printReceivedMessage(msg)

let topic = DefaultPubsubTopic
await node.subscribe(some(topic), @[ContentTopic("")], handler)
node.subscribe(topic, handler)

if conf.rlnRelay:
info "WakuRLNRelay is enabled"
Expand Down
4 changes: 3 additions & 1 deletion apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import
../../../waku/waku_node,
../../../waku/node/peer_manager,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_store,
# Chat 2 imports
../chat2/chat2,
Expand Down Expand Up @@ -297,7 +298,8 @@ when isMainModule:
if conf.filternode != "":
let filterPeer = parsePeerInfo(conf.filternode)
if filterPeer.isOk():
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterSubscribeCodec)
else:
error "Error parsing conf.filternode", error = filterPeer.error

Expand Down
15 changes: 11 additions & 4 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_filter_v2/client as waku_filter_client,
./wakunode2_validator_signed,
./internal_config,
./external_config
Expand All @@ -46,6 +48,7 @@ import
../../waku/node/rest/debug/handlers as rest_debug_api,
../../waku/node/rest/relay/handlers as rest_relay_api,
../../waku/node/rest/relay/topic_cache,
../../waku/node/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/node/rest/filter/handlers as rest_filter_api,
../../waku/node/rest/store/handlers as rest_store_api,
../../waku/node/rest/health/handlers as rest_health_api,
Expand Down Expand Up @@ -470,8 +473,9 @@ proc setupProtocols(node: WakuNode,
if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode)
if filterNode.isOk():
await mountFilterClient(node)
node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec)
await node.mountFilterClient()
node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec)
node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
else:
return err("failed to set node waku filter peer: " & filterNode.error)

Expand Down Expand Up @@ -577,8 +581,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo

## Filter REST API
if conf.filter:
let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity)
installFilterApiHandlers(server.router, app.node, filterCache)
let legacyFilterCache = rest_legacy_filter_api.MessageCache.init()
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)

let filterCache = rest_filter_api.MessageCache.init()
rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache)

## Store REST API
installStoreApiHandlers(server.router, app.node)
Expand Down
8 changes: 6 additions & 2 deletions examples/filter_subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ proc unsubscribe(wfc: WakuFilterClient,
else:
notice "unsubscribe request successful"

proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) =
proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage)
{.async, gcsafe.} =
let payloadStr = string.fromBytes(message.payload)
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp


proc maintainSubscription(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
Expand Down Expand Up @@ -68,11 +70,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) =
var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wfc = WakuFilterClient.new(rng, messagePushHandler, pm)
wfc = WakuFilterClient.new(pm, rng)

# Mount filter client protocol
switch.mount(wfc)

wfc.registerPushHandler(messagePushHandler)

# Start maintaining subscription
asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic)

Expand Down
8 changes: 5 additions & 3 deletions tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ import
./test_waku_lightpush,
./test_wakunode_lightpush,
# Waku Filter
./test_waku_filter,
./test_wakunode_filter,
./test_waku_filter_legacy,
./test_wakunode_filter_legacy,
./test_waku_peer_exchange,
./test_peer_store_extended,
./test_message_cache,
Expand Down Expand Up @@ -95,7 +95,9 @@ import
./wakunode_rest/test_rest_relay,
./wakunode_rest/test_rest_relay_serdes,
./wakunode_rest/test_rest_serdes,
./wakunode_rest/test_rest_store
./wakunode_rest/test_rest_store,
./wakunode_rest/test_rest_filter,
./wakunode_rest/test_rest_legacy_filter

import
./waku_rln_relay/test_waku_rln_relay,
Expand Down
56 changes: 28 additions & 28 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.mountFilter()))

# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
# Check connection
check:
conn.isSome()
Expand Down Expand Up @@ -81,12 +81,12 @@ procSuite "Peer Manager":
let nonExistentPeer = nonExistentPeerRes.value

# Dial non-existent peer from node1
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec)
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec)
check:
conn1.isNone()

# Dial peer not supporting given protocol
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
check:
conn2.isNone()

Expand All @@ -109,14 +109,14 @@ procSuite "Peer Manager":
node.mountStoreClient()

node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec)

# Check peers were successfully added to peer manager
check:
node.peerManager.peerStore.peers().len == 2
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protocols.contains(WakuFilterCodec))
it.protocols.contains(WakuLegacyFilterCodec))
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protocols.contains(WakuStoreCodec))
Expand Down Expand Up @@ -429,7 +429,7 @@ procSuite "Peer Manager":

# service peers
node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
node.peerManager.addServicePeer(peers[1], WakuFilterCodec)
node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec)
node.peerManager.addServicePeer(peers[2], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec)

Expand All @@ -449,7 +449,7 @@ procSuite "Peer Manager":
# all service peers are added to its service slot
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
node.peerManager.serviceSlots[WakuFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId

Expand All @@ -474,38 +474,38 @@ procSuite "Peer Manager":
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true

(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() == true

# isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() == true


# assert physical connections
check:
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2

nodes[0].peerManager.connectedPeers(WakuFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuFilterCodec)[1].len == 2
nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2

nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1

nodes[1].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0

nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1

nodes[2].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterCodec)[1].len == 1
nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1

nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0

nodes[3].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0

asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
Expand All @@ -521,17 +521,17 @@ procSuite "Peer Manager":
require:
# multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true

check:
nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[0].peerManager.getNumStreams(WakuFilterCodec) == (0, 4)
nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4)

nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[1].peerManager.getNumStreams(WakuFilterCodec) == (4, 0)
nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0)

test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
Expand All @@ -552,7 +552,7 @@ procSuite "Peer Manager":

# Add a peer[0] to the peerstore
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec]
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec]

# When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
Expand All @@ -561,7 +561,7 @@ procSuite "Peer Manager":
selectedPeer1.get().peerId == peers[0].peerId

# Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuFilterCodec)
let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec)
check:
selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId
Expand Down Expand Up @@ -757,7 +757,7 @@ procSuite "Peer Manager":
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])

# they are also prunned 
# they are also prunned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

# we should have 4 peers (2in/2out) but due to collocation limit
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ suite "WakuNode - Filter":
filterPushHandlerFut.complete((pubsubTopic, msg))

## When
await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)
await client.legacyFilterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)

# Wait for subscription to take effect
waitFor sleepAsync(100.millis)
Expand Down
5 changes: 2 additions & 3 deletions tests/waku_filter_v2/client_utils.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import
std/[options,tables],
testutils/unittests,
chronos,
chronicles

Expand All @@ -22,10 +21,10 @@ proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} =

return proto

proc newTestWakuFilterClient*(switch: Switch, messagePushHandler: MessagePushHandler): Future[WakuFilterClient] {.async.} =
proc newTestWakuFilterClient*(switch: Switch): Future[WakuFilterClient] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilterClient.new(rng, messagePushHandler, peerManager)
proto = WakuFilterClient.new(peerManager, rng)

await proto.start()
switch.mount(proto)
Expand Down
Loading

0 comments on commit dbdc197

Please sign in to comment.