Skip to content

Commit

Permalink
feat: HTTP REST API: Filter support v2 (#1890)
Browse files Browse the repository at this point in the history
Filter v2 rest api support implemented 
Filter rest api documentation updated with v1 and v2 interface support.
Separated legacy filter rest interface
Fix code and tests of v2 Filter rest api
Filter v2 message push test added
Applied autoshard to Filter V2
Redesigned FilterPushHandling, code style, catch up apps and tests with filter v2 interface changes
Rename of FilterV1SubscriptionsRequest to FilterLegacySubscribeRequest, fix broken chat2 app, fix tests
Changed Filter v2 push handler subscription to simple register
Separate node's filterUnsubscribe and filterUnsubscribeAll
  • Loading branch information
NagyZoltanPeter authored Sep 14, 2023
1 parent 9085b1b commit dac072f
Show file tree
Hide file tree
Showing 32 changed files with 1,895 additions and 366 deletions.
18 changes: 12 additions & 6 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,12 @@ proc writeAndPrint(c: Chat) {.async.} =
echo "You are now known as " & c.nick

elif line.startsWith("/exit"):
if not c.node.wakuFilter.isNil():
if not c.node.wakuFilterLegacy.isNil():
echo "unsubscribing from content filters..."

await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic)
let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isSome():
await c.node.legacyFilterUnsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic, peer=peerOpt.get())

echo "quitting..."

Expand Down Expand Up @@ -464,14 +466,18 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if peerInfo.isOk():
await node.mountFilter()
await node.mountFilterClient()
node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec)
node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)

proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)

await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler)

await node.legacyFilterSubscribe(pubsubTopic=some(DefaultPubsubTopic),
contentTopics=chat.contentTopic,
filterHandler,
peerInfo.value)
# TODO: Here to support FilterV2 relevant subscription, but still
# Legacy Filter is concurrent to V2 untill legacy filter will be removed
else:
error "Filter not mounted. Couldn't parse conf.filternode",
error = peerInfo.error
Expand All @@ -485,7 +491,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
chat.printReceivedMessage(msg)

let topic = DefaultPubsubTopic
await node.subscribe(some(topic), @[ContentTopic("")], handler)
node.subscribe(topic, handler)

if conf.rlnRelay:
info "WakuRLNRelay is enabled"
Expand Down
4 changes: 3 additions & 1 deletion apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import
../../../waku/waku_node,
../../../waku/node/peer_manager,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_store,
# Chat 2 imports
../chat2/chat2,
Expand Down Expand Up @@ -297,7 +298,8 @@ when isMainModule:
if conf.filternode != "":
let filterPeer = parsePeerInfo(conf.filternode)
if filterPeer.isOk():
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterSubscribeCodec)
else:
error "Error parsing conf.filternode", error = filterPeer.error

Expand Down
15 changes: 11 additions & 4 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_filter_v2/client as waku_filter_client,
./wakunode2_validator_signed,
./internal_config,
./external_config
Expand All @@ -46,6 +48,7 @@ import
../../waku/node/rest/debug/handlers as rest_debug_api,
../../waku/node/rest/relay/handlers as rest_relay_api,
../../waku/node/rest/relay/topic_cache,
../../waku/node/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/node/rest/filter/handlers as rest_filter_api,
../../waku/node/rest/store/handlers as rest_store_api,
../../waku/node/rest/health/handlers as rest_health_api,
Expand Down Expand Up @@ -470,8 +473,9 @@ proc setupProtocols(node: WakuNode,
if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode)
if filterNode.isOk():
await mountFilterClient(node)
node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec)
await node.mountFilterClient()
node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec)
node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
else:
return err("failed to set node waku filter peer: " & filterNode.error)

Expand Down Expand Up @@ -577,8 +581,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo

## Filter REST API
if conf.filter:
let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity)
installFilterApiHandlers(server.router, app.node, filterCache)
let legacyFilterCache = rest_legacy_filter_api.MessageCache.init()
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)

let filterCache = rest_filter_api.MessageCache.init()
rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache)

## Store REST API
installStoreApiHandlers(server.router, app.node)
Expand Down
8 changes: 6 additions & 2 deletions examples/filter_subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ proc unsubscribe(wfc: WakuFilterClient,
else:
notice "unsubscribe request successful"

proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) =
proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage)
{.async, gcsafe.} =
let payloadStr = string.fromBytes(message.payload)
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp


proc maintainSubscription(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
Expand Down Expand Up @@ -68,11 +70,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) =
var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wfc = WakuFilterClient.new(rng, messagePushHandler, pm)
wfc = WakuFilterClient.new(pm, rng)

