Skip to content

Commit

Permalink
Separate node's filterUnsubscrube and unsubscrube all, various fixes …
Browse files Browse the repository at this point in the history
…upon review
  • Loading branch information
NagyZoltanPeter committed Sep 14, 2023
1 parent e9b875c commit 8791875
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 75 deletions.
23 changes: 10 additions & 13 deletions tests/wakunode_rest/test_rest_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
testSetup.subscriberNode.peerManager.addServicePeer(testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec)

let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restAddress = ValidIpAddress.init("127.0.0.1")
testSetup.restServer = RestServerRef.init(restAddress, restPort).tryGet()

let restPort2 = Port(58012)
Expand Down Expand Up @@ -138,7 +138,8 @@ suite "Waku v2 Rest API - Filter V2":
badRequestResp.status == 400
$badRequestResp.contentType == $MIMETYPE_JSON
badRequestResp.data.requestId == "unknown"
badRequestResp.data.statusDesc == "BAD_REQUEST: Failed to decode request"
# badRequestResp.data.statusDesc == "*********"
badRequestResp.data.statusDesc.startsWith("BAD_REQUEST: Failed to decode request")

await restFilterTest.shutdown()

Expand All @@ -165,7 +166,7 @@ suite "Waku v2 Rest API - Filter V2":
]

let requestBodyUnsub = FilterUnsubscribeRequest(requestId: "4321",
contentFilters: some(contentFilters),
contentFilters: contentFilters,
pubsubTopic: some(DefaultPubsubTopic))
let response = await restFilterTest.client.filterDeleteSubscriptions(requestBodyUnsub)

Expand All @@ -187,10 +188,8 @@ suite "Waku v2 Rest API - Filter V2":
subPeerId in subscribedPeer4

# When - error case
let requestBodyUnsubAll = FilterUnsubscribeRequest(requestId: "2143",
contentFilters: none(seq[ContentTopic]),
pubsubTopic: none(string))
let responseUnsubAll = await restFilterTest.client.filterDeleteSubscriptions(requestBodyUnsubAll)
let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "2143")
let responseUnsubAll = await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll)

let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4")

Expand All @@ -204,7 +203,7 @@ suite "Waku v2 Rest API - Filter V2":

asyncTest "ping subscribed node - GET /filter/v2/subscriptions/{requestId}":
# Given
let
let
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId

Expand All @@ -224,10 +223,8 @@ suite "Waku v2 Rest API - Filter V2":
pingResponse.data.statusDesc.len() == 0

# When - error case
let requestBodyUnsubAll = FilterUnsubscribeRequest(requestId: "9988",
contentFilters: none(seq[ContentTopic]),
pubsubTopic: none(string))
discard await restFilterTest.client.filterDeleteSubscriptions(requestBodyUnsubAll)
let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "9988")
discard await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll)

let pingResponseFail = await restFilterTest.client.filterSubscriberPing("9977")

Expand All @@ -242,7 +239,7 @@ suite "Waku v2 Rest API - Filter V2":

asyncTest "push filtered message":
# Given
let
let
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId

Expand Down
21 changes: 17 additions & 4 deletions waku/node/rest/filter/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ proc encodeBytes*(value: FilterUnsubscribeRequest,
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)

proc encodeBytes*(value: FilterUnsubscribeAllRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")

let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)

proc decodeBytes*(t: typedesc[FilterSubscriptionResponse],
value: openarray[byte],
contentType: Opt[ContentTypeData]):
Expand Down Expand Up @@ -79,8 +88,12 @@ proc filterDeleteSubscriptions*(body: FilterUnsubscribeRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodDelete.}

proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
data: openArray[byte],
proc filterDeleteAllSubscriptions*(body: FilterUnsubscribeAllRequest):
RestResponse[FilterSubscriptionResponse]
{.rest, endpoint: "/filter/v2/subscriptions/all", meth: HttpMethod.MethodDelete.}

proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
data: openArray[byte],
contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported response contentType value", contentType = contentType
Expand All @@ -89,6 +102,6 @@ proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data)
return ok(decoded)

proc filterGetMessagesV1*(contentTopic: string):
RestResponse[FilterGetMessagesResponse]
proc filterGetMessagesV1*(contentTopic: string):
RestResponse[FilterGetMessagesResponse]
{.rest, endpoint: "/filter/v2/messages/{contentTopic}", meth: HttpMethod.MethodGet.}
79 changes: 56 additions & 23 deletions waku/node/rest/filter/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const futTimeoutForSubscriptionProcessing* = 5.seconds

const ROUTE_FILTER_SUBSCRIPTIONS* = "/filter/v2/subscriptions"

const ROUTE_FILTER_ALL_SUBSCRIPTIONS* = "/filter/v2/subscriptions/all"

const filterMessageCacheDefaultCapacity* = 30

type
Expand All @@ -59,7 +61,7 @@ func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiR
return ok(requestResult.get())

