Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: HTTP REST API: Filter support v2 #1890

Merged
merged 3 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,12 @@ proc writeAndPrint(c: Chat) {.async.} =
echo "You are now known as " & c.nick

elif line.startsWith("/exit"):
if not c.node.wakuFilter.isNil():
if not c.node.wakuFilterLegacy.isNil():
echo "unsubscribing from content filters..."

await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic)
let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isSome():
await c.node.legacyFilterUnsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic, peer=peerOpt.get())

echo "quitting..."

Expand Down Expand Up @@ -464,14 +466,18 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if peerInfo.isOk():
await node.mountFilter()
await node.mountFilterClient()
node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec)
node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)

proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)

await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler)

await node.legacyFilterSubscribe(pubsubTopic=some(DefaultPubsubTopic),
contentTopics=chat.contentTopic,
filterHandler,
peerInfo.value)
# TODO: Here to support FilterV2 relevant subscription, but still
# Legacy Filter is concurrent to V2 untill legacy filter will be removed
else:
error "Filter not mounted. Couldn't parse conf.filternode",
error = peerInfo.error
Expand All @@ -485,7 +491,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
chat.printReceivedMessage(msg)

let topic = DefaultPubsubTopic
await node.subscribe(some(topic), @[ContentTopic("")], handler)
node.subscribe(topic, handler)

if conf.rlnRelay:
info "WakuRLNRelay is enabled"
Expand Down
4 changes: 3 additions & 1 deletion apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import
../../../waku/waku_node,
../../../waku/node/peer_manager,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_store,
# Chat 2 imports
../chat2/chat2,
Expand Down Expand Up @@ -297,7 +298,8 @@ when isMainModule:
if conf.filternode != "":
let filterPeer = parsePeerInfo(conf.filternode)
if filterPeer.isOk():
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec)
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterSubscribeCodec)
else:
error "Error parsing conf.filternode", error = filterPeer.error

Expand Down
15 changes: 11 additions & 4 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_filter_v2/client as waku_filter_client,
./wakunode2_validator_signed,
./internal_config,
./external_config
Expand All @@ -46,6 +48,7 @@ import
../../waku/node/rest/debug/handlers as rest_debug_api,
../../waku/node/rest/relay/handlers as rest_relay_api,
../../waku/node/rest/relay/topic_cache,
../../waku/node/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/node/rest/filter/handlers as rest_filter_api,
../../waku/node/rest/store/handlers as rest_store_api,
../../waku/node/rest/health/handlers as rest_health_api,
Expand Down Expand Up @@ -470,8 +473,9 @@ proc setupProtocols(node: WakuNode,
if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode)
if filterNode.isOk():
await mountFilterClient(node)
node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec)
await node.mountFilterClient()
node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec)
node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
else:
return err("failed to set node waku filter peer: " & filterNode.error)

Expand Down Expand Up @@ -577,8 +581,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo

## Filter REST API
if conf.filter:
let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity)
installFilterApiHandlers(server.router, app.node, filterCache)
let legacyFilterCache = rest_legacy_filter_api.MessageCache.init()
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache)

let filterCache = rest_filter_api.MessageCache.init()
rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache)

## Store REST API
installStoreApiHandlers(server.router, app.node)
Expand Down
8 changes: 6 additions & 2 deletions examples/filter_subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ proc unsubscribe(wfc: WakuFilterClient,
else:
notice "unsubscribe request successful"

proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) =
proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage)
{.async, gcsafe.} =
let payloadStr = string.fromBytes(message.payload)
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp


proc maintainSubscription(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
Expand Down Expand Up @@ -68,11 +70,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) =
var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wfc = WakuFilterClient.new(rng, messagePushHandler, pm)
wfc = WakuFilterClient.new(pm, rng)

# Mount filter client protocol
switch.mount(wfc)

wfc.registerPushHandler(messagePushHandler)

# Start maintaining subscription
asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic)