# Mount filter client protocol
switch.mount(wfc)

wfc.registerPushHandler(messagePushHandler)

# Start maintaining subscription
asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic)

Expand Down
8 changes: 5 additions & 3 deletions tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ import
./test_waku_lightpush,
./test_wakunode_lightpush,
# Waku Filter
./test_waku_filter,
./test_wakunode_filter,
./test_waku_filter_legacy,
./test_wakunode_filter_legacy,
./test_waku_peer_exchange,
./test_peer_store_extended,
./test_message_cache,
Expand Down Expand Up @@ -95,7 +95,9 @@ import
./wakunode_rest/test_rest_relay,
./wakunode_rest/test_rest_relay_serdes,
./wakunode_rest/test_rest_serdes,
./wakunode_rest/test_rest_store
./wakunode_rest/test_rest_store,
./wakunode_rest/test_rest_filter,
./wakunode_rest/test_rest_legacy_filter

import
./waku_rln_relay/test_waku_rln_relay,
Expand Down
56 changes: 28 additions & 28 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.mountFilter()))

# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
# Check connection
check:
conn.isSome()
Expand Down Expand Up @@ -81,12 +81,12 @@ procSuite "Peer Manager":
let nonExistentPeer = nonExistentPeerRes.value

# Dial non-existent peer from node1
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec)
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec)
check:
conn1.isNone()

# Dial peer not supporting given protocol
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
check:
conn2.isNone()

Expand All @@ -109,14 +109,14 @@ procSuite "Peer Manager":
node.mountStoreClient()

node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec)

# Check peers were successfully added to peer manager
check:
node.peerManager.peerStore.peers().len == 2
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protocols.contains(WakuFilterCodec))
it.protocols.contains(WakuLegacyFilterCodec))
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protocols.contains(WakuStoreCodec))
Expand Down Expand Up @@ -429,7 +429,7 @@ procSuite "Peer Manager":

# service peers
node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
node.peerManager.addServicePeer(peers[1], WakuFilterCodec)
node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec)
node.peerManager.addServicePeer(peers[2], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec)

Expand All @@ -449,7 +449,7 @@ procSuite "Peer Manager":
# all service peers are added to its service slot
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
node.peerManager.serviceSlots[WakuFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId

Expand All @@ -474,38 +474,38 @@ procSuite "Peer Manager":
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true

(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() == true

# isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() == true


# assert physical connections
check:
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2

nodes[0].peerManager.connectedPeers(WakuFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuFilterCodec)[1].len == 2
nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2

nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1

nodes[1].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0

nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1

nodes[2].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterCodec)[1].len == 1
nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1

nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0

nodes[3].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0

asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
Expand All @@ -521,17 +521,17 @@ procSuite "Peer Manager":
require:
# multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true

check:
nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[0].peerManager.getNumStreams(WakuFilterCodec) == (0, 4)
nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4)

nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[1].peerManager.getNumStreams(WakuFilterCodec) == (4, 0)
nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0)

test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
Expand All @@ -552,7 +552,7 @@ procSuite "Peer Manager":

# Add a peer[0] to the peerstore
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec]
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec]

# When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
Expand All @@ -561,7 +561,7 @@ procSuite "Peer Manager":
selectedPeer1.get().peerId == peers[0].peerId

# Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuFilterCodec)
let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec)
check:
selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId
Expand Down Expand Up @@ -757,7 +757,7 @@ procSuite "Peer Manager":
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])

# they are also prunned 
# they are also prunned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

# we should have 4 peers (2in/2out) but due to collocation limit
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ suite "WakuNode - Filter":
filterPushHandlerFut.complete((pubsubTopic, msg))

## When
await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)
await client.legacyFilterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)

# Wait for subscription to take effect
waitFor sleepAsync(100.millis)
Expand Down
5 changes: 2 additions & 3 deletions tests/waku_filter_v2/client_utils.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import
std/[options,tables],
testutils/unittests,
chronos,
chronicles

Expand All @@ -22,10 +21,10 @@ proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} =

return proto

proc newTestWakuFilterClient*(switch: Switch, messagePushHandler: MessagePushHandler): Future[WakuFilterClient] {.async.} =
proc newTestWakuFilterClient*(switch: Switch): Future[WakuFilterClient] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilterClient.new(rng, messagePushHandler, peerManager)
proto = WakuFilterClient.new(peerManager, rng)

await proto.start()
switch.mount(proto)
Expand Down
Loading

0 comments on commit dac072f

Please sign in to comment.