proc getErrorCause(err: filter_protocol_type.FilterSubscribeError): string =
## Retrieve proper error cause of FilterSubscribeError - due strigify make some parts of text double
## Retrieve proper error cause of FilterSubscribeError - due stringify make some parts of text double

case err.kind:
of FilterSubscribeErrorKind.PEER_DIAL_FAILURE:
Expand All @@ -74,26 +76,26 @@ proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, prot
## Properly convert filter protocol's response to rest response

if protocolClientRes.isErr():
FilterSubscriptionResponse(
requestId: requestId,
statusCode: uint32(protocolClientRes.error().kind),
statusDesc: getErrorCause(protocolClientRes.error())
)
return FilterSubscriptionResponse(
requestId: requestId,
statusCode: uint32(protocolClientRes.error().kind),
statusDesc: getErrorCause(protocolClientRes.error())
)
else:
FilterSubscriptionResponse(
requestId: requestId,
statusCode: 0,
statusDesc: ""
)
return FilterSubscriptionResponse(
requestId: requestId,
statusCode: 0,
statusDesc: ""
)

proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): T =
## Properly convert filter protocol's response to rest response in case of error

FilterSubscriptionResponse(
requestId: requestId,
statusCode: uint32(protocolClientRes.kind),
statusDesc: $protocolClientRes
)
return FilterSubscriptionResponse(
requestId: requestId,
statusCode: uint32(protocolClientRes.kind),
statusDesc: $protocolClientRes
)

proc convertErrorKindToHttpStatus(kind: filter_protocol_type.FilterSubscribeErrorKind): HttpCode =
## Filter protocol's error code is not directly convertible to HttpCodes hence this converter
Expand Down Expand Up @@ -153,7 +155,7 @@ proc filterPostPutSubscriptionRequestHandler(node: WakuNode,
let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody)

if decodedBody.isErr():
return makeRestResponse("unknown", FilterSubscribeError.badRequest("Failed to decode request"))
return makeRestResponse("unknown", FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}")))

let req: FilterSubscribeRequest = decodedBody.value()

Expand Down Expand Up @@ -224,12 +226,42 @@ proc installFilterDeleteSubscriptionsHandler(router: var RestRouter,
FilterSubscribeError.serviceUnavailable(
"Failed to unsubscribe from contentFilters due to timeout!"))

if req.pubsubTopic.isNone and req.contentFilters.isNone:
cache.unsubscribeAll()
else:
# Successfully subscribed to all content filters
for cTopic in req.contentFilters.get(@[]):
cache.unsubscribe(cTopic)
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.unsubscribe(cTopic)

# Successfully unsubscribed from all requested contentTopics
return makeRestResponse(req.requestId, unsubFut.read())

proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_ALL_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
debug "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody

let decodedBody = decodeRequestBody[FilterUnsubscribeAllRequest](contentBody)

if decodedBody.isErr():
return makeRestResponse("unknown",
FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}")))

let req: FilterUnsubscribeAllRequest = decodedBody.value()

let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec)

if peerOpt.isNone():
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable("No suitable peers"))

let unsubFut = node.filterUnsubscribeAll(peerOpt.get())
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return makeRestResponse(req.requestId,
FilterSubscribeError.serviceUnavailable(
"Failed to unsubscribe from all contentFilters due to timeout!"))

cache.unsubscribeAll()

# Successfully unsubscribed from all requested contentTopics
return makeRestResponse(req.requestId, unsubFut.read())
Expand Down Expand Up @@ -300,4 +332,5 @@ proc installFilterRestApiHandlers*(router: var RestRouter,
installFilterPostSubscriptionsHandler(router, node, cache)
installFilterPutSubscriptionsHandler(router, node, cache)
installFilterDeleteSubscriptionsHandler(router, node, cache)
installFilterDeleteAllSubscriptionsHandler(router, node, cache)
installFilterGetMessagesHandler(router, node, cache)
53 changes: 48 additions & 5 deletions waku/node/rest/filter/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,7 @@ paths:
summary: Unsubscribe a peer from content topics
description: |
Unsubscribe a peer from content topics
If no content topic is provided, neither content-topics provided,
peer will be unsubscirbe from all.
If subscription detail is provided, only that subscription will be removed which matches existing.
Only that subscription will be removed which matches existing.
operationId: deleteSubscriptions
tags:
- filter
Expand Down Expand Up @@ -298,6 +294,44 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/subscriptions/all:
delete: # delete_waku_v2_filter_v2_subscription
summary: Unsubscribe a peer from all content topics
description: |
Unsubscribe a peer from all content topics
operationId: deleteAllSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterUnsubscribeAllRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/messages/{contentTopic}:
get: # get_waku_v2_filter_v2_messages
summary: Get the latest messages on the polled content topic
Expand Down Expand Up @@ -407,6 +441,15 @@ components:
$ref: "#/components/schemas/PubSubTopic"
required:
- requestId
- contentFilters

FilterUnsubscribeAllRequest:
type: object
properties:
requestId:
type: string
required:
- requestId

FilterSubscriptionResponse:
type: object
Expand Down
Loading

0 comments on commit 8791875

Please sign in to comment.