Expand Down
8 changes: 5 additions & 3 deletions tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ import
./test_waku_lightpush,
./test_wakunode_lightpush,
# Waku Filter
./test_waku_filter,
./test_wakunode_filter,
./test_waku_filter_legacy,
./test_wakunode_filter_legacy,
./test_waku_peer_exchange,
./test_peer_store_extended,
./test_message_cache,
Expand Down Expand Up @@ -95,7 +95,9 @@ import
./wakunode_rest/test_rest_relay,
./wakunode_rest/test_rest_relay_serdes,
./wakunode_rest/test_rest_serdes,
./wakunode_rest/test_rest_store
./wakunode_rest/test_rest_store,
./wakunode_rest/test_rest_filter,
./wakunode_rest/test_rest_legacy_filter

import
./waku_rln_relay/test_waku_rln_relay,
Expand Down
56 changes: 28 additions & 28 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.mountFilter()))

# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
# Check connection
check:
conn.isSome()
Expand Down Expand Up @@ -81,12 +81,12 @@ procSuite "Peer Manager":
let nonExistentPeer = nonExistentPeerRes.value

# Dial non-existent peer from node1
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec)
let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec)
check:
conn1.isNone()

# Dial peer not supporting given protocol
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec)
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
check:
conn2.isNone()

Expand All @@ -109,14 +109,14 @@ procSuite "Peer Manager":
node.mountStoreClient()

node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec)
node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec)

# Check peers were successfully added to peer manager
check:
node.peerManager.peerStore.peers().len == 2
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protocols.contains(WakuFilterCodec))
it.protocols.contains(WakuLegacyFilterCodec))
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protocols.contains(WakuStoreCodec))
Expand Down Expand Up @@ -429,7 +429,7 @@ procSuite "Peer Manager":

# service peers
node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
node.peerManager.addServicePeer(peers[1], WakuFilterCodec)
node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec)
node.peerManager.addServicePeer(peers[2], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec)

Expand All @@ -449,7 +449,7 @@ procSuite "Peer Manager":
# all service peers are added to its service slot
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
node.peerManager.serviceSlots[WakuFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId

Expand All @@ -474,38 +474,38 @@ procSuite "Peer Manager":
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true

(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() == true

# isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() == true


# assert physical connections
check:
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2

nodes[0].peerManager.connectedPeers(WakuFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuFilterCodec)[1].len == 2
nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2

nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1

nodes[1].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0

nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1

nodes[2].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterCodec)[1].len == 1
nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1

nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0

nodes[3].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0
nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0

asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
Expand All @@ -521,17 +521,17 @@ procSuite "Peer Manager":
require:
# multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true

check:
nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[0].peerManager.getNumStreams(WakuFilterCodec) == (0, 4)
nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4)

nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[1].peerManager.getNumStreams(WakuFilterCodec) == (4, 0)
nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0)

test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
Expand All @@ -552,7 +552,7 @@ procSuite "Peer Manager":

# Add a peer[0] to the peerstore
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec]
pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec]

# When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
Expand All @@ -561,7 +561,7 @@ procSuite "Peer Manager":
selectedPeer1.get().peerId == peers[0].peerId

# Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuFilterCodec)
let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec)
check:
selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId
Expand Down Expand Up @@ -757,7 +757,7 @@ procSuite "Peer Manager":
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])

# they are also prunned 
# they are also prunned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

# we should have 4 peers (2in/2out) but due to collocation limit
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ suite "WakuNode - Filter":
filterPushHandlerFut.complete((pubsubTopic, msg))

## When
await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)
await client.legacyFilterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo)

# Wait for subscription to take effect
waitFor sleepAsync(100.millis)
Expand Down
5 changes: 2 additions & 3 deletions tests/waku_filter_v2/client_utils.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import
std/[options,tables],
testutils/unittests,
chronos,
chronicles

Expand All @@ -22,10 +21,10 @@ proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} =

return proto

proc newTestWakuFilterClient*(switch: Switch, messagePushHandler: MessagePushHandler): Future[WakuFilterClient] {.async.} =
proc newTestWakuFilterClient*(switch: Switch): Future[WakuFilterClient] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilterClient.new(rng, messagePushHandler, peerManager)
proto = WakuFilterClient.new(peerManager, rng)

await proto.start()
switch.mount(proto)
Expand Down
Loading