Skip to content

Commit

Permalink
#1851 Added Rest API interface for filter service. NOTICE: WIP, tests…
Browse files Browse the repository at this point in the history
… for it still not functional due peer connection issue.
  • Loading branch information
NagyZoltanPeter authored and NagyZoltanPeter committed Jul 28, 2023
1 parent 08f3bba commit 323798d
Show file tree
Hide file tree
Showing 7 changed files with 592 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ node_modules/
# Ignore Jetbrains IDE files
.idea/

# ignore vscode files
.vscode/

# RLN / keystore
rlnKeystore.json
*.tar.gz
Expand Down
6 changes: 6 additions & 0 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import
../../waku/v2/node/rest/debug/handlers as rest_debug_api,
../../waku/v2/node/rest/relay/handlers as rest_relay_api,
../../waku/v2/node/rest/relay/topic_cache,
../../waku/v2/node/rest/filter/handlers as rest_filter_api,
../../waku/v2/node/rest/store/handlers as rest_store_api,
../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api,
Expand Down Expand Up @@ -548,6 +549,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, app.node, relayCache)

## Filter REST API
if conf.filter:
let filterCache = rest_filter_api.MessageCache.init(capacity=30)
installFilterApiHandlers(server.router, app.node, filterCache)

## Store REST API
installStoreApiHandlers(server.router, app.node)

Expand Down
205 changes: 205 additions & 0 deletions tests/v2/wakunode_rest/test_rest_filter.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
{.used.}

import
std/sequtils,
stew/byteutils,
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
libp2p/crypto/crypto
import
../../waku/v2/node/message_cache,
../../waku/common/base64,
../../waku/v2/waku_core,
../../waku/v2/waku_node,
../../waku/v2/node/peer_manager,
../../waku/v2/waku_filter,
../../waku/v2/node/rest/server,
../../waku/v2/node/rest/client,
../../waku/v2/node/rest/responses,
../../waku/v2/node/rest/filter/types,
../../waku/v2/node/rest/filter/handlers as filter_api,
../../waku/v2/node/rest/filter/client as filter_api_client,
../../waku/v2/waku_relay,
../testlib/wakucore,
../testlib/wakunode


proc testWakuNode(): WakuNode =
let
privkey = generateSecp256k1Key()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(0)

newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))


suite "Waku v2 Rest API - Filter":
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()

let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()

let messageCache = filter_api.MessageCache.init(capacity=30)
installFilterPostSubscriptionsV1Handler(restServer.router, node, messageCache)

restServer.start()

await node.mountFilter()
await node.mountFilterClient()

# let remotePeerInfo = PeerInfo.init(node.peerId, node.multiaddrs)
# node.peerManager.addServicePeer(remotePeerInfo, WakuStoreCodec)


let key = generateEcdsaKey()
var peerSwitch = newStandardSwitch(some(key))
await peerSwitch.start()

peerSwitch.mount(node.wakuFilter)

let client = newRestHttpClient(initTAddress(restAddress, restPort))

let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
let fullAddr = $remotePeerInfo.addrs[0] &
"/p2p/" & $remotePeerInfo.peerId


node.peerManager.addServicePeer(remotePeerInfo,
WakuFilterCodec)

let contentFilters = @[DefaultContentTopic
,ContentTopic("2")
,ContentTopic("3")
,ContentTopic("4")
]

# When

let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, pubsubTopic: DefaultPubsubTopic)
let response = await client.filterPostSubscriptionsV1(requestBody)

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"

check:
messageCache.isSubscribed(DefaultContentTopic)
messageCache.isSubscribed("2")
messageCache.isSubscribed("3")
messageCache.isSubscribed("4")

#check:
# TODO check wakuFilter subscriptions as count.
# concening that filter cache handles subscriptions by pubsubTopic+seq[contentTopic] conjunction.

await restServer.stop()
await restServer.closeWait()
await node.stop()

# asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
# # Given
# let node = testWakuNode()
# await node.start()
# await node.mountRelay()

# let restPort = Port(58012)
# let restAddress = ValidIpAddress.init("0.0.0.0")
# let restServer = RestServerRef.init(restAddress, restPort).tryGet()

# let messageCache = filter_api.messageCache.init()
# messageCache.subscribe("1")
# messageCache.subscribe("2")
# messageCache.subscribe("3")
# messageCache.subscribe("4")

# installFilterDeleteSubscriptionsV1Handler(restServer.router, node, messageCache)
# restServer.start()

# let contentFilters = @[ContentTopic("1")
# ,ContentTopic("2")
# ,ContentTopic("3")
# ,ContentTopic("4")
# ]

# # When
# let client = newRestHttpClient(initTAddress(restAddress, restPort))
# let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, pubsubTopic: DefaultPubsubTopic)
# let response = await client.filterDeleteSubscriptionsV1(requestBody)

# # Then
# check:
# response.status == 200
# $response.contentType == $MIMETYPE_TEXT
# response.data == "OK"

# check:
# not messageCache.isSubscribed("1")
# not messageCache.isSubscribed("2")
# not messageCache.isSubscribed("3")
# messageCache.isSubscribed("4")

# await restServer.stop()
# await restServer.closeWait()
# await node.stop()


asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()

let restPort = Port(58013)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()

let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic( "content-topic-x" )

let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]

let messageCache = filter_api.MessageCache.init(capacity = 30)

messageCache.subscribe(contentTopic)
for msg in messages:
messageCache.addMessage(contentTopic, msg)

installFilterGetMessagesV1Handler(restServer.router, node, messageCache)
restServer.start()

# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.filterGetMessagesV1(contentTopic)

# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: FilterWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)


check:
messageCache.isSubscribed(contentTopic)
messageCache.getMessages(contentTopic).tryGet().len == 0

await restServer.stop()
await restServer.closeWait()
await node.stop()
65 changes: 65 additions & 0 deletions waku/v2/node/rest/filter/client.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
std/sets,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
../../../waku_core,
../serdes,
../responses,
./types

export types


logScope:
topics = "waku node rest client"


proc encodeBytes*(value: FilterSubscriptionsRequest,
contentType: string): RestResult[seq[byte]] =

if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")

let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)

proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =

if MediaType.init($contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")

var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)

# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.}

# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.}


proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported response contentType value", contentType = contentType
return err("Unsupported response contentType")

let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data)
return ok(decoded)

# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterGetMessagesV1*(contentTopic: string): RestResponse[FilterGetMessagesResponse] {.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.}
Loading

0 comments on commit 323798d

Please sign in to comment.