Skip to content

Commit

Permalink
Filter v2 rest api support implemented - WIP: some test fixes still n…
Browse files Browse the repository at this point in the history
…eeded

Filter rest api documentation updated with v1 and v2 interface support.
Separated legacy filter rest interface

Small enhancment on Makefile. 'make list' will list all targets defined
  • Loading branch information
NagyZoltanPeter authored and NagyZoltanPeter committed Aug 23, 2023
1 parent 08ff667 commit 80158ee
Show file tree
Hide file tree
Showing 17 changed files with 1,248 additions and 241 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,12 @@ release-notes:
sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g'
# I could not get the tool to replace issue ids with links, so using sed for now,
# asked here: https://github.com/bvieira/sv4git/discussions/101


#############################
# List all possible Targets #
#############################

.PHONY: list
list:
@LC_ALL=C $(MAKE) -pRrq -f $(firstword $(MAKEFILE_LIST)) : 2>/dev/null | awk -v RS= -F: '/(^|\n)# Files(\n|$$)/,/(^|\n)# Finished Make data base/ {if ($$1 !~ "^[#.]") {print $$1}}' | sort | grep -E -v -e '^[^[:alnum:]]' -e '^$@$$'
15 changes: 12 additions & 3 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import
../../waku/v2/waku_store,
../../waku/v2/waku_lightpush,
../../waku/v2/waku_filter,
../../waku/v2/waku_filter_v2,
../../waku/v2/waku_filter_v2/client as waku_filter_client,
./wakunode2_validator_signed,
./internal_config,
./external_config
Expand All @@ -45,6 +47,7 @@ import
../../waku/v2/node/rest/debug/handlers as rest_debug_api,
../../waku/v2/node/rest/relay/handlers as rest_relay_api,
../../waku/v2/node/rest/relay/topic_cache,
../../waku/v2/node/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/v2/node/rest/filter/handlers as rest_filter_api,
../../waku/v2/node/rest/store/handlers as rest_store_api,
../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api,
Expand Down Expand Up @@ -467,7 +470,12 @@ proc setupProtocols(node: WakuNode,
if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode)
if filterNode.isOk():
await mountFilterClient(node)
let filterMessageCache = TopicCache.init(capacity=rest_legacy_filter_api.filterMessageCacheDefaultCapacity)

let filterPushMessageHandler : waku_filter_client.MessagePushHandler = proc(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
filterMessageCache.addMessage(message.contentTopic, message)

await node.mountFilterClient(some(filterPushMessageHandler))
node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec)
else:
return err("failed to set node waku filter peer: " & filterNode.error)
Expand Down Expand Up @@ -569,8 +577,9 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo

## Filter REST API
if conf.filter:
let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity)
installFilterApiHandlers(server.router, app.node, filterCache)
let filterCache = rest_legacy_filter_api.MessageCache.init(capacity=rest_legacy_filter_api.filterMessageCacheDefaultCapacity)
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, filterCache)
rest_filter_api.installFilterRestApiHandlers(server.router, app.node)

## Store REST API
installStoreApiHandlers(server.router, app.node)
Expand Down
2 changes: 1 addition & 1 deletion tests/v2/test_wakunode_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ suite "WakuNode - Filter":
filterPushHandlerFut.complete((pubsubTopic, msg))

## When
await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo)
await client.legacyFilterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo)

# Wait for subscription to take effect
waitFor sleepAsync(100.millis)
Expand Down
2 changes: 1 addition & 1 deletion tests/v2/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ procSuite "WakuNode - Store":
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} =
filterFut.complete((pubsubTopic, msg))

waitFor server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer)
waitFor server.legacyFilterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer)

waitFor sleepAsync(100.millis)

Expand Down
97 changes: 32 additions & 65 deletions tests/v2/wakunode_rest/test_rest_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,31 @@ proc testWakuNode(): WakuNode =


type RestFilterTest = object
node1: WakuNode
node2: WakuNode
serviceNode: WakuNode
subscriberNode: WakuNode
restServer: RestServerRef
messageCache: filter_api.MessageCache
client: RestClientRef


