-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: new filter protocol increment - subscribe request handling #1584
Conversation
Can we feature flag this, similar to Lines 98 to 104 in 43b3f1c
|
This is currently not integrated anywhere or part of any CI, while incrementing towards the feature. The idea is that this implementation will completely replace the existing filter implementation within the next week or two, so there may be little value in having a separate build or flag (the existing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see some details but overall makes sense to me. Feel free to take my comments as part of the upcoming increments.
if peerId in wf.subscriptions: | ||
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]()) | ||
if peerSubscription.len() + filterCriteria.len() >= MaxCriteriaPerSubscription: | ||
return err(FilterSubscribeError( | ||
kind: FilterSubscribeErrorKind.SERVICE_UNAVAILABLE, | ||
cause: "peer has reached maximum number of filter criteria" | ||
)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a tricky scenario: Shall we accept partially a subscription that "overflows" the maximum number of subscriptions?
In my opinion, we shouldn't. But here, despite erroring out, we are inserting in the subscriptions map. To avoid that situation, I would do the following:
- After checking the
contentTopics.len == 0
, I would add another check comparing the upper bound of the content topics length against a limit (e.g. MAX 20 subscriptions per query). - Before L56 (constructing the filter criteria set), I would calculate the remaining "subscription slots" from the
wf.subscriptions
(a proc that returns this would be great) and check if theseq[ContentTopic]
length is larger than the remaining slots. In that case, I would error out.
Once the request is sanitized, then I would do the insertion without worrying of running out of subscription slots.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my opinion, we shouldn't.
Agreed! I also considered this scenario and think a request should either be fully honoured or fail (partially honouring a request is a nightmare in error handling from both client and service node's perspective)
here, despite erroring out, we are inserting in the subscriptions map
Wait, I don't see it or am missing something obvious. 😅 We return in L63 without doing the insert in L68 and L69. Good idea to return earlier though.
e.g. MAX 20 subscriptions per query
Good point. Will also clarify this as a recommendation so that we get reasonable filter requests from the beginning.
I would calculate the remaining "subscription slots
As mentioned above, sounds reasonable to me too to return earlier here, although I don't quite see that we're currently exceeding max criteria per peer subscription. Will address in a follow-up (the subscriptions map is likely to be split to a separate module soon in any case, as I can imagine we may want to make it a bit more complex to e.g. build a reverse index, add timestamps, a sweeper, etc.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, I don't see it or am missing something obvious. We return in L63 without doing the insert in L68 and L69. Good idea to return earlier though.
My bad, I misread this part. I thought that mgetOrPut
was modifying already the table's value. True. You bail out before setting the value back to the table.
if wf.subscriptions.len() >= MaxSubscriptions: | ||
return err(FilterSubscribeError( | ||
kind: FilterSubscribeErrorKind.SERVICE_UNAVAILABLE, | ||
cause: "node has reached maximum number of subscriptions" | ||
)) | ||
debug "creating new subscription", peerId=peerId | ||
wf.subscriptions[peerId] = filterCriteria |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following my previous comment, the problem is that we permit surpassing the MaxSubscriptions
limit. But we don't do anything to prevent or fix the situation.
For security reasons, it is preferable to apply a prevention approach (like the one I described before) than a post-fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree re early checking (e.g. proc that quickly check if a new request can be honoured given current subscriptions/criteria-per-subscription capacity). Not sure though I see how we
permit surpassing the MaxSubscriptions limit
if we return immediately once we reach the ==
limit?
I will add the early check in any case in a follow-up though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we return immediately once we reach the == limit?
I will add the early check in any case in a follow-up though.
I believe that doing a precondition check is the safest approach. So you avoid some necessary operations (and you save some CPU cycles).
|
||
ok() | ||
|
||
proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While this is less harmful than the subscription, I would still sanitize the "unsubscribe" request (e.g., limiting the length of the topic string, limiting the number of content topics, for example, to a maximum of 20, etc.).
I we accept a big list of topics, 90% of the time, the customers will use the maximum.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Will add to follow-up!
Co-authored-by: Lorenzo Delgado <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm! some minor comments/ideas.
|
||
ok() | ||
|
||
proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason why pubsubTopic
is optional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's an optional in the request RPC since it's only mandatory when following the SUBSCRIBE workflow. I made the choice to do all request consistency checking within the utility function belonging to that request. In this case it is mandatory to have a pubsubTopic
, so subscribe
returns a bad request error if optional is none.
MaxCriteriaPerSubscription = 1000 | ||
|
||
type | ||
FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just an idea:
- make
FilterCriterion
a table pubsubtopic->contenttopic - easier to reason perhaps since contenttopic is a "subtopic" of gossipsub topic
- less likely that we store duplicates since someone subscribing to ctopic_1, ctopic_2 in gtopic_1 will require: (gtopic_1, ctopic_1), (gtopic_1, ctopic_2).
- faster access time perhaps?
- but the most important, easier to access subscriptions
- one type less
# renamed FilterCriterion to PeerSubscriptions
type
PeerSubscriptions* = Table[PubsubTopic, HashSet[ContentTopic]]
and then WakuFilter
WakuFilter* = ref object of LPProtocol
subscriptions*: Table[PeerID, PeerSubscriptions]
peerManager: PeerManager
wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mmm. This is indeed how we previously modelled it - a map of peer IDs to a map of pubsub topics to content topics - but I made the (perhaps not final) decision to flatten this as much as possible. Reasoning: the subscription is actually against a contentTopic
, so in this model the "real" subscription would be nested two levels deep - we just (mandatorily) require each contentTopic
to be qualified against a specific pubsubTopic
. Explicitly modelling the pubsub topic separately requires a bit more maintenance. In that case it would be invalid, for example, to subscribe/unsubscribe with filter criteria that contains only a pubsubTopic
or for a subscription where a peer's Peer Subscription has an empty content topic set against any pubsub topic. We can build in rules around this, of course, but importantly we want to move away from a model where pubsubTopic
is necessary as a qualifier at all.
I am planning possible changes to this model, though, as we may need something of a reverse map/index when looking up peers for a specific subscription. In this case the reverse index may be modelled as you suggested. Will still think about this, as you make a good point.
|
||
let response = wf.handleSubscribeRequest(conn.peerId, request) | ||
|
||
await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not very likely to happen in reality, but if the connection is closed by the counterparty right before, this will throw an exception. I just verified empirically when modifying px.
https://github.com/waku-org/nwaku/blob/master/tests/v2/test_waku_peer_exchange.nim#L247
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks will address!
|
||
trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria | ||
|
||
if peerId in wf.subscriptions: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
related to the other comment, using two maps this would be something like this. like the wf.subscriptions[peerId][pubsubTopic]
to address the ctopic.
if wf.subscriptions.hasKey(peerId):
if wf.subscriptions[peerId].hasKey(pubsubTopic):
for cTopic in contentTopics:
wf.subscriptions[peerId][pubsubTopic].add(cTopic)
....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True. But to e.g. count number of total subscriptions and enforce limit you'd need to count the values of the second map - i.e. sum of content topics for all pubsub topics (easily doable of course, but slightly more roundabout when modelled).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Background
This is a first increment towards the server-side implementation of the filter protocol changes proposed in vacp2p/rfc#562 with wire protocol in waku-org/waku-proto#16.
It only covers basic subscriber request handling.
Note that this is still an incomplete feature and isolated in code. Once feature-complete, the old
filter
implementation will be removed and replaced by this one (with simultaneous merging of the RFC). Test cases are also still mainly focused on happy-path cases, to keep this increment as simple as possible.The major outstanding features are:
Dogfooding can likely start without a client implementation in nwaku, as nwaku is most often configured as a service node only.
Next steps
Further steps, roughly in order: