Skip to content

Commit

Permalink
feat: further filter improvements (#1617)
Browse files Browse the repository at this point in the history
  • Loading branch information
jm-clius committed Mar 22, 2023
1 parent ac56e1d commit d920b97
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 20 deletions.
6 changes: 3 additions & 3 deletions tests/v2/waku_filter_v2/test_waku_filter_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ suite "Waku Filter - subscription maintenance":
switch.peerStore[ProtoBook][peerId1] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId2] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId3] = @[WakuFilterPushCodec]
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).statusCode == 200
require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).statusCode == 200
require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).statusCode == 200

# Then
check:
Expand Down
64 changes: 47 additions & 17 deletions waku/v2/protocol/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import
logScope:
topics = "waku filter"

const
MaxContentTopicsPerRequest = 30

type
WakuFilter* = ref object of LPProtocol
subscriptions*: FilterSubscriptions # a mapping of peer ids to a sequence of filter criteria
Expand All @@ -41,6 +44,9 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic],
if pubsubTopic.isNone() or contentTopics.len() == 0:
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))

if contentTopics.len() > MaxContentTopicsPerRequest:
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))

let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))

trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria
Expand All @@ -64,6 +70,9 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic
if pubsubTopic.isNone() or contentTopics.len() == 0:
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))

if contentTopics.len() > MaxContentTopicsPerRequest:
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))

let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))

trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
Expand Down Expand Up @@ -100,15 +109,24 @@ proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubs

var subscribeResult: FilterSubscribeResult

case request.filterSubscribeType
of FilterSubscribeType.SUBSCRIBER_PING:
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE:
subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
subscribeResult = wf.unsubscribeAll(peerId)
let requestStartTime = Moment.now()

block:
## Handle subscribe request
case request.filterSubscribeType
of FilterSubscribeType.SUBSCRIBER_PING:
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE:
subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
subscribeResult = wf.unsubscribeAll(peerId)

let
requestDuration = Moment.now() - requestStartTime
requestDurationSec = requestDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point
waku_filter_request_duration_seconds.observe(requestDurationSec, labelValues = [$request.filterSubscribeType])

if subscribeResult.isErr():
return FilterSubscribeResponse(
Expand Down Expand Up @@ -161,19 +179,31 @@ proc maintainSubscriptions*(wf: WakuFilter) =

wf.subscriptions.removePeers(peersToRemove)

const MessagePushTimeout = 20.seconds
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
trace "handling message", pubsubTopic=pubsubTopic, message=message

let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len() == 0:
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
return
let handleMessageStartTime = Moment.now()

block:
## Find subscribers and push message to them
let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len() == 0:
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
return

let messagePush = MessagePush(
pubsubTopic: pubsubTopic,
wakuMessage: message)

let messagePush = MessagePush(
pubsubTopic: pubsubTopic,
wakuMessage: message)
if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout):
debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])

await wf.pushToPeers(subscribedPeers, messagePush)
let
handleMessageDuration = Moment.now() - handleMessageStartTime
handleMessageDurationSec = handleMessageDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point
waku_filter_handle_message_duration_seconds.observe(handleMessageDurationSec)

proc initProtocolHandler(wf: WakuFilter) =

Expand Down
3 changes: 3 additions & 0 deletions waku/v2/protocol/waku_filter_v2/protocol_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ export metrics

declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_filter_requests, "number of filter subscribe requests received", ["type"]
declarePublicHistogram waku_filter_request_duration_seconds, "duration of filter subscribe requests", ["type"]
declarePublicHistogram waku_filter_handle_message_duration_seconds, "duration to push message to filter subscribers"

# Error types (metric label values)
const
dialFailure* = "dial_failure"
decodeRpcFailure* = "decode_rpc_failure"
requestIdMismatch* = "request_id_mismatch"
errorResponse* = "error_response"
pushTimeoutFailure* = "push_timeout_failure"

0 comments on commit d920b97

Please sign in to comment.