diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 6cf55097b0..46a9f329c3 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -52,6 +52,7 @@ import ../../waku/node/rest/filter/handlers as rest_filter_api, ../../waku/node/rest/store/handlers as rest_store_api, ../../waku/node/rest/health/handlers as rest_health_api, + ../../waku/node/rest/lightpush/handlers as rest_lightpush_api, ../../waku/node/jsonrpc/admin/handlers as rpc_admin_api, ../../waku/node/jsonrpc/debug/handlers as rpc_debug_api, ../../waku/node/jsonrpc/filter/handlers as rpc_filter_api, @@ -590,6 +591,8 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo ## Store REST API installStoreApiHandlers(server.router, app.node) + installLightPushRequestHandler(server.router, app.node) + server.start() info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 6e6cb60370..f5846a0c56 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -97,7 +97,8 @@ import ./wakunode_rest/test_rest_serdes, ./wakunode_rest/test_rest_store, ./wakunode_rest/test_rest_filter, - ./wakunode_rest/test_rest_legacy_filter + ./wakunode_rest/test_rest_legacy_filter, + ./wakunode_rest/test_rest_lightpush import ./waku_rln_relay/test_waku_rln_relay, diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim new file mode 100644 index 0000000000..fbd3e0f6c2 --- /dev/null +++ b/tests/wakunode_rest/test_rest_lightpush.nim @@ -0,0 +1,194 @@ +{.used.} + +import + std/sequtils, + stew/byteutils, + stew/shims/net, + testutils/unittests, + presto, presto/client as presto_client, + libp2p/crypto/crypto + +import + ../../waku/node/message_cache, + ../../waku/common/base64, + ../../waku/waku_core, + ../../waku/waku_node, + ../../waku/node/peer_manager, + ../../waku/waku_lightpush, + ../../waku/node/rest/server, + ../../waku/node/rest/client, + ../../waku/node/rest/responses, + ../../waku/node/rest/lightpush/types, + ../../waku/node/rest/lightpush/handlers as lightpush_api, + ../../waku/node/rest/lightpush/client as lightpush_api_client, + ../../waku/waku_relay, + ../testlib/wakucore, + ../testlib/wakunode + + +proc testWakuNode(): WakuNode = + let + privkey = generateSecp256k1Key() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(0) + + return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) + + +type RestLightPushTest = object + serviceNode: WakuNode + pushNode: WakuNode + consumerNode: WakuNode + restServer: RestServerRef + client: RestClientRef + + +proc init(T: type RestLightPushTest): Future[T] {.async.} = + var testSetup = RestLightPushTest() + testSetup.serviceNode = testWakuNode() + testSetup.pushNode = testWakuNode() + testSetup.consumerNode = testWakuNode() + + await allFutures(testSetup.serviceNode.start(), + testSetup.pushNode.start(), + testSetup.consumerNode.start()) + + await testSetup.consumerNode.mountRelay() + await testSetup.serviceNode.mountRelay() + await testSetup.serviceNode.mountLightPush() + testSetup.pushNode.mountLightPushClient() + + + testSetup.serviceNode.peerManager.addServicePeer( + testSetup.consumerNode.peerInfo.toRemotePeerInfo(), + WakuRelayCodec) + + await testSetup.serviceNode.connectToNodes(@[testSetup.consumerNode.peerInfo.toRemotePeerInfo()]) + + testSetup.pushNode.peerManager.addServicePeer( + testSetup.serviceNode.peerInfo.toRemotePeerInfo(), + WakuLightPushCodec) + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("127.0.0.1") + testSetup.restServer = RestServerRef.init(restAddress, restPort).tryGet() + + installLightPushRequestHandler(testSetup.restServer.router, testSetup.pushNode) + + testSetup.restServer.start() + + testSetup.client = newRestHttpClient(initTAddress(restAddress, restPort)) + + return testSetup + + +proc shutdown(self: RestLightPushTest) {.async.} = + await self.restServer.stop() + await self.restServer.closeWait() + await allFutures(self.serviceNode.stop(), self.pushNode.stop()) + + +suite "Waku v2 Rest API - lightpush": + asyncTest "Push message request": + # Given + let restLightPushTest = await RestLightPushTest.init() + + restLightPushTest.consumerNode.subscribe(DefaultPubsubTopic) + restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic) + require: + toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 + + # When + let message : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic, + payload = toBytes("TEST-1")).toRelayWakuMessage() + + let requestBody = PushRequest(pubsubTopic: some(DefaultPubsubTopic), + message: message) + let response = await restLightPushTest.client.sendPushRequest(requestBody) + + echo "response", $response + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + + await restLightPushTest.shutdown() + + asyncTest "Push message bad-request": + # Given + let restLightPushTest = await RestLightPushTest.init() + + restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic) + require: + toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 + + # When + let badMessage1 : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic, + payload = toBytes("")).toRelayWakuMessage() + let badRequestBody1 = PushRequest(pubsubTopic: some(DefaultPubsubTopic), + message: badMessage1) + + let badMessage2 : RelayWakuMessage = fakeWakuMessage(contentTopic = "", + payload = toBytes("Sthg")).toRelayWakuMessage() + let badRequestBody2 = PushRequest(pubsubTopic: some(DefaultPubsubTopic), + message: badMessage2) + + let badRequestBody3 = PushRequest(pubsubTopic: none(PubsubTopic), + message: badMessage2) + + var response: RestResponse[string] + + response = await restLightPushTest.client.sendPushRequest(badRequestBody1) + + echo "response", $response + + # Then + check: + response.status == 400 + $response.contentType == $MIMETYPE_TEXT + response.data.startsWith("Invalid content body") + + + # when + response = await restLightPushTest.client.sendPushRequest(badRequestBody2) + + # Then + check: + response.status == 400 + $response.contentType == $MIMETYPE_TEXT + response.data.startsWith("Invalid content body") + + # when + response = await restLightPushTest.client.sendPushRequest(badRequestBody3) + + # Then + check: + response.status == 400 + $response.contentType == $MIMETYPE_TEXT + response.data.startsWith("Invalid content body") + + await restLightPushTest.shutdown() + + asyncTest "Push message request service not available": + # Given + let restLightPushTest = await RestLightPushTest.init() + + # When + let message : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic, + payload = toBytes("TEST-1")).toRelayWakuMessage() + + let requestBody = PushRequest(pubsubTopic: some("NoExistTopic"), + message: message) + let response = await restLightPushTest.client.sendPushRequest(requestBody) + + echo "response", $response + + # Then + check: + response.status == 503 + $response.contentType == $MIMETYPE_TEXT + response.data == "Failed to request a message push: Can not publish to any peers" + + await restLightPushTest.shutdown() diff --git a/tests/wakunode_rest/test_rest_relay_serdes.nim b/tests/wakunode_rest/test_rest_relay_serdes.nim index 2a3789370d..19aacfd56b 100644 --- a/tests/wakunode_rest/test_rest_relay_serdes.nim +++ b/tests/wakunode_rest/test_rest_relay_serdes.nim @@ -19,7 +19,7 @@ suite "Waku v2 Rest API - Relay - serialization": test "optional fields are not provided": # Given let payload = base64.encode("MESSAGE") - let jsonBytes = toBytes("{\"payload\":\"" & $payload & "\"}") + let jsonBytes = toBytes("{\"payload\":\"" & $payload & "\",\"contentTopic\":\"some/topic\"}") # When let res = decodeFromJsonBytes(RelayWakuMessage, jsonBytes, requireAllFields = true) @@ -29,7 +29,8 @@ suite "Waku v2 Rest API - Relay - serialization": let value = res.get() check: value.payload == payload - value.contentTopic.isNone() + value.contentTopic.isSome() + value.contentTopic.get() == "some/topic" value.version.isNone() value.timestamp.isNone() diff --git a/waku/node/rest/lightpush/client.nim b/waku/node/rest/lightpush/client.nim new file mode 100644 index 0000000000..96caa818ee --- /dev/null +++ b/waku/node/rest/lightpush/client.nim @@ -0,0 +1,49 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + json, + std/sets, + stew/byteutils, + strformat, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import + ../../../waku_core, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest client v2" + +proc encodeBytes*(value: PushRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[string], value: openarray[byte], + contentType: Opt[ContentTypeData]): RestResult[string] = + if MediaType.init($contentType) != MIMETYPE_TEXT: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + var res: string + if len(value) > 0: + res = newString(len(value)) + copyMem(addr res[0], unsafeAddr value[0], len(value)) + return ok(res) + +proc sendPushRequest*(body: PushRequest): + RestResponse[string] + {.rest, endpoint: "/lightpush/v1/message", meth: HttpMethod.MethodPost.} diff --git a/waku/node/rest/lightpush/handlers.nim b/waku/node/rest/lightpush/handlers.nim new file mode 100644 index 0000000000..a13fe6fd4c --- /dev/null +++ b/waku/node/rest/lightpush/handlers.nim @@ -0,0 +1,86 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/strformat, + std/sequtils, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/route, + presto/common + +import + ../../../waku_core, + ../../peer_manager, + ../../waku_node, + ../../waku/waku_lightpush, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest lightpush api" + +const futTimeoutForPushRequestProcessing* = 5.seconds + +#### Request handlers + +const ROUTE_LIGHTPUSH* = "/lightpush/v1/message" + +func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = + if contentBody.isNone(): + return err(RestApiResponse.badRequest("Missing content body")) + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) + + let reqBodyData = contentBody.get().data + + let requestResult = decodeFromJsonBytes(T, reqBodyData) + if requestResult.isErr(): + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + $requestResult.error)) + + return ok(requestResult.get()) + +proc installLightPushRequestHandler*(router: var RestRouter, + node: WakuNode) = + + router.api(MethodPost, ROUTE_LIGHTPUSH) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Send a request to push a waku message + debug "post", ROUTE_LIGHTPUSH, contentBody + + let decodedBody = decodeRequestBody[PushRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error() + + let req: PushRequest = decodedBody.value() + let msg = req.message.toWakuMessage() + + if msg.isErr(): + return RestApiResponse.badRequest("Invalid message: {msg.error}") + + let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec) + if peerOpt.isNone(): + return RestApiResponse.serviceUnavailable("No suitable remote lightpush peers") + + let subFut = node.lightpushPublish(req.pubsubTopic, + msg.value(), + peerOpt.get()) + + if not await subFut.withTimeout(futTimeoutForPushRequestProcessing): + error "Failed to request a message push due to timeout!" + return RestApiResponse.serviceUnavailable("Push request timed out") + + if subFut.value().isErr(): + return RestApiResponse.serviceUnavailable(fmt("Failed to request a message push: {subFut.value().error}")) + + return RestApiResponse.ok() diff --git a/waku/node/rest/lightpush/openapi.yaml b/waku/node/rest/lightpush/openapi.yaml new file mode 100644 index 0000000000..2327936dd3 --- /dev/null +++ b/waku/node/rest/lightpush/openapi.yaml @@ -0,0 +1,84 @@ +openapi: 3.0.3 +info: + title: Waku V2 node REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ + +tags: + - name: lightpush + description: Lightpush REST API for WakuV2 node + +paths: + /lightpush/v1/message: + post: + summary: Request a message relay from a LightPush service provider + description: Push a message to be relayed on a PubSub topic. + operationId: postMessagesToPubsubTopic + tags: + - lightpush + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/PushRequest' + responses: + '200': + description: OK + content: + text/plain: + schema: + type: string + '400': + description: Bad request. + content: + text/plain: + schema: + type: string + '500': + description: Internal server error + content: + text/plain: + schema: + type: string + '503': + description: Service not available + content: + text/plain: + schema: + type: string + +components: + schemas: + PubsubTopic: + type: string + + ContentTopic: + type: string + + WakuMessage: + type: object + properties: + payload: + type: string + format: byte + contentTopic: + $ref: '#/components/schemas/ContentTopic' + version: + type: number + timestamp: + type: number + required: + - payload + - contentTopic + + PushRequest: + type: object + properties: + pusbsubTopic: + $ref: '#/components/schemas/PubsubTopic' + message: + $ref: '#/components/schemas/WakuMessage' + required: + - message diff --git a/waku/node/rest/lightpush/types.nim b/waku/node/rest/lightpush/types.nim new file mode 100644 index 0000000000..d3711e8033 --- /dev/null +++ b/waku/node/rest/lightpush/types.nim @@ -0,0 +1,65 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[sets, strformat], + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] + +import + ../../../common/base64, + ../../../waku_core, + ../relay/types as relay_types, + ../serdes + +export relay_types + +#### Types + +type PushRequest* = object + pubsubTopic*: Option[PubSubTopic] + message*: RelayWakuMessage + +#### Serialization and deserialization + +proc writeValue*(writer: var JsonWriter[RestJson], value: PushRequest) + {.raises: [IOError].} = + writer.beginRecord() + if value.pubsubTopic.isSome(): + writer.writeField("pubsubTopic", value.pubsubTopic.get()) + writer.writeField("message", value.message) + writer.endRecord() + +proc readValue*(reader: var JsonReader[RestJson], value: var PushRequest) + {.raises: [SerializationError, IOError].} = + var + pubsubTopic = none(PubsubTopic) + message = none(RelayWakuMessage) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "PushRequest") + + case fieldName + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "message": + message = some(reader.readValue(RelayWakuMessage)) + else: + unrecognizedFieldWarning() + + if message.isNone(): + reader.raiseUnexpectedValue("Field `message` is missing") + + value = PushRequest( + pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), + message: message.get() + ) diff --git a/waku/node/rest/relay/types.nim b/waku/node/rest/relay/types.nim index bf7ebce909..1d3c84bcfc 100644 --- a/waku/node/rest/relay/types.nim +++ b/waku/node/rest/relay/types.nim @@ -103,8 +103,11 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) else: unrecognizedFieldWarning() - if payload.isNone(): - reader.raiseUnexpectedValue("Field `payload` is missing") + if payload.isNone() or isEmptyOrWhitespace(string(payload.get())): + reader.raiseUnexpectedValue("Field `payload` is missing or empty") + + if contentTopic.isNone() or contentTopic.get().isEmptyOrWhitespace(): + reader.raiseUnexpectedValue("Field `contentTopic` is missing or empty") value = RelayWakuMessage( payload: payload.get(), diff --git a/waku/node/rest/responses.nim b/waku/node/rest/responses.nim index 95b58b387c..f5b244feac 100644 --- a/waku/node/rest/responses.nim +++ b/waku/node/rest/responses.nim @@ -24,6 +24,11 @@ proc internalServerError*(t: typedesc[RestApiResponse], RestApiResponse = RestApiResponse.error(Http500, msg, $MIMETYPE_TEXT) +proc serviceUnavailable*(t: typedesc[RestApiResponse], + msg: string = ""): + RestApiResponse = + RestApiResponse.error(Http503, msg, $MIMETYPE_TEXT) + proc badRequest*(t: typedesc[RestApiResponse], msg: string = ""): RestApiResponse = diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 2be13113eb..363581e73d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -404,7 +404,7 @@ proc legacyFilterSubscribe*(node: WakuNode, # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled # TODO: Move this logic to wakunode2 app # FIXME: This part needs refactoring. It seems possible that in special cases archiver will store same message multiple times. - let handlerWrapper: FilterPushHandler = + let handlerWrapper: FilterPushHandler = if node.wakuRelay.isNil() and not node.wakuStore.isNil(): proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = await allFutures(node.wakuArchive.handleMessage(pubSubTopic, message), @@ -832,7 +832,11 @@ proc mountLightPush*(node: WakuNode) {.async.} = return err("no waku relay found") else: pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} = - discard await node.wakuRelay.publish(pubsubTopic, message.encode().buffer) + let publishedCount = await node.wakuRelay.publish(pubsubTopic, message.encode().buffer) + + if publishedCount == 0: + return err("Can not publish to any peers") + return ok() debug "mounting lightpush with relay"