proc setupRestFilter(): Future[RestFilterTest] {.async.} =
result.node1 = testWakuNode()
result.node2 = testWakuNode()
result.serviceNode = testWakuNode()
result.subscriberNode = testWakuNode()

await allFutures(result.node1.start(), result.node2.start())
await allFutures(result.serviceNode.start(), result.subscriberNode.start())

await result.node1.mountFilter()
await result.node2.mountFilterClient()
await result.serviceNode.mountFilter()
await result.subscriberNode.mountFilterClient()

result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec)
result.subscriberNode.peerManager.addServicePeer(result.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterCodec)

let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
result.restServer = RestServerRef.init(restAddress, restPort).tryGet()

result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity)
result.messageCache = result.subscriberNode.getFilterV2PushMessageCache().get()

installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache)
installFilterRestApiHandlers(result.restServer.router, result.subscriberNode)

result.restServer.start()

Expand All @@ -74,11 +72,11 @@ proc setupRestFilter(): Future[RestFilterTest] {.async.} =
proc shutdown(self: RestFilterTest) {.async.} =
await self.restServer.stop()
await self.restServer.closeWait()
await allFutures(self.node1.stop(), self.node2.stop())
await allFutures(self.serviceNode.stop(), self.subscriberNode.stop())


suite "Waku v2 Rest API - Filter":
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions":
suite "Waku v2 Rest API - Filter V2":
asyncTest "Subscribe a node to an array of topics - POST /filter/v2/subscriptions":
# Given
let restFilterTest: RestFilterTest = await setupRestFilter()

Expand All @@ -89,15 +87,18 @@ suite "Waku v2 Rest API - Filter":
,ContentTopic("4")
]

let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
let requestBody = FilterSubscriptionsRequest(requestId: "1234",
contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)
let response = await restFilterTest.client.filterPostSubscriptions(requestBody)

echo "response", $response

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
$response.contentType == $MIMETYPE_JSON
response.data.requestId == "1234"

check:
restFilterTest.messageCache.isSubscribed(DefaultContentTopic)
Expand All @@ -106,19 +107,20 @@ suite "Waku v2 Rest API - Filter":
restFilterTest.messageCache.isSubscribed("4")

# When - error case
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "")
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)
let badRequestBody = FilterSubscriptionsRequest(requestId: "4567", contentFilters: @[], pubsubTopic: "")
let badResponse = await restFilterTest.client.filterPostSubscriptions(badRequestBody)

check:
badResponse.status == 400
$badResponse.contentType == $MIMETYPE_TEXT
badResponse.data == "Invalid content body, could not decode. Unable to deserialize data"
$badResponse.contentType == $MIMETYPE_JSON
badResponse.data.requestId == "4567"
badResponse.data.statusDesc == "Failed to decode request"


await restFilterTest.shutdown()


asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v2/subscriptions":
# Given
let
restFilterTest: RestFilterTest = await setupRestFilter()
Expand All @@ -136,15 +138,16 @@ suite "Waku v2 Rest API - Filter":
]

# When
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)
let requestBody = FilterUnsubscribeRequest(requestId: "4321",
contentFilters: some(contentFilters),
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptions(requestBody)

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
$response.contentType == $MIMETYPE_JSON
response.data.requestId == "4321"

check:
not restFilterTest.messageCache.isSubscribed("1")
Expand All @@ -153,39 +156,3 @@ suite "Waku v2 Rest API - Filter":
restFilterTest.messageCache.isSubscribed("4")

await restFilterTest.shutdown()


asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
# Given

let
restFilterTest = await setupRestFilter()

let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic( "content-topic-x" )

let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]

restFilterTest.messageCache.subscribe(contentTopic)
for msg in messages:
restFilterTest.messageCache.addMessage(contentTopic, msg)

# When
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: FilterWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)

await restFilterTest.shutdown()
Loading

0 comments on commit 80158ee

Please sign in to comment.