From 80158ee506e927caf9200af4c1e5ef3ef4c09109 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter Date: Tue, 1 Aug 2023 15:55:21 +0200 Subject: [PATCH] Filter v2 rest api support implemented - WIP: some test fixes still needed Filter rest api documentation updated with v1 and v2 interface support. Separated legacy filter rest interface Small enhancment on Makefile. 'make list' will list all targets defined --- Makefile | 9 + apps/wakunode2/app.nim | 15 +- tests/v2/test_wakunode_filter.nim | 2 +- tests/v2/waku_store/test_wakunode_store.nim | 2 +- tests/v2/wakunode_rest/test_rest_filter.nim | 97 ++----- .../wakunode_rest/test_rest_legacy_filter.nim | 188 ++++++++++++ waku/v2/node/jsonrpc/filter/handlers.nim | 8 +- waku/v2/node/rest/filter/client.nim | 82 ++++-- waku/v2/node/rest/filter/handlers.nim | 213 ++++++++------ waku/v2/node/rest/filter/legacy_client.nim | 68 +++++ waku/v2/node/rest/filter/legacy_handlers.nim | 160 ++++++++++ waku/v2/node/rest/filter/openapi.yaml | 273 +++++++++++++++++- waku/v2/node/rest/filter/types.nim | 169 ++++++++++- waku/v2/node/waku_node.nim | 165 ++++++++--- waku/v2/waku_core/peers.nim | 7 +- waku/v2/waku_filter_v2/client.nim | 4 +- waku/v2/waku_filter_v2/rpc.nim | 27 ++ 17 files changed, 1248 insertions(+), 241 deletions(-) create mode 100644 tests/v2/wakunode_rest/test_rest_legacy_filter.nim create mode 100644 waku/v2/node/rest/filter/legacy_client.nim create mode 100644 waku/v2/node/rest/filter/legacy_handlers.nim diff --git a/Makefile b/Makefile index 5a7dfe980d..d072a58844 100644 --- a/Makefile +++ b/Makefile @@ -332,3 +332,12 @@ release-notes: sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g' # I could not get the tool to replace issue ids with links, so using sed for now, # asked here: https://github.com/bvieira/sv4git/discussions/101 + + +############################# +# List all possible Targets # +############################# + +.PHONY: list +list: + @LC_ALL=C $(MAKE) -pRrq -f $(firstword $(MAKEFILE_LIST)) : 2>/dev/null | awk -v RS= -F: '/(^|\n)# Files(\n|$$)/,/(^|\n)# Finished Make data base/ {if ($$1 !~ "^[#.]") {print $$1}}' | sort | grep -E -v -e '^[^[:alnum:]]' -e '^$@$$' \ No newline at end of file diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index d2d37bfaee..cd9fc3389c 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -36,6 +36,8 @@ import ../../waku/v2/waku_store, ../../waku/v2/waku_lightpush, ../../waku/v2/waku_filter, + ../../waku/v2/waku_filter_v2, + ../../waku/v2/waku_filter_v2/client as waku_filter_client, ./wakunode2_validator_signed, ./internal_config, ./external_config @@ -45,6 +47,7 @@ import ../../waku/v2/node/rest/debug/handlers as rest_debug_api, ../../waku/v2/node/rest/relay/handlers as rest_relay_api, ../../waku/v2/node/rest/relay/topic_cache, + ../../waku/v2/node/rest/filter/legacy_handlers as rest_legacy_filter_api, ../../waku/v2/node/rest/filter/handlers as rest_filter_api, ../../waku/v2/node/rest/store/handlers as rest_store_api, ../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api, @@ -467,7 +470,12 @@ proc setupProtocols(node: WakuNode, if conf.filternode != "": let filterNode = parsePeerInfo(conf.filternode) if filterNode.isOk(): - await mountFilterClient(node) + let filterMessageCache = TopicCache.init(capacity=rest_legacy_filter_api.filterMessageCacheDefaultCapacity) + + let filterPushMessageHandler : waku_filter_client.MessagePushHandler = proc(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + filterMessageCache.addMessage(message.contentTopic, message) + + await node.mountFilterClient(some(filterPushMessageHandler)) node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec) else: return err("failed to set node waku filter peer: " & filterNode.error) @@ -569,8 +577,9 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo ## Filter REST API if conf.filter: - let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity) - installFilterApiHandlers(server.router, app.node, filterCache) + let filterCache = rest_legacy_filter_api.MessageCache.init(capacity=rest_legacy_filter_api.filterMessageCacheDefaultCapacity) + rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, filterCache) + rest_filter_api.installFilterRestApiHandlers(server.router, app.node) ## Store REST API installStoreApiHandlers(server.router, app.node) diff --git a/tests/v2/test_wakunode_filter.nim b/tests/v2/test_wakunode_filter.nim index 9f7f9b11ec..b89e1f5bc0 100644 --- a/tests/v2/test_wakunode_filter.nim +++ b/tests/v2/test_wakunode_filter.nim @@ -43,7 +43,7 @@ suite "WakuNode - Filter": filterPushHandlerFut.complete((pubsubTopic, msg)) ## When - await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo) + await client.legacyFilterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo) # Wait for subscription to take effect waitFor sleepAsync(100.millis) diff --git a/tests/v2/waku_store/test_wakunode_store.nim b/tests/v2/waku_store/test_wakunode_store.nim index 3360b3c337..5f95c1efcf 100644 --- a/tests/v2/waku_store/test_wakunode_store.nim +++ b/tests/v2/waku_store/test_wakunode_store.nim @@ -232,7 +232,7 @@ procSuite "WakuNode - Store": proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = filterFut.complete((pubsubTopic, msg)) - waitFor server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer) + waitFor server.legacyFilterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer) waitFor sleepAsync(100.millis) diff --git a/tests/v2/wakunode_rest/test_rest_filter.nim b/tests/v2/wakunode_rest/test_rest_filter.nim index c17bdccc3b..01f58ff5aa 100644 --- a/tests/v2/wakunode_rest/test_rest_filter.nim +++ b/tests/v2/wakunode_rest/test_rest_filter.nim @@ -36,33 +36,31 @@ proc testWakuNode(): WakuNode = type RestFilterTest = object - node1: WakuNode - node2: WakuNode + serviceNode: WakuNode + subscriberNode: WakuNode restServer: RestServerRef messageCache: filter_api.MessageCache client: RestClientRef proc setupRestFilter(): Future[RestFilterTest] {.async.} = - result.node1 = testWakuNode() - result.node2 = testWakuNode() + result.serviceNode = testWakuNode() + result.subscriberNode = testWakuNode() - await allFutures(result.node1.start(), result.node2.start()) + await allFutures(result.serviceNode.start(), result.subscriberNode.start()) - await result.node1.mountFilter() - await result.node2.mountFilterClient() + await result.serviceNode.mountFilter() + await result.subscriberNode.mountFilterClient() - result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec) + result.subscriberNode.peerManager.addServicePeer(result.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterCodec) let restPort = Port(58011) let restAddress = ValidIpAddress.init("0.0.0.0") result.restServer = RestServerRef.init(restAddress, restPort).tryGet() - result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity) + result.messageCache = result.subscriberNode.getFilterV2PushMessageCache().get() - installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) - installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) - installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache) + installFilterRestApiHandlers(result.restServer.router, result.subscriberNode) result.restServer.start() @@ -74,11 +72,11 @@ proc setupRestFilter(): Future[RestFilterTest] {.async.} = proc shutdown(self: RestFilterTest) {.async.} = await self.restServer.stop() await self.restServer.closeWait() - await allFutures(self.node1.stop(), self.node2.stop()) + await allFutures(self.serviceNode.stop(), self.subscriberNode.stop()) -suite "Waku v2 Rest API - Filter": - asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": +suite "Waku v2 Rest API - Filter V2": + asyncTest "Subscribe a node to an array of topics - POST /filter/v2/subscriptions": # Given let restFilterTest: RestFilterTest = await setupRestFilter() @@ -89,15 +87,18 @@ suite "Waku v2 Rest API - Filter": ,ContentTopic("4") ] - let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, + let requestBody = FilterSubscriptionsRequest(requestId: "1234", + contentFilters: contentFilters, pubsubTopic: DefaultPubsubTopic) - let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) + let response = await restFilterTest.client.filterPostSubscriptions(requestBody) + + echo "response", $response # Then check: response.status == 200 - $response.contentType == $MIMETYPE_TEXT - response.data == "OK" + $response.contentType == $MIMETYPE_JSON + response.data.requestId == "1234" check: restFilterTest.messageCache.isSubscribed(DefaultContentTopic) @@ -106,19 +107,20 @@ suite "Waku v2 Rest API - Filter": restFilterTest.messageCache.isSubscribed("4") # When - error case - let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "") - let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody) + let badRequestBody = FilterSubscriptionsRequest(requestId: "4567", contentFilters: @[], pubsubTopic: "") + let badResponse = await restFilterTest.client.filterPostSubscriptions(badRequestBody) check: badResponse.status == 400 - $badResponse.contentType == $MIMETYPE_TEXT - badResponse.data == "Invalid content body, could not decode. Unable to deserialize data" + $badResponse.contentType == $MIMETYPE_JSON + badResponse.data.requestId == "4567" + badResponse.data.statusDesc == "Failed to decode request" await restFilterTest.shutdown() - asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions": + asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v2/subscriptions": # Given let restFilterTest: RestFilterTest = await setupRestFilter() @@ -136,15 +138,16 @@ suite "Waku v2 Rest API - Filter": ] # When - let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, - pubsubTopic: DefaultPubsubTopic) - let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody) + let requestBody = FilterUnsubscribeRequest(requestId: "4321", + contentFilters: some(contentFilters), + pubsubTopic: some(DefaultPubsubTopic)) + let response = await restFilterTest.client.filterDeleteSubscriptions(requestBody) # Then check: response.status == 200 - $response.contentType == $MIMETYPE_TEXT - response.data == "OK" + $response.contentType == $MIMETYPE_JSON + response.data.requestId == "4321" check: not restFilterTest.messageCache.isSubscribed("1") @@ -153,39 +156,3 @@ suite "Waku v2 Rest API - Filter": restFilterTest.messageCache.isSubscribed("4") await restFilterTest.shutdown() - - - asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}": - # Given - - let - restFilterTest = await setupRestFilter() - - let pubSubTopic = "/waku/2/default-waku/proto" - let contentTopic = ContentTopic( "content-topic-x" ) - - let messages = @[ - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - ] - - restFilterTest.messageCache.subscribe(contentTopic) - for msg in messages: - restFilterTest.messageCache.addMessage(contentTopic, msg) - - # When - let response = await restFilterTest.client.filterGetMessagesV1(contentTopic) - - # Then - check: - response.status == 200 - $response.contentType == $MIMETYPE_JSON - response.data.len == 3 - response.data.all do (msg: FilterWakuMessage) -> bool: - msg.payload == base64.encode("TEST-1") and - msg.contentTopic.get().string == "content-topic-x" and - msg.version.get() == 2 and - msg.timestamp.get() != Timestamp(0) - - await restFilterTest.shutdown() diff --git a/tests/v2/wakunode_rest/test_rest_legacy_filter.nim b/tests/v2/wakunode_rest/test_rest_legacy_filter.nim new file mode 100644 index 0000000000..653bbb52fe --- /dev/null +++ b/tests/v2/wakunode_rest/test_rest_legacy_filter.nim @@ -0,0 +1,188 @@ +{.used.} + +import + std/sequtils, + stew/byteutils, + stew/shims/net, + testutils/unittests, + presto, presto/client as presto_client, + libp2p/crypto/crypto +import + ../../waku/v2/node/message_cache, + ../../waku/common/base64, + ../../waku/v2/waku_core, + ../../waku/v2/waku_node, + ../../waku/v2/node/peer_manager, + ../../waku/v2/waku_filter, + ../../waku/v2/node/rest/server, + ../../waku/v2/node/rest/client, + ../../waku/v2/node/rest/responses, + ../../waku/v2/node/rest/filter/types, + ../../waku/v2/node/rest/filter/legacy_handlers as filter_api, + ../../waku/v2/node/rest/filter/legacy_client as filter_api_client, + ../../waku/v2/waku_relay, + ../testlib/wakucore, + ../testlib/wakunode + + +proc testWakuNode(): WakuNode = + let + privkey = generateSecp256k1Key() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(0) + + return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) + + +type RestFilterTest = object + filterNode: WakuNode + clientNode: WakuNode + restServer: RestServerRef + messageCache: filter_api.MessageCache + client: RestClientRef + + +proc setupRestFilter(): Future[RestFilterTest] {.async.} = + result.filterNode = testWakuNode() + result.clientNode = testWakuNode() + + await allFutures(result.filterNode.start(), result.clientNode.start()) + + await result.filterNode.mountFilter() + await result.clientNode.mountFilterClient() + + result.clientNode.peerManager.addServicePeer(result.filterNode.peerInfo.toRemotePeerInfo(), WakuFilterCodec) + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("0.0.0.0") + result.restServer = RestServerRef.init(restAddress, restPort).tryGet() + + result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity) + installLegacyFilterRestApiHandlers(result.restServer.router, result.clientNode, result.messageCache) + + result.restServer.start() + + result.client = newRestHttpClient(initTAddress(restAddress, restPort)) + + return result + + +proc shutdown(self: RestFilterTest) {.async.} = + await self.restServer.stop() + await self.restServer.closeWait() + await allFutures(self.filterNode.stop(), self.clientNode.stop()) + + +suite "Waku v2 Rest API - Filter": + asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": + # Given + let restFilterTest: RestFilterTest = await setupRestFilter() + + # When + let contentFilters = @[DefaultContentTopic + ,ContentTopic("2") + ,ContentTopic("3") + ,ContentTopic("4") + ] + + let requestBody = FilterV1SubscriptionsRequest(contentFilters: contentFilters, + pubsubTopic: DefaultPubsubTopic) + let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + restFilterTest.messageCache.isSubscribed(DefaultContentTopic) + restFilterTest.messageCache.isSubscribed("2") + restFilterTest.messageCache.isSubscribed("3") + restFilterTest.messageCache.isSubscribed("4") + + # When - error case + let badRequestBody = FilterV1SubscriptionsRequest(contentFilters: @[], pubsubTopic: "") + let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody) + + check: + badResponse.status == 400 + $badResponse.contentType == $MIMETYPE_TEXT + badResponse.data == "Invalid content body, could not decode. Unable to deserialize data" + + + await restFilterTest.shutdown() + + + asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions": + # Given + let + restFilterTest: RestFilterTest = await setupRestFilter() + + # When + restFilterTest.messageCache.subscribe("1") + restFilterTest.messageCache.subscribe("2") + restFilterTest.messageCache.subscribe("3") + restFilterTest.messageCache.subscribe("4") + + let contentFilters = @[ContentTopic("1") + ,ContentTopic("2") + ,ContentTopic("3") + # ,ContentTopic("4") # Keep this subscription for check + ] + + # When + let requestBody = FilterV1SubscriptionsRequest(contentFilters: contentFilters, + pubsubTopic: DefaultPubsubTopic) + let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + not restFilterTest.messageCache.isSubscribed("1") + not restFilterTest.messageCache.isSubscribed("2") + not restFilterTest.messageCache.isSubscribed("3") + restFilterTest.messageCache.isSubscribed("4") + + await restFilterTest.shutdown() + + + asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}": + # Given + + let + restFilterTest = await setupRestFilter() + + let pubSubTopic = "/waku/2/default-waku/proto" + let contentTopic = ContentTopic( "content-topic-x" ) + + let messages = @[ + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + ] + + restFilterTest.messageCache.subscribe(contentTopic) + for msg in messages: + restFilterTest.messageCache.addMessage(contentTopic, msg) + + # When + let response = await restFilterTest.client.filterGetMessagesV1(contentTopic) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.len == 3 + response.data.all do (msg: FilterWakuMessage) -> bool: + msg.payload == base64.encode("TEST-1") and + msg.contentTopic.get().string == "content-topic-x" and + msg.version.get() == 2 and + msg.timestamp.get() != Timestamp(0) + + await restFilterTest.shutdown() diff --git a/waku/v2/node/jsonrpc/filter/handlers.nim b/waku/v2/node/jsonrpc/filter/handlers.nim index 900fd85fa6..416fed4ef7 100644 --- a/waku/v2/node/jsonrpc/filter/handlers.nim +++ b/waku/v2/node/jsonrpc/filter/handlers.nim @@ -47,7 +47,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = cache.addMessage(msg.contentTopic, msg) - let subFut = node.filterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) + let subFut = node.legacyFilterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) if not await subFut.withTimeout(futTimeout): raise newException(ValueError, "Failed to subscribe to contentFilters") @@ -65,7 +65,11 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic) contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) - let unsubFut = node.unsubscribe(pubsubTopic, contentTopics) + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + if peerOpt.isNone(): + raise newException(ValueError, "no suitable remote filter peers") + + let unsubFut = node.legacyFilterUnsubscribe(pubsubTopic, contentTopics, peerOpt.get()) if not await unsubFut.withTimeout(futTimeout): raise newException(ValueError, "Failed to unsubscribe from contentFilters") diff --git a/waku/v2/node/rest/filter/client.nim b/waku/v2/node/rest/filter/client.nim index a5b53c01f6..3a38b51afc 100644 --- a/waku/v2/node/rest/filter/client.nim +++ b/waku/v2/node/rest/filter/client.nim @@ -4,6 +4,7 @@ else: {.push raises: [].} import + json, std/sets, stew/byteutils, chronicles, @@ -19,7 +20,7 @@ import export types logScope: - topics = "waku node rest client" + topics = "waku node rest client v2" proc encodeBytes*(value: FilterSubscriptionsRequest, contentType: string): RestResult[seq[byte]] = @@ -30,39 +31,68 @@ proc encodeBytes*(value: FilterSubscriptionsRequest, let encoded = ?encodeIntoJsonBytes(value) return ok(encoded) -proc decodeBytes*(t: typedesc[string], value: openarray[byte], - contentType: Opt[ContentTypeData]): RestResult[string] = - if MediaType.init($contentType) != MIMETYPE_TEXT: +proc encodeBytes*(value: FilterSubscriberPing, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: error "Unsupported contentType value", contentType = contentType return err("Unsupported contentType") - var res: string - if len(value) > 0: - res = newString(len(value)) - copyMem(addr res[0], unsafeAddr value[0], len(value)) - return ok(res) + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest): - RestResponse[string] - {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.} +proc encodeBytes*(value: FilterUnsubscribeRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest): - RestResponse[string] - {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.} + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[FilterSubscriptionResponse], + value: openarray[byte], + contentType: Opt[ContentTypeData]): + + RestResult[FilterSubscriptionResponse] = -proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], - data: openArray[byte], - contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] = if MediaType.init($contentType) != MIMETYPE_JSON: - error "Unsupported response contentType value", contentType = contentType - return err("Unsupported response contentType") + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") - let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data) + let decoded = ?decodeFromJsonBytes(FilterSubscriptionResponse, value) return ok(decoded) + # var res: string + # if len(value) > 0: + # res = newString(len(value)) + # copyMem(addr res[0], unsafeAddr value[0], len(value)) + # try: + # let jsonContent = parseJson(res) + # if $jsonContent["status"].getStr() != "success": + # error "query failed", result=jsonContent + # return err("query failed") + # + # return ok(FilterSubscriptionResponse( + # requestId: jsonContent["requestId"].getStr(), + # statusCode: uint32(jsonContent["statusCode"].getInt()), + # statusDesc: jsonContent["statusDesc"].getStr() + # )) + # except Exception: + # return err("Failed to get the Filter request response: " & getCurrentExceptionMsg()) + # TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterGetMessagesV1*(contentTopic: string): - RestResponse[FilterGetMessagesResponse] - {.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.} +proc filterSubscriberPing*(requestId: string): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions{requestId}", meth: HttpMethod.MethodGet.} + +proc filterPostSubscriptions*(body: FilterSubscriptionsRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPost.} + +proc filterPutSubscriptions*(body: FilterSubscriptionsRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPut.} + +proc filterDeleteSubscriptions*(body: FilterUnsubscribeRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodDelete.} diff --git a/waku/v2/node/rest/filter/handlers.nim b/waku/v2/node/rest/filter/handlers.nim index d61ae52c7e..42decfaa56 100644 --- a/waku/v2/node/rest/filter/handlers.nim +++ b/waku/v2/node/rest/filter/handlers.nim @@ -14,24 +14,26 @@ import import ../../../waku_core, ../../../waku_filter, - ../../../waku_filter/client, + ../../../waku_filter_v2, + ../../../waku_filter_v2/client as filter_protocol_client, + ../../../waku_filter_v2/common as filter_protocol_type, ../../message_cache, ../../peer_manager, ../../waku_node, ../serdes, ../responses, ./types - + export types logScope: - topics = "waku node rest filter_api" + topics = "waku node rest filter_api_v2" -const futTimeoutForSubscriptionProcessing* = 5.seconds +const futTimeoutForSubscriptionProcessing* = 5.seconds #### Request handlers -const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" +const ROUTE_FILTER_SUBSCRIPTIONS* = "/filter/v2/subscriptions" const filterMessageCacheDefaultCapacity* = 30 @@ -50,106 +52,153 @@ func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiR let requestResult = decodeFromJsonBytes(T, reqBodyData) if requestResult.isErr(): - return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & $requestResult.error)) return ok(requestResult.get()) -proc installFilterPostSubscriptionsV1Handler*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - let pushHandler: FilterPushHandler = - proc(pubsubTopic: PubsubTopic, - msg: WakuMessage) {.async, gcsafe, closure.} = - cache.addMessage(msg.contentTopic, msg) +proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): T = + if protocolClientRes.isErr(): + FilterSubscriptionResponse( + requestId: requestId, + statusCode: uint32(protocolClientRes.error().kind), + statusDesc: $protocolClientRes + ) + else: + FilterSubscriptionResponse( + requestId: requestId, + statusCode: 0, + statusDesc: "" + ) - router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: - ## Subscribes a node to a list of contentTopics of a pubsubTopic - # debug "post_waku_v2_filter_v1_subscriptions" +proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): T = + FilterSubscriptionResponse( + requestId: requestId, + statusCode: uint32(protocolClientRes.kind), + statusDesc: $protocolClientRes + ) - let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) +proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): RestApiResponse = + let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes) - if decodedBody.isErr(): - return decodedBody.error + var httpStatus = Http200 - let req: FilterSubscriptionsRequest = decodedBody.value() + if protocolClientRes.isErr(): + httpStatus = HttpCode(protocolClientRes.error().kind) # TODO: convert status codes! - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus) - if peerOpt.isNone(): - return RestApiResponse.internalServerError("No suitable remote filter peers") + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError("An error ocurred while building the json respose") + + return resp.get() + +proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): RestApiResponse = + let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes) + + let httpStatus = HttpCode(protocolClientRes.kind) # TODO: convert status codes! + + let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus) - let subFut = node.filterSubscribe(req.pubsubTopic, - req.contentFilters, - pushHandler, + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError("An error ocurred while building the json respose") + + return resp.get() + +proc filterPostPutSubscriptionRequestHandler(node: WakuNode, contentBody: Option[ContentBody]) : Future[RestApiResponse] {.async.} = + ## handles any filter subscription requests, adds or modifies. + + let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) + + if decodedBody.isErr(): + return makeRestResponse("unknown", FilterSubscribeError.badRequest("Failed to decode request")) + + let req: FilterSubscriptionsRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + + if peerOpt.isNone(): + return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers")) + + let subFut = node.filterV2Subscribe(req.pubsubTopic, + req.contentFilters, peerOpt.get()) - if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): - error "Failed to subscribe to contentFilters do to timeout!" - return RestApiResponse.internalServerError("Failed to subscribe to contentFilters") + if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to subscribe to contentFilters do to timeout!" + return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("Subscription request timed out")) + + return makeRestResponse(req.requestId, subFut.read()) - # Successfully subscribed to all content filters - for cTopic in req.contentFilters: - cache.subscribe(cTopic) +proc installFilterPostSubscriptionsHandler*(router: var RestRouter, + node: WakuNode) = + router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a pubsubTopic + debug "post", ROUTE_FILTER_SUBSCRIPTIONS, contentBody + + let response = await filterPostPutSubscriptionRequestHandler(node, contentBody) + return response + +proc installFilterPutSubscriptionsHandler*(router: var RestRouter, + node: WakuNode) = + router.api(MethodPut, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Modifies a subscribtion of a node to a list of contentTopics of a pubsubTopic + debug "put", ROUTE_FILTER_SUBSCRIPTIONS, contentBody - return RestApiResponse.ok() + let response = await filterPostPutSubscriptionRequestHandler(node, contentBody) + return response -proc installFilterDeleteSubscriptionsV1Handler*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: +proc installFilterDeleteSubscriptionsHandler*(router: var RestRouter, + node: WakuNode) = + router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: ## Subscribes a node to a list of contentTopics of a PubSub topic - # debug "delete_waku_v2_filter_v1_subscriptions" + debug "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody - let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) + let decodedBody = decodeRequestBody[FilterUnsubscribeRequest](contentBody) if decodedBody.isErr(): - return decodedBody.error + return makeRestResponse("unknown", FilterSubscribeError.badRequest("Failed to decode request")) - let req: FilterSubscriptionsRequest = decodedBody.value() + let req: FilterUnsubscribeRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) - let unsubFut = node.unsubscribe(req.pubsubTopic, req.contentFilters) + if peerOpt.isNone(): + return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers")) + + let unsubFut = node.filterV2unsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get()) if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): error "Failed to unsubscribe from contentFilters due to timeout!" - return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") - - for cTopic in req.contentFilters: - cache.unsubscribe(cTopic) + return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("Failed to unsubscribe from contentFilters due to timeout!")) # Successfully unsubscribed from all requested contentTopics - return RestApiResponse.ok() - -const ROUTE_RELAY_MESSAGESV1* = "/filter/v1/messages/{contentTopic}" - -proc installFilterGetMessagesV1Handler*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: - ## Returns all WakuMessages received on a specified content topic since the - ## last time this method was called - ## TODO: ability to specify a return message limit - # debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic - - if contentTopic.isErr(): - return RestApiResponse.badRequest("Missing contentTopic") - - let contentTopic = contentTopic.get() - - let msgRes = cache.getMessages(contentTopic, clear=true) - if msgRes.isErr(): - return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic) - - let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage)) - let resp = RestApiResponse.jsonResponse(data, status=Http200) - if resp.isErr(): - error "An error ocurred while building the json respose: ", error=resp.error - return RestApiResponse.internalServerError("An error ocurred while building the json respose") - - return resp.get() - -proc installFilterApiHandlers*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - installFilterPostSubscriptionsV1Handler(router, node, cache) - installFilterDeleteSubscriptionsV1Handler(router, node, cache) - installFilterGetMessagesV1Handler(router, node, cache) + return makeRestResponse(req.requestId, unsubFut.read()) + +const ROUTE_FILTER_SUBSCRIBER_PING* = "/filter/v2/subscriptions/{requestId}" + +proc installFilterPingSubscriberHandler*(router: var RestRouter, + node: WakuNode) = + router.api(MethodGet, ROUTE_FILTER_SUBSCRIBER_PING) do (requestId: string) -> RestApiResponse: + ## Checks if a node has valid subscription or not. + debug "get", ROUTE_FILTER_SUBSCRIBER_PING, requestId + + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + if peerOpt.isNone(): + return makeRestResponse(requestId.get(), FilterSubscribeError.serviceUnavailable("No suitable remote filter peers")) + + let pingFutRes = node.wakuFilterClient.ping(peerOpt.get()) + + if not await pingFutRes.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to unsubscribe from contentFilters due to timeout!" + return makeRestResponse(requestId.get(), FilterSubscribeError.serviceUnavailable("Ping timed out")) + + return makeRestResponse(requestId.get(), pingFutRes.read()) + +proc installFilterRestApiHandlers*(router: var RestRouter, + node: WakuNode) = + installFilterPingSubscriberHandler(router, node) + installFilterPostSubscriptionsHandler(router, node) + installFilterPutSubscriptionsHandler(router, node) + installFilterDeleteSubscriptionsHandler(router, node) diff --git a/waku/v2/node/rest/filter/legacy_client.nim b/waku/v2/node/rest/filter/legacy_client.nim new file mode 100644 index 0000000000..5328bff982 --- /dev/null +++ b/waku/v2/node/rest/filter/legacy_client.nim @@ -0,0 +1,68 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sets, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import + ../../../waku_core, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest client v1" + +proc encodeBytes*(value: FilterV1SubscriptionsRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[string], value: openarray[byte], + contentType: Opt[ContentTypeData]): RestResult[string] = + if MediaType.init($contentType) != MIMETYPE_TEXT: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + var res: string + if len(value) > 0: + res = newString(len(value)) + copyMem(addr res[0], unsafeAddr value[0], len(value)) + return ok(res) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterPostSubscriptionsV1*(body: FilterV1SubscriptionsRequest): + RestResponse[string] + {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.} + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterDeleteSubscriptionsV1*(body: FilterV1SubscriptionsRequest): + RestResponse[string] + {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.} + +proc decodeBytes*(t: typedesc[FilterV1GetMessagesResponse], + data: openArray[byte], + contentType: Opt[ContentTypeData]): RestResult[FilterV1GetMessagesResponse] = + if MediaType.init($contentType) != MIMETYPE_JSON: + error "Unsupported response contentType value", contentType = contentType + return err("Unsupported response contentType") + + let decoded = ?decodeFromJsonBytes(FilterV1GetMessagesResponse, data) + return ok(decoded) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterGetMessagesV1*(contentTopic: string): + RestResponse[FilterV1GetMessagesResponse] + {.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.} diff --git a/waku/v2/node/rest/filter/legacy_handlers.nim b/waku/v2/node/rest/filter/legacy_handlers.nim new file mode 100644 index 0000000000..caa485678f --- /dev/null +++ b/waku/v2/node/rest/filter/legacy_handlers.nim @@ -0,0 +1,160 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sequtils, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/route, + presto/common +import + ../../../waku_core, + ../../../waku_filter, + ../../../waku_filter/client, + ../../message_cache, + ../../peer_manager, + ../../waku_node, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest filter_api v1" + +const futTimeoutForSubscriptionProcessing* = 5.seconds + +#### Request handlers + +const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" + +const filterMessageCacheDefaultCapacity* = 30 + +type + MessageCache* = message_cache.MessageCache[ContentTopic] + +func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = + if contentBody.isNone(): + return err(RestApiResponse.badRequest("Missing content body")) + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) + + let reqBodyData = contentBody.get().data + + let requestResult = decodeFromJsonBytes(T, reqBodyData) + if requestResult.isErr(): + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + $requestResult.error)) + + return ok(requestResult.get()) + +proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + let pushHandler: FilterPushHandler = + proc(pubsubTopic: PubsubTopic, + msg: WakuMessage) {.async, gcsafe, closure.} = + cache.addMessage(msg.contentTopic, msg) + + router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a pubsubTopic + debug "post", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody + + let decodedBody = decodeRequestBody[FilterV1SubscriptionsRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error + + let req: FilterV1SubscriptionsRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + + if peerOpt.isNone(): + return RestApiResponse.internalServerError("No suitable remote filter peers") + + let subFut = node.legacyFilterSubscribe(req.pubsubTopic, + req.contentFilters, + pushHandler, + peerOpt.get()) + + if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to subscribe to contentFilters do to timeout!" + return RestApiResponse.internalServerError("Failed to subscribe to contentFilters") + + # Successfully subscribed to all content filters + for cTopic in req.contentFilters: + cache.subscribe(cTopic) + + return RestApiResponse.ok() + +proc installFilterV1DeleteSubscriptionsV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a PubSub topic + debug "delete", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody + + let decodedBody = decodeRequestBody[FilterV1SubscriptionsRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error + + let req: FilterV1SubscriptionsRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + + if peerOpt.isNone(): + return RestApiResponse.internalServerError("No suitable remote filter peers") + + let unsubFut = node.legacyFilterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get()) + if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to unsubscribe from contentFilters due to timeout!" + return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") + + for cTopic in req.contentFilters: + cache.unsubscribe(cTopic) + + # Successfully unsubscribed from all requested contentTopics + return RestApiResponse.ok() + +const ROUTE_FILTER_MESSAGESV1* = "/filter/v1/messages/{contentTopic}" + +proc installFilterV1GetMessagesV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodGet, ROUTE_FILTER_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: + ## Returns all WakuMessages received on a specified content topic since the + ## last time this method was called + ## TODO: ability to specify a return message limit + debug "get", ROUTE_FILTER_MESSAGESV1, contentTopic=contentTopic + + if contentTopic.isErr(): + return RestApiResponse.badRequest("Missing contentTopic") + + let contentTopic = contentTopic.get() + + let msgRes = cache.getMessages(contentTopic, clear=true) + if msgRes.isErr(): + return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic) + + let data = FilterV1GetMessagesResponse(msgRes.get().map(toFilterWakuMessage)) + let resp = RestApiResponse.jsonResponse(data, status=Http200) + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError("An error ocurred while building the json respose") + + return resp.get() + +proc installLegacyFilterRestApiHandlers*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + installFilterV1PostSubscriptionsV1Handler(router, node, cache) + installFilterV1DeleteSubscriptionsV1Handler(router, node, cache) + installFilterV1GetMessagesV1Handler(router, node, cache) diff --git a/waku/v2/node/rest/filter/openapi.yaml b/waku/v2/node/rest/filter/openapi.yaml index d913eb08a5..438f0c4bfe 100644 --- a/waku/v2/node/rest/filter/openapi.yaml +++ b/waku/v2/node/rest/filter/openapi.yaml @@ -1,6 +1,6 @@ openapi: 3.0.3 info: - title: Waku V2 node REST API + title: Waku V2 node REST API version: 1.0.0 contact: name: VAC Team @@ -10,6 +10,8 @@ tags: description: Filter REST API for WakuV2 node paths: + # Legacy support for v1 waku filter + # TODO: legacy endpoint, remove in the future /filter/v1/subscriptions: post: # post_waku_v2_filter_v1_subscription summary: Subscribe a node to an array of topics @@ -21,7 +23,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterSubscriptionsRequest' + $ref: '#/components/schemas/FilterV1SubscriptionsRequest' responses: '200': description: OK @@ -32,8 +34,16 @@ paths: # TODO: Review the possible errors of this endpoint '400': description: Bad request. + content: + text/plain: + schema: + type: string '5XX': description: Unexpected error. + content: + text/plain: + schema: + type: string delete: # delete_waku_v2_filter_v1_subscription summary: Unsubscribe a node from an array of topics @@ -45,7 +55,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterSubscriptionsRequest' + $ref: '#/components/schemas/FilterV1SubscriptionsRequest' responses: '200': description: OK @@ -56,12 +66,24 @@ paths: # TODO: Review the possible errors of this endpoint '400': description: Bad request. + content: + text/plain: + schema: + type: string '404': description: Not found. + content: + text/plain: + schema: + type: string '5XX': description: Unexpected error. + content: + text/plain: + schema: + type: string - # TODO: Review the path of this endpoint due maybe query for list of contentTopics matching + # TODO: legacy endpoint, remove in the future /filter/v1/messages/{contentTopic}: get: # get_waku_v2_filter_v1_messages summary: Get the latest messages on the polled content topic @@ -82,14 +104,200 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterGetMessagesResponse' + $ref: '#/components/schemas/FilterV1GetMessagesResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + text/plain: + schema: + type: string + '404': + description: Not found. + content: + text/plain: + schema: + type: string + '5XX': + description: Unexpected error. + content: + text/plain: + schema: + type: string + + /filter/v2/subscriptions/{requestId}: + get: # get_waku_v2_filter_v2_subscription - ping + summary: Subscriber-ping - a peer can query if there is a registered subscription for it + description: | + Subscriber peer can query its subscription existence on service node. + Returns HTTP200 if exists and HTTP404 if not. + Client must not fill anything but requestId in the request body. + operationId: subscriberPing + tags: + - filter + parameters: + - in: path + name: requestId + required: true + schema: + type: string + description: Id of ping request + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + /filter/v2/subscriptions: + post: # post_waku_v2_filter_v2_subscription + summary: Subscribe a peer to an array of content topics under a pubsubTopic + description: | + Subscribe a peer to an array of content topics under a pubsubTopic. + + It is allowed to refresh or add new content topic to an existing subscription. + + Fields pubsubTopic and contentFilters must be filled. + operationId: postSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionsRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' # TODO: Review the possible errors of this endpoint '400': description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' '404': description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' '5XX': description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + put: # put_waku_v2_filter_v2_subscription + summary: Modify existing subscription of a peer under a pubsubTopic + description: | + Modify existing subscription of a peer under a pubsubTopic. + + It is allowed to refresh or add new content topic to an existing subscription. + + Fields pubsubTopic and contentFilters must be filled. + operationId: putSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionsRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + + delete: # delete_waku_v2_filter_v2_subscription + summary: Unsubscribe a peer from a pubsobTopic filtering by content topics + description: | + Unsubscribe a peer from a pubsobTopic filtering by content topics + + If no content topic is provided, neither content-topics provided, + peer will be unsubscirbe from all. + + If subscription detail is provided, only that subscription will be removed which matches existing. + operationId: deleteSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterUnsubscribeRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' components: schemas: @@ -97,7 +305,7 @@ components: type: string ContentTopic: type: string - + FilterWakuMessage: type: object properties: @@ -113,19 +321,60 @@ components: required: - payload - FilterSubscriptionsRequest: + FilterV1SubscriptionsRequest: type: object - properties: + properties: contentFilters: type: array items: $ref: '#/components/schemas/ContentTopic' pubsubTopic: $ref: "#/components/schemas/PubSubTopic" - required: + required: - contentFilters - - FilterGetMessagesResponse: + - pubsubTopic + + FilterV1GetMessagesResponse: type: array items: - $ref: '#/components/schemas/FilterWakuMessage' \ No newline at end of file + $ref: '#/components/schemas/FilterWakuMessage' + + FilterSubscriptionsRequest: + type: object + properties: + requestId: + type: string + contentFilters: + type: array + items: + $ref: '#/components/schemas/ContentTopic' + pubsubTopic: + $ref: "#/components/schemas/PubSubTopic" + required: + - requestId + - contentFilters + - pubsubTopic + + FilterUnsubscribeRequest: + type: object + properties: + requestId: + type: string + contentFilters: + type: array + items: + $ref: '#/components/schemas/ContentTopic' + pubsubTopic: + $ref: "#/components/schemas/PubSubTopic" + required: + - requestId + + FilterSubscriptionResponse: + type: object + properties: + requestId: + type: string + statusDesc: + type: string + required: + - requestId diff --git a/waku/v2/node/rest/filter/types.nim b/waku/v2/node/rest/filter/types.nim index 22b2ee4daa..336b987e7d 100644 --- a/waku/v2/node/rest/filter/types.nim +++ b/waku/v2/node/rest/filter/types.nim @@ -8,7 +8,8 @@ import chronicles, json_serialization, json_serialization/std/options, - presto/[route, client, common] + presto/[route, client, common], + libp2p/peerid import ../../../../common/base64, ../../../waku_core, @@ -22,12 +23,31 @@ type FilterWakuMessage* = object version*: Option[Natural] timestamp*: Option[int64] -type FilterGetMessagesResponse* = seq[FilterWakuMessage] +type FilterV1GetMessagesResponse* = seq[FilterWakuMessage] + +type FilterV1SubscriptionsRequest* = object + # Subscription request for legacy filter support + pubsubTopic*: PubSubTopic + contentFilters*: seq[ContentTopic] + +type FilterSubscriberPing* = object + requestId*: string type FilterSubscriptionsRequest* = object + requestId*: string pubsubTopic*: PubSubTopic contentFilters*: seq[ContentTopic] +type FilterUnsubscribeRequest* = object + requestId*: string + pubsubTopic*: Option[PubSubTopic] + contentFilters*: Option[seq[ContentTopic]] + +type FilterSubscriptionResponse* = object + requestId*: string + statusCode*: uint32 + statusDesc*: string + #### Type conversion proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage = @@ -65,7 +85,7 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage) writer.writeField("timestamp", value.timestamp) writer.endRecord() -proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscriptionsRequest) +proc writeValue*(writer: var JsonWriter[RestJson], value: FilterV1SubscriptionsRequest) {.raises: [IOError].} = writer.beginRecord() writer.writeField("pubsubTopic", value.pubsubTopic) @@ -114,9 +134,72 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage) timestamp: timestamp ) +proc readValue*(reader: var JsonReader[RestJson], value: var FilterV1SubscriptionsRequest) + {.raises: [SerializationError, IOError].} = + var + pubsubTopic = none(PubsubTopic) + contentFilters = none(seq[ContentTopic]) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterV1SubscriptionsRequest") + + case fieldName + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "contentFilters": + contentFilters = some(reader.readValue(seq[ContentTopic])) + else: + unrecognizedFieldWarning() + + if pubsubTopic.isNone(): + reader.raiseUnexpectedValue("Field `pubsubTopic` is missing") + + if contentFilters.isNone(): + reader.raiseUnexpectedValue("Field `contentFilters` is missing") + + if contentFilters.get().len() == 0: + reader.raiseUnexpectedValue("Field `contentFilters` is empty") + + value = FilterV1SubscriptionsRequest( + pubsubTopic: pubsubTopic.get(), + contentFilters: contentFilters.get() + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriberPing) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterV1SubscriptionsRequest") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + value = FilterSubscriberPing( + requestId: requestId.get() + ) + proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionsRequest) - {.raises: [SerializationError, IOError].} = + {.raises: [SerializationError, IOError].} = var + requestId = none(string) pubsubTopic = none(PubsubTopic) contentFilters = none(seq[ContentTopic]) @@ -126,9 +209,11 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions if keys.containsOrIncl(fieldName): let err = try: fmt"Multiple `{fieldName}` fields found" except CatchableError: "Multiple fields with the same name found" - reader.raiseUnexpectedField(err, "FilterSubscriptionsRequest") + reader.raiseUnexpectedField(err, "FilterV1SubscriptionsRequest") case fieldName + of "requestId": + requestId = some(reader.readValue(string)) of "pubsubTopic": pubsubTopic = some(reader.readValue(PubsubTopic)) of "contentFilters": @@ -136,6 +221,9 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions else: unrecognizedFieldWarning() + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + if pubsubTopic.isNone(): reader.raiseUnexpectedValue("Field `pubsubTopic` is missing") @@ -146,6 +234,75 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions reader.raiseUnexpectedValue("Field `contentFilters` is empty") value = FilterSubscriptionsRequest( + requestId: requestId.get(), pubsubTopic: pubsubTopic.get(), contentFilters: contentFilters.get() - ) \ No newline at end of file + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterUnsubscribeRequest) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + pubsubTopic = none(PubsubTopic) + contentFilters = none(seq[ContentTopic]) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterV1SubscriptionsRequest") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "contentFilters": + contentFilters = some(reader.readValue(seq[ContentTopic])) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + value = FilterUnsubscribeRequest( + requestId: requestId.get(), + pubsubTopic: pubsubTopic, + contentFilters: contentFilters + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionResponse) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + statusCode = none(uint32) + statusDesc = none(string) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterV1SubscriptionsRequest") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + of "statusCode": + statusCode = some(reader.readValue(uint32)) + of "statusDesc": + statusDesc = some(reader.readValue(string)) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + value = FilterSubscriptionResponse( + requestId: requestId.get(), + statusCode: statusCode.get(), + statusDesc: statusDesc.get("") + ) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 57d0d7a848..16bd7f1788 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -31,8 +31,9 @@ import ../waku_store, ../waku_store/client as store_client, ../waku_filter as legacy_filter, #TODO: support for legacy filter protocol will be removed - ../waku_filter/client as filter_client, #TODO: support for legacy filter protocol will be removed + ../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed ../waku_filter_v2, + ../waku_filter_v2/client as filter_client, ../waku_lightpush, ../waku_lightpush/client as lightpush_client, ../waku_enr, @@ -40,7 +41,8 @@ import ../waku_peer_exchange, ./config, ./peer_manager, - ./waku_switch + ./waku_switch, + ./rest/relay/topic_cache when defined(rln): import @@ -90,8 +92,10 @@ type wakuStore*: WakuStore wakuStoreClient*: WakuStoreClient wakuFilter*: waku_filter_v2.WakuFilter + wakuFilterClient*: filter_client.WakuFilterClient + filterV2PushMessageCache: TopicCache #Exposed by proc intentionally to reflect inner or outer pushMessageHandler used wakuFilterLegacy*: legacy_filter.WakuFilterLegacy #TODO: support for legacy filter protocol will be removed - wakuFilterClientLegacy*: WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed + wakuFilterClientLegacy*: legacy_filter_client.WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed when defined(rln): wakuRlnRelay*: WakuRLNRelay wakuLightPush*: WakuLightPush @@ -354,22 +358,50 @@ proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: Wak await node.wakuFilter.handleMessage(pubsubTopic, message) await node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy - -proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} = +proc mountFilterClient*(node: WakuNode, filterV2MessagePushHandler= none(filter_client.MessagePushHandler)) {.async, raises: [Defect, LPError].} = + ## Mounting both filter clients v1 - legacy and v2. + ## Giving option for application level to chose btw own push message handling or + ## rely on node provided cache. - This only applies for v2 filter client info "mounting filter client" node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng) + + if filterV2MessagePushHandler.isNone(): + # TODO: tie it to config, but from proper place (rest_legacy_filter_api.filterMessageCacheDefaultCapacity) + node.filterV2PushMessageCache = TopicCache.init(capacity=30) + + let internalPushMessageHandler: filter_client.MessagePushHandler = proc(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + node.filterV2PushMessageCache.addMessage(pubsubTopic, message) + + node.wakuFilterClient = filter_client.WakuFilterClient.new(node.rng, internalPushMessageHandler, node.peerManager) + else: + node.wakuFilterClient = filter_client.WakuFilterClient.new(node.rng, filterV2MessagePushHandler.get(), node.peerManager) + if node.started: - # Node has started already. Let's start filter too. - await node.wakuFilterClientLegacy.start() + await allFutures(node.wakuFilterClientLegacy.start(), node.wakuFilterClient.start()) + node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec)) node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuFilterCodec)) -proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], - handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = +# TODO: review this if it is sufficient app interface or not. +proc getFilterV2PushMessageCache*(node: WakuNode): Option[TopicCache] = + ## To give option for the node user - app level - choses how it would like to process pushed messages + ## from filter relay hereby expose optional internal message cache. + if node.filterV2PushMessageCache.isNil(): + return none(TopicCache) + else: + return some(node.filterV2PushMessageCache) + +proc legacyFilterSubscribe*(node: WakuNode, + pubsubTopic: PubsubTopic, + contentTopics: ContentTopic|seq[ContentTopic], + handler: FilterPushHandler, + peer: RemotePeerInfo|string) + {.async, gcsafe, raises: [Defect, ValueError].} = + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. if node.wakuFilterClientLegacy.isNil(): - error "cannot register filter subscription to topic", error="waku filter client is nil" + error "cannot register filter subscription to topic", error="waku legacy filter client is not set up" return let remotePeerRes = parsePeerInfo(peer) @@ -383,6 +415,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled # TODO: Move this logic to wakunode2 app + # FIXME: This part needs refactoring. It seems possible that in special cases archiver will store same message multiple times. let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = if node.wakuRelay.isNil() and not node.wakuStore.isNil(): await node.wakuArchive.handleMessage(pubSubTopic, message) @@ -391,16 +424,52 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C let subRes = await node.wakuFilterClientLegacy.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer) if subRes.isOk(): - info "subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics + info "v1 subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics else: - error "failed filter subscription", error=subRes.error + error "failed filter v1 subscription", error=subRes.error waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) -proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], - peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = - ## Unsubscribe from a content filter. +proc filterV2Subscribe*(node: WakuNode, + pubsubTopic: PubsubTopic, + contentTopics: ContentTopic|seq[ContentTopic], + peer: RemotePeerInfo|string): + + Future[FilterSubscribeResult] + + {.async, gcsafe, raises: [Defect, ValueError].} = + + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. if node.wakuFilterClientLegacy.isNil(): - error "cannot unregister filter subscription to content", error="waku filter client is nil" + error "cannot register filter subscription to topic", error="waku filter client is not set up" + return err(FilterSubscribeError.serviceUnavailable()) + + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "Couldn't parse the peer info properly", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) + + let remotePeer = remotePeerRes.value + + info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId + + let subRes = await node.wakuFilterClient.subscribe(remotePeer, pubsubTopic, contentTopics) + if subRes.isOk(): + info "v2 subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics + else: + error "failed filter v2 subscription", error=subRes.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + + return subRes + +proc legacyFilterUnsubscribe*(node: WakuNode, + pubsubTopic: PubsubTopic, + contentTopics: ContentTopic|seq[ContentTopic], + peer: RemotePeerInfo|string) + {.async, gcsafe, raises: [Defect, ValueError].} = + ## Unsubscribe from a content legacy filter. + + if node.wakuFilterClientLegacy.isNil(): + error "cannot unregister legacy filter subscription to content", error="waku legacy filter client is nil" return let remotePeerRes = parsePeerInfo(peer) @@ -410,44 +479,60 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: let remotePeer = remotePeerRes.value - info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId + info "deregistering legacy filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId let unsubRes = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer) if unsubRes.isOk(): info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics else: - error "failed filter unsubscription", error=unsubRes.error + error "failed legacy filter unsubscription", error=unsubRes.error waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) -# TODO: Move to application module (e.g., wakunode2.nim) -proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe, - deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} = - ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - if node.wakuFilterClientLegacy.isNil(): - error "cannot register filter subscription to topic", error="waku filter client is nil" - return +proc filterV2Unsubscribe*(node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: Option[seq[ContentTopic]], + peer: RemotePeerInfo|string): - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) - if peerOpt.isNone(): - error "cannot register filter subscription to topic", error="no suitable remote peers" - return + Future[FilterSubscribeResult] - await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get()) + {.async, gcsafe, raises: [Defect, ValueError].} = -# TODO: Move to application module (e.g., wakunode2.nim) -proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe, - deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} = - ## Unsubscribe from a content filter. + ## Unsubscribe from a content filter V2". if node.wakuFilterClientLegacy.isNil(): error "cannot unregister filter subscription to content", error="waku filter client is nil" - return + return err(FilterSubscribeError.serviceUnavailable()) - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) - if peerOpt.isNone(): - error "cannot register filter subscription to topic", error="no suitable remote peers" - return + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "couldn't parse remotePeerInfo", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) + + let remotePeer = remotePeerRes.value + + # TODO: check RFC + if pubsubTopic.isSome() and contentTopics.isSome(): + info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId + + let unsubRes = await node.wakuFilterClient.unsubscribe(remotePeer, pubsubTopic.get(), contentTopics.get()) + if unsubRes.isOk(): + info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics + else: + error "failed filter unsubscription", error=unsubRes.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + + return unsubRes + + else: + info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId + + let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer) + if unsubRes.isOk(): + info "unsubscribed from all topic", peerId=remotePeer.peerId + else: + error "failed filter unsubscription", error=unsubRes.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) - await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get()) + return unsubRes ## Waku archive const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes diff --git a/waku/v2/waku_core/peers.nim b/waku/v2/waku_core/peers.nim index 19520ebcb0..bb918b3681 100644 --- a/waku/v2/waku_core/peers.nim +++ b/waku/v2/waku_core/peers.nim @@ -17,8 +17,8 @@ import libp2p/multicodec, libp2p/peerid, libp2p/peerinfo, - libp2p/routing_record - + libp2p/routing_record, + json_serialization type Connectedness* = enum @@ -62,6 +62,9 @@ type RemotePeerInfo* = ref object func `$`*(remotePeerInfo: RemotePeerInfo): string = $remotePeerInfo.peerId +proc writeValue*(w: var JsonWriter, value: RemotePeerInfo) {.inline, raises: [IOError].} = + w.writeValue $value + proc init*( T: typedesc[RemotePeerInfo], peerId: PeerID, diff --git a/waku/v2/waku_filter_v2/client.nim b/waku/v2/waku_filter_v2/client.nim index 5a7d86d904..4e0fd51dff 100644 --- a/waku/v2/waku_filter_v2/client.nim +++ b/waku/v2/waku_filter_v2/client.nim @@ -34,8 +34,10 @@ func generateRequestId(rng: ref HmacDrbgContext): string = hmacDrbgGenerate(rng[], bytes) return toHex(bytes) + + proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, filterSubscribeRequest: FilterSubscribeRequest): Future[FilterSubscribeResult] {.async.} = - trace "Sending filter subscribe request", servicePeer, filterSubscribeRequest + trace "Sending filter subscribe request", peerId=servicePeer.peerId, filterSubscribeRequest let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec) if connOpt.isNone(): diff --git a/waku/v2/waku_filter_v2/rpc.nim b/waku/v2/waku_filter_v2/rpc.nim index e6718dfa93..7807651b76 100644 --- a/waku/v2/waku_filter_v2/rpc.nim +++ b/waku/v2/waku_filter_v2/rpc.nim @@ -4,6 +4,7 @@ else: {.push raises: [].} import + json_serialization, std/options import ../waku_core @@ -70,3 +71,29 @@ proc ok*(T: type FilterSubscribeResponse, requestId: string, desc = "OK"): T = statusCode: 200, statusDesc: some(desc) ) + +proc `$`*(err: FilterSubscribeResponse): string = + "FilterSubscribeResponse of req:" & err.requestId & " [" & $err.statusCode & "] " & $err.statusDesc + +proc `$`*(req: FilterSubscribeRequest): string = + "FilterSubscribeRequest of req:" & req.requestId & " [" & $req.filterSubscribeType & "] " & $req.pubsubTopic + +proc `$`*(t: FilterSubscribeType): string = + result = case t: + of SUBSCRIBER_PING: "SUBSCRIBER_PING" + of SUBSCRIBE: "SUBSCRIBE" + of UNSUBSCRIBE: "UNSUBSCRIBE" + of UNSUBSCRIBE_ALL: "UNSUBSCRIBE_ALL" + +proc writeValue*(writer: var JsonWriter, value: FilterSubscribeRequest) {.inline, raises: [IOError].} = + writer.beginRecord() + writer.writeField("requestId", value.requestId) + writer.writeField("type", value.filterSubscribeType) + if value.pubsubTopic.isSome: + writer.writeField("pubsubTopic", value.pubsubTopic) + if value.contentTopics.len>0: + writer.writeField("contentTopics", value.contentTopics) + writer.endRecord() + + +