Add NetworkService::send_notifications #6591
Conversation
This API proposal looks good to me. Thanks for the detailed doc comments! 🙏
client/network/src/service.rs
Outdated
/// ```ignore
/// // Do NOT do this
/// for peer in peers {
///     network.send_notifications(peer, ..., notifications).await;
I get that I'm not supposed to do this, but it's also kind of hard to manage potentially complex sending logic across many subsystems.
@infinity0 suggested that it might be best for each subsystem to get a different substream for each peer to manage that.
In Polkadot we have this "Network Bridge" which I can probably refactor away, but it was nice for serving as an additional layer of abstraction between the "raw" network service and the protocols we are using.
I mentioned in this comment that I'd like to use something like a wrapper around FuturesUnordered for this, so instead we'd be waiting on network.send_notifications for all peers at once. Is that OK?
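Roughly what I have in mind, as a minimal sketch with made-up stand-ins for the network types (the exact signature of send_notifications is what this PR is still settling on):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

// Made-up stand-ins; the real `NetworkService` API is what is being discussed here.
#[derive(Clone)]
struct PeerId;
struct Network;

impl Network {
    // Rough shape of `send_notifications`: resolves once the message has been
    // accepted into the buffer towards this peer.
    async fn send_notifications(&self, _peer: PeerId, _proto: &str, _msg: Vec<u8>) {}
}

// Instead of `for peer in peers { network.send_notifications(...).await }`,
// collect one future per peer and drive them all at once, so one slow peer
// does not stall the broadcast to everyone else.
async fn broadcast(network: &Network, peers: Vec<PeerId>, msg: Vec<u8>) {
    let mut sends: FuturesUnordered<_> = peers
        .into_iter()
        .map(|peer| network.send_notifications(peer, "/my/protocol/1", msg.clone()))
        .collect();

    while sends.next().await.is_some() {}
}
```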
I'm not 100% sure what the expected control-flow is in this all-or-nothing style.
First I have to look at the actions available to us.
So if a peer has no space in their buffer for a message, I can take two immediate actions:
- Remove a message from the buffer
- Disconnect the peer
I might also like to defer until the peer has some space in their buffer and then attempt to send. While this is happening I cannot process incoming messages because the outcome of handling some incoming message might lead to us attempting yet another send to the peer, which I'm not sure how to handle. Because it's not clear how to handle multiple sends where there is no buffer, I prefer just to make this an impossibility.
Waiting for the peer to have space in their buffer should obviously have a timeout, after which we just disconnect the peer because they are slowing everything down. But it seems like things actually slow down more with the all-or-nothing approach: if we only need n spaces in the buffer, we shouldn't have to wait for the whole buffer to be sent before buffering the next message.
Zooming out a bit, it's also unclear how you would write the analogue of this code:
for peer in peers {
    peer.send(msg()).await
}
The best thing I can come up with is this:
1. Go through all peers, attempting to push the message onto the buffer of each one.
2. For each one without space in the buffer, attempt to clean up outdated messages.
3. For each one still without space, push a task onto a FuturesUnordered that waits for space in the buffer for some time and pushes onto the buffer when space is available.
4. Drive this FuturesUnordered alongside send_notifications for all peers until the FuturesUnordered has completed all futures, either by having successfully written into freed space or by timing out.
5. For each timeout coming from the FuturesUnordered, attempt to clean up outdated messages.
6. If there is space in the buffer, push the message and continue. If not, disconnect the peer.
This seems complicated but ok-ish. However as mentioned above we can't reasonably process protocol messages while waiting on space in any peer's buffer. So I'm not sure why any new messages would ever become outdated between steps 2 and 5 - maybe it's better to just disconnect as soon as the timeout is reached?
This is assuming that there aren't any protocol messages that can safely be dropped. If there are messages we can drop, then we would just drop the peer (and reduce the reputation?) instead of disconnecting outright.
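To make the steps above concrete, here is a compressed sketch of roughly that flow. Everything about PeerHandle (try_push, prune_outdated, wait_for_space, disconnect) is hypothetical, and tokio::time::timeout is used purely as an example timer:

```rust
use std::time::Duration;
use futures::stream::{FuturesUnordered, StreamExt};

// Hypothetical per-peer handle; nothing with this exact shape exists today.
struct PeerHandle;

impl PeerHandle {
    /// Try to enqueue without waiting; `Err` returns the message if the buffer is full.
    fn try_push(&self, _msg: Vec<u8>) -> Result<(), Vec<u8>> { unimplemented!() }
    /// Drop buffered messages that are no longer worth sending.
    fn prune_outdated(&self) { unimplemented!() }
    /// Resolves once at least one slot is free in the buffer.
    async fn wait_for_space(&self) { unimplemented!() }
    fn disconnect(&self) { unimplemented!() }
}

async fn broadcast_with_backpressure(peers: Vec<PeerHandle>, msg: Vec<u8>) {
    let mut waiting = FuturesUnordered::new();

    // Steps 1-3: push where possible, prune and retry, otherwise start waiting.
    for peer in peers {
        if peer.try_push(msg.clone()).is_ok() {
            continue;
        }
        peer.prune_outdated();
        if let Err(msg) = peer.try_push(msg.clone()) {
            waiting.push(async move {
                let res =
                    tokio::time::timeout(Duration::from_secs(10), peer.wait_for_space()).await;
                (peer, msg, res.is_ok())
            });
        }
    }

    // Steps 4-6: drive the waiters; on timeout, prune once more, then push or disconnect.
    while let Some((peer, msg, got_space)) = waiting.next().await {
        if !got_space {
            peer.prune_outdated();
        }
        if peer.try_push(msg).is_err() {
            peer.disconnect();
        }
    }
}
```

As noted above, since no protocol messages get processed while this is blocked, the second prune probably buys nothing, and disconnecting on timeout directly would be simpler.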
This function is not meant to be used by high-level code; it is the foundation on top of which we can implement something like sc-network-gossip, or Ximin's priority queue system, or something similar.
The flow control is meant to be: you push your messages to a queue (which isn't handled by sc-network), and this queue is, in parallel, processed by another task that sends out the messages to the various peers.
If a peer is too slow, then it is the role of this other task to stop sending to it and disconnect it.
Zooming out a bit, it's also unclear how you would write the analogue of this code:
You'd do some_channel.send(msg()).await, and the receiving side of the channel then attempts, in parallel, to send the message to all peers.
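Sketched out, again with made-up stand-ins for the network types and a futures mpsc channel as the queue:

```rust
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{SinkExt, StreamExt};

// Made-up stand-ins again; only the shape matters here.
#[derive(Clone)]
struct PeerId;
struct Network;
impl Network {
    async fn send_notifications(&self, _peer: PeerId, _proto: &str, _msg: Vec<u8>) {}
}

// Producer side: protocol code only ever awaits the channel, never the network.
async fn produce(mut tx: mpsc::Sender<Vec<u8>>, msg: Vec<u8>) {
    // Backpressure comes from the bounded channel, not from any individual peer.
    let _ = tx.send(msg).await;
}

// Receiving side, run as a background task (created with e.g. `mpsc::channel(16)`):
// it pops one message at a time and sends it to every peer in parallel. This is
// also where a "this peer is too slow, disconnect it" policy would live.
async fn fan_out(mut rx: mpsc::Receiver<Vec<u8>>, network: Network, peers: Vec<PeerId>) {
    while let Some(msg) = rx.next().await {
        let mut sends: FuturesUnordered<_> = peers
            .iter()
            .cloned()
            .map(|peer| network.send_notifications(peer, "/my/protocol/1", msg.clone()))
            .collect();
        while sends.next().await.is_some() {}
    }
}
```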
I'm not 100% sure what the expected control-flow is in this all-or-nothing style.
The way you are supposed to use this API is really one message at a time.
The all-or-nothing system when passing multiple messages is something I'm not sure is actually useful.
The way you are supposed to use this API is really one message at a time.
The all-or-nothing system when passing multiple messages is something I'm not sure is actually useful.
If you mean one message at a time per-peer, then yes, I agree with you.
But if you mean one message at a time overall, then I am not sure if that is realistic.
However note that my example above is only about sending one message at a time per-peer. The all-or-nothing is about how the buffer drains, not anything else. So waiting for 1 space is just as expensive as waiting for all space in the buffer to be free.
You'd do some_channel.send(msg()).await, and the receiving side of the channel then attempts, in parallel, to send the message to all peers.
OK, this is fine, and actually suits the code we wrote for Polkadot networking very well already. However, as soon as you introduce this layer of abstraction where the thing that actually calls send_notifications is one step removed from the thing which generates the messages, you lose the ability to meaningfully inspect and prune the buffer without callbacks.
So I'd assumed that since we actually do want to do this message-pruning thing, we'd want to call send_notifications at the high level.
However as soon as you introduce this layer of abstraction where the thing that actually calls send_notifications is one step removed from the thing which generates the messages, you lose the ability to meaningfully inspect and prune the buffer without callbacks.
This remains to be designed, but I believe it should be possible to use a futures::lock::Mutex to access the content of the queue.
One thing that we might have to change is that send_notifications could return an intermediate object, so that we don't have to immediately pass the notification by value, and we can lock the Mutex only once the peer is ready to accept a message. I'm going to adjust the PR.
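Something along these lines, perhaps; all names here (NotificationSlot, reserve_notification_slot) are invented to illustrate the shape, not the actual API. The first await only waits for the peer to have room, and the queue is locked only after that point:

```rust
use std::collections::VecDeque;
use futures::lock::Mutex;

// Invented stand-ins, only to illustrate the "intermediate object" idea.
struct PeerId;
struct NotificationSlot;

impl NotificationSlot {
    /// Consume the reserved slot by handing the bytes to the connection.
    fn send(self, _bytes: Vec<u8>) {}
}

struct Network;

impl Network {
    /// Resolves once the peer can accept one more notification; no payload yet.
    async fn reserve_notification_slot(&self, _peer: PeerId) -> NotificationSlot {
        NotificationSlot
    }
}

async fn drain_one(network: &Network, peer: PeerId, queue: &Mutex<VecDeque<Vec<u8>>>) {
    // Wait for the peer first; the queue stays untouched (and prunable) meanwhile.
    let slot = network.reserve_notification_slot(peer).await;
    // Only now lock the queue, briefly, to pick the message to send.
    if let Some(msg) = queue.lock().await.pop_front() {
        slot.send(msg);
    }
}
```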
This is a very rough draft, but I have something like this in mind: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=427a0f40cd2d3e9bf9c3f5a228111045
Pushing a message to the queue would lock the queue, which gives us the freedom to remove elements from that queue. The background task would also lock that queue but only for a very brief moment in order to pop a message.
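Without reproducing the whole draft, the push side of that idea might look roughly like this; the queue type and the "drop the oldest" pruning rule are placeholders for whatever the protocol actually wants:

```rust
use std::collections::VecDeque;
use futures::lock::Mutex;

// Because the queue sits behind an async Mutex on our side, the code that
// pushes can also inspect it and evict entries that have become useless,
// without involving sc-network at all.
async fn push_or_prune(queue: &Mutex<VecDeque<Vec<u8>>>, msg: Vec<u8>, max_len: usize) {
    let mut queue = queue.lock().await;
    if queue.len() >= max_len {
        // Placeholder pruning policy: drop the oldest entry.
        queue.pop_front();
    }
    queue.push_back(msg);
}
```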
OK, I will say that it needs a lot of work before it can be used in our gossip protocols, which are pseudo-gossip and rely on data-dependency - it's not a simple case of "every peer gets every message".
client/network/src/service.rs
Outdated
/// Notifications should be dropped
/// if buffer is full
///
pub async fn send_notifications(
Is this supposed to mirror my bounded-priority-queue suggestion? If so I am confused by the API description.
The diagram above would seem to suggest that this function is for taking things out of the per-peer buffer, and then sending them to the peer via whatever stream we're using. In other words, it corresponds to pop_message as I described in this comment.
However, a few paragraphs above you say "This method waits until there is space available in the buffer of messages towards", which is not what pop_message is supposed to do; it is supposed to wait until items are available, i.e. the buffer is non-empty, otherwise there is nothing to pop. The opposing method push_message (which in your diagram would be called by the notifications broadcast) is never supposed to block.
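In code, the semantics I mean are roughly these (a toy single-consumer queue; tokio::sync::Notify is just one possible wake-up mechanism, and the eviction rule stands in for a real priority policy):

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use tokio::sync::Notify;

// Toy bounded queue with the intended semantics: `push_message` never blocks
// (when full it evicts, standing in for "drop the lowest priority"), while
// `pop_message` waits until the queue is non-empty. Single consumer only.
struct BoundedQueue {
    inner: Mutex<VecDeque<Vec<u8>>>,
    notify: Notify,
    cap: usize,
}

impl BoundedQueue {
    fn push_message(&self, msg: Vec<u8>) {
        let mut q = self.inner.lock().unwrap();
        if q.len() >= self.cap {
            // Stand-in for a real priority-based eviction.
            q.pop_front();
        }
        q.push_back(msg);
        drop(q);
        self.notify.notify_one();
    }

    async fn pop_message(&self) -> Vec<u8> {
        loop {
            if let Some(msg) = self.inner.lock().unwrap().pop_front() {
                return msg;
            }
            // Queue is empty: wait until a push wakes us up.
            self.notify.notified().await;
        }
    }
}
```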
This function is meant to be the foundation on top of which your bounded-priority-queue could be implemented.
The way they plug together is that you'd do something like while let Some(next) = pop_message() { send_notifications(..., next) }
Ah ok. It might be good to clarify "there is space available in the lower-level buffer of messages towards the peer" and add this to the diagram in between send_notifications and Internet.
Looks like #5938 is merged now. Once implementation is in & reviewed, are there any other blockers?
The implementation is rather non-trivial, but I'm working on it.
Closing in favour of #6692
Adds a NetworkService::send_notifications method. It is unimplemented, but I'm opening this PR so that we agree on the API that it should have.
It should be possible to build systems such as dropping low-priority notifications on top of this method.
Rather than writing an extensive PR description, I invite you to read the doc-comment.
A few additional comments not mentioned in the code:
- The buffer between send_notifications and the operating system would be quite small (8 elements?) compared to the one used at the moment by write_notification. As mentioned in the doc-comment, I think it's a good idea to provide a way to guarantee that all notifications have been sent out, and for this reason I don't want to have too big a buffer.

cc #5481 #6402
cc @infinity0 (GitHub doesn't let me add you as reviewer)