This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add NetworkService::send_notifications #6591

Closed · wants to merge 11 commits
101 changes: 98 additions & 3 deletions client/network/src/service.rs
@@ -541,16 +541,24 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
&self.local_peer_id
}

/// Appends a notification to the buffer of pending outgoing notifications with the given peer.
/// Has no effect if the notifications channel with this protocol name is not open.
///
/// If the buffer of pending outgoing notifications with that peer is full, the notification
/// is silently dropped. This happens if you call this method at a higher rate than the rate
/// at which the peer processes these notifications, or if the available network bandwidth is
/// too low.
/// For this reason, this method is considered soft-deprecated. You are encouraged to use
/// [`NetworkService::send_notifications`] instead.
///
/// > **Note**: The reason why this is a no-op in the situation where we have no channel is
/// > that we don't guarantee message delivery anyway. Networking issues can cause
/// > connections to drop at any time, and higher-level logic shouldn't differentiate
/// > between the remote voluntarily closing a substream or a network error
/// > preventing the message from being delivered.
///
/// The protocol must have been registered with `register_notifications_protocol` or
/// `NetworkConfiguration::notifications_protocols`.
///
pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::WriteNotification {
target,
engine_id,
message,
});
}
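For illustration, a fire-and-forget call might look like this (a minimal sketch; `network`, `peer_id`, and the engine id value are hypothetical):

```rust
// Fire-and-forget: if the buffer towards `peer_id` is full, this
// notification is silently dropped.
let engine_id: ConsensusEngineId = *b"exmp"; // hypothetical engine id
network.write_notification(peer_id.clone(), engine_id, b"ping".to_vec());
```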

/// Waits until one or more slots are available in the buffer of pending outgoing notifications
/// with the given peer.
///
/// The returned `Future` finishes after the peer is ready to accept more notifications, or
/// after the substream has been closed. Use the returned [`NotificationsBufferSlots`] to
/// actually send the notifications.
///
/// An error is returned if there exists no open notifications substream with that combination
/// of peer and protocol, or if the remote has asked to close the notifications substream.
///
/// If the remote requests to close the notifications substream, all notifications successfully
/// enqueued with this method will finish being sent out before the substream actually gets
/// closed, but attempting to enqueue more notifications will then return an error. It is also
/// possible for the entire connection to be abruptly closed, in which case enqueued
/// notifications will be lost.
///
/// The protocol must have been registered with `register_notifications_protocol` or
/// `NetworkConfiguration::notifications_protocols`.
///
/// # Usage
///
/// This method waits until there is space available in the buffer of messages towards the
/// given peer. If the peer processes notifications at a slower rate than we send them, this
/// buffer will quickly fill up.
///
/// As such, you should never do something like this:
///
/// ```ignore
/// // Do NOT do this
/// for peer in peers {
/// network.send_notifications(peer, ..., notifications).await;
@rphmeier (Contributor) commented on Jul 7, 2020:

I get that I'm not supposed to do this, but it's also kind of hard to manage potentially complex sending logic across many subsystems.

@infinity0 suggested that it might be best for each subsystem to get a different substream for each peer to manage that.

In Polkadot we have this "Network Bridge" which I can probably refactor away, but it was nice for serving as an additional layer of abstraction between the "raw" network service and the protocols we are using.

I mentioned in this comment that I'd like to use something like a wrapper around `FuturesUnordered` for this, so instead we'd be waiting on `network.send_notifications` for all peers at once. Is that OK?
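
A rough sketch of that wrapper, assuming the `send_notifications` API proposed in this PR (the `broadcast` helper and its signature are hypothetical):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

// Drive `send_notifications` towards all peers at once: a slow peer only
// delays its own send instead of holding up the whole loop.
async fn broadcast<B: BlockT + 'static, H: ExHashT>(
    network: &NetworkService<B, H>,
    peers: Vec<PeerId>,
    engine_id: ConsensusEngineId,
    message: Vec<u8>,
) {
    let mut sends: FuturesUnordered<_> = peers
        .into_iter()
        .map(|peer| {
            let message = message.clone();
            async move {
                // Reserve one buffer slot, then enqueue the notification.
                if let Ok(slots) = network.send_notifications(peer, engine_id, 1).await {
                    slots.send(std::iter::once(message));
                }
            }
        })
        .collect();

    // Poll all sends concurrently until each has succeeded or errored.
    while sends.next().await.is_some() {}
}
```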

@rphmeier (Contributor) commented on Jul 7, 2020:

I'm not 100% sure what the expected control-flow is in this all-or-nothing style.

First I have to look at the actions available to us.

So if a peer has no space in their buffer for a message, I can take two immediate actions:

  • Remove a message from the buffer
  • Disconnect the peer

I might also like to defer until the peer has some space in their buffer and then attempt to send. While this is happening I cannot process incoming messages because the outcome of handling some incoming message might lead to us attempting yet another send to the peer, which I'm not sure how to handle. Because it's not clear how to handle multiple sends where there is no buffer, I prefer just to make this an impossibility.

Waiting for the peer to have space in their buffer should obviously have a timeout after which we just disconnect the peer because they are slowing everything down. But it seems like things actually slow down more with the all-or-nothing approach - if we only need n spaces in the buffer, we shouldn't have to wait for the whole buffer to be sent before buffering the next message.

Zooming out a bit, it's also unclear how you would write the analogue of this code:

```rust
for peer in peers {
    peer.send(msg()).await
}
```

The best thing I can come up with is this:

  1. Go through all peers, attempting to push the message onto the buffer of each one.
  2. For each one without space in the buffer, attempt to clean up outdated messages.
  3. For each one still without space, push a task onto a `FuturesUnordered` that waits for space in the buffer for some time and pushes onto the buffer when space is available.
  4. Drive this `FuturesUnordered` alongside `send_notifications` for all peers until the `FuturesUnordered` has completed all futures, either by having successfully written into freed space or by timing out.
  5. For each timeout coming from the `FuturesUnordered`, attempt to clean up outdated messages.
  6. If there is space in the buffer, push the message and continue. If not, disconnect the peer.

This seems complicated but ok-ish. However, as mentioned above, we can't reasonably process protocol messages while waiting on space in any peer's buffer. So I'm not sure why any new messages would ever become outdated between steps 2 and 5 - maybe it's better to just disconnect as soon as the timeout is reached?

This is assuming that there aren't any protocol messages that can safely be dropped. If there are messages we can drop, then we would just drop the message (and reduce the reputation?) instead of disconnecting outright.
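
A sketch of the wait-with-a-timeout step under the same assumptions; `futures-timer` as the delay source, the helper name, and the ten-second cutoff are all arbitrary choices here:

```rust
use futures::{pin_mut, select, FutureExt};
use futures_timer::Delay;
use std::time::Duration;

// Wait for buffer space with a deadline, and disconnect peers that are
// slowing everything down rather than stalling on them forever.
async fn send_or_disconnect<B: BlockT + 'static, H: ExHashT>(
    network: &NetworkService<B, H>,
    peer: PeerId,
    engine_id: ConsensusEngineId,
    message: Vec<u8>,
) {
    let reserve = network.send_notifications(peer.clone(), engine_id, 1).fuse();
    let timeout = Delay::new(Duration::from_secs(10)).fuse();
    pin_mut!(reserve, timeout);

    select! {
        reserved = reserve => if let Ok(slots) = reserved {
            slots.send(std::iter::once(message));
        },
        _ = timeout => network.disconnect_peer(peer),
    }
}
```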

@tomaka (PR author) commented:

This function is not meant to be used by high-level code; it is the foundation on top of which we can implement something like `sc-network-gossip`, or Ximin's priority queue system, or something similar.

The flow control is meant to be: you push your messages to a queue (which isn't handled by sc-network), and this queue is, in parallel, processed by another task that sends out the messages to the various peers.
If a peer is too slow, then it is the role of this other task to stop sending to it and disconnect it.

> Zooming out a bit, it's also unclear how you would write the analogue of this code:

You'd do `some_channel.send(msg()).await;`, and the receiving side of the channel then attempts, in parallel, to send the message to all peers.

> I'm not 100% sure what the expected control-flow is in this all-or-nothing style.

The way you are supposed to use this API is really one message at a time.
The all-or-nothing system when passing multiple messages is something I'm not sure is actually useful.
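
A sketch of that flow, assuming the API from this PR (the task layout, channel type, and disconnect policy are all hypothetical):

```rust
use futures::channel::mpsc;
use futures::StreamExt;
use std::sync::Arc;

// Producers do `tx.send((peer, msg)).await`; this task drains the channel
// in parallel and is the one place that decides what to do with slow peers.
async fn sender_task<B: BlockT + 'static, H: ExHashT>(
    network: Arc<NetworkService<B, H>>,
    engine_id: ConsensusEngineId,
    mut rx: mpsc::Receiver<(PeerId, Vec<u8>)>,
) {
    while let Some((peer, message)) = rx.next().await {
        match network.send_notifications(peer.clone(), engine_id, 1).await {
            Ok(slots) => slots.send(std::iter::once(message)),
            // No open substream (or the remote closed it): stop sending to
            // this peer instead of blocking the producers.
            Err(_) => network.disconnect_peer(peer),
        }
    }
}
```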

@rphmeier (Contributor) commented on Jul 7, 2020:

> The way you are supposed to use this API is really one message at a time.
> The all-or-nothing system when passing multiple messages is something I'm not sure is actually useful.

If you mean one message at a time per-peer, then yes, I agree with you.

But if you mean one message at a time overall, then I am not sure if that is realistic.

However note that my example above is only about sending one message at a time per-peer. The all-or-nothing is about how the buffer drains, not anything else. So waiting for 1 space is just as expensive as waiting for all space in the buffer to be free.

@rphmeier (Contributor) commented on Jul 7, 2020:

> You'd do `some_channel.send(msg()).await;`, and the receiving side of the channel then attempts, in parallel, to send the message to all peers.

OK, this is fine, and actually suits the code we wrote for Polkadot networking very well already. However, as soon as you introduce this layer of abstraction where the thing that actually calls `send_notifications` is one step removed from the thing which generates the messages, you lose the ability to meaningfully inspect and prune the buffer without callbacks.

So I'd assumed that since we actually do want to do this message-pruning thing, we'd want to call `send_notifications` at the high level.

@tomaka (PR author) commented on Jul 7, 2020:

> However as soon as you introduce this layer of abstraction where the thing that actually calls `send_notifications` is one step removed from the thing which generates the messages, you lose the ability to meaningfully inspect and prune the buffer without callbacks.

This remains to be designed, but I believe it should be possible to use a `futures::lock::Mutex` to access the content of the queue.

One thing that we might have to change is that `send_notifications` could return an intermediate object, so that we don't have to immediately pass the notification by value, and we can lock the `Mutex` only once the peer is ready to accept a message. I'm going to adjust the PR.

@tomaka (PR author) commented on Jul 7, 2020:

This is a very rough draft, but I have something like this in mind: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=427a0f40cd2d3e9bf9c3f5a228111045

Pushing a message to the queue would lock the queue, which gives us the freedom to remove elements from that queue. The background task would also lock that queue but only for a very brief moment in order to pop a message.
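
A loose reconstruction of that idea (not the linked playground code itself; `is_outdated` is a stand-in predicate):

```rust
use futures::lock::Mutex;
use std::collections::VecDeque;

// The queue lives behind an async mutex, so the pushing side can inspect
// and prune it while the background task locks it only briefly to pop.
struct SharedQueue {
    messages: Mutex<VecDeque<Vec<u8>>>,
}

impl SharedQueue {
    /// Called by the producer; holding the lock gives it the freedom to
    /// drop elements that became outdated before pushing a new one.
    async fn push(&self, message: Vec<u8>, is_outdated: impl Fn(&[u8]) -> bool) {
        let mut queue = self.messages.lock().await;
        queue.retain(|m| !is_outdated(m));
        queue.push_back(message);
    }

    /// Called by the background task right before sending to a peer.
    async fn pop(&self) -> Option<Vec<u8>> {
        self.messages.lock().await.pop_front()
    }
}
```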

A contributor replied:

OK, I will say that it needs a lot of work before it can be used in our gossip protocols, which are pseudo-gossip and rely on data-dependency - it's not a simple case of "every peer gets every message".

/// }
/// ```
///
/// Doing so would slow down all peers to the rate of the slowest one. A malicious or
/// malfunctioning peer could intentionally process notifications at a very slow rate.
///
/// Instead, you are encouraged to maintain your own buffer of notifications on top of the one
/// maintained by `sc-network`, and use `send_notifications` to progressively send out
/// elements from your buffer. If this additional buffer is full (which will happen at some
/// point if the peer is too slow to process notifications), appropriate measures can be taken,
/// such as removing non-critical notifications from the buffer or disconnecting the peer
/// using [`NetworkService::disconnect_peer`].
///
///
///   Notifications            Per-peer buffer
///    broadcast    +------->  of notifications  +-->  `send_notifications`  +-->  Internet
///                               ^   (not covered by
///                               |    sc-network)
///                               +
///                  Notifications should be dropped
///                        if buffer is full
///
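/// A minimal sketch of that pattern (`local_buffer` is a hypothetical
/// `VecDeque<Vec<u8>>` maintained by the caller):
///
/// ```ignore
/// while let Some(notification) = local_buffer.pop_front() {
///     // Wait until the peer can accept one more notification.
///     let slots = network.send_notifications(peer.clone(), engine_id, 1).await?;
///     slots.send(std::iter::once(notification));
/// }
/// ```
///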
pub async fn send_notifications(
@infinity0 (Contributor) commented on Jul 7, 2020:

Is this supposed to mirror my bounded-priority-queue suggestion? If so I am confused by the API description.

The diagram above would seem to suggest that this function is for taking things out of the per-peer buffer, and then sending them to the peer via whatever stream we're using. In other words, it corresponds to `pop_message` as I described in this comment.

However, a few paragraphs above you say "This method waits until there is space available in the buffer of messages towards the given peer", which is not what `pop_message` is supposed to do; it is supposed to wait until items are available, i.e. the buffer is non-empty - otherwise there is nothing to pop. The opposing method `push_message` (which in your diagram would be called by the notifications broadcast) is never supposed to block.

@tomaka (PR author) replied:

This function is meant to be the foundation on top of which your bounded-priority-queue could be implemented.
The way they plug together is that you'd do something like `while let Some(next) = pop_message() { send_notifications(..., next) }`.

A contributor replied:

Ah ok. It might be good to clarify "there is space available in the lower-level buffer of messages towards the peer" and add this to the diagram in between `send_notifications` and Internet.

&self,
target: PeerId,
engine_id: ConsensusEngineId,
num_slots: usize,
) -> Result<NotificationsBufferSlots<'_>, SendNotificationsError> {
todo!()
}

/// Returns a stream containing the events that happen on the network.
///
/// If this method is called multiple times, the events are duplicated.
@@ -812,6 +882,31 @@ impl<B, H> NetworkStateInfo for NetworkService<B, H>
}
}

/// Reserved slots in the notifications buffer, ready to accept data.
#[must_use]
pub struct NotificationsBufferSlots<'a> {
_dummy: std::marker::PhantomData<&'a ()>,
}

impl<'a> NotificationsBufferSlots<'a> {
/// Consumes this slots reservation and actually queues the notifications.
///
/// # Panics
///
/// Panics if the number of items in the `notifications` iterator is different from the number
/// of reserved slots.
pub fn send(self, notifications: impl Iterator<Item = impl Into<Vec<u8>>>) {
todo!()
}
}
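
For illustration, reserving several slots and filling them in one call might look like this (a hedged sketch; `network`, `peer`, and `engine_id` are assumed to be in scope):

```rust
// Reserve exactly as many slots as there are notifications to enqueue;
// `send` panics if the two counts differ.
let batch: Vec<Vec<u8>> = vec![b"first".to_vec(), b"second".to_vec()];
let slots = network.send_notifications(peer, engine_id, batch.len()).await?;
slots.send(batch.into_iter());
```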

/// Error returned by [`NetworkService::send_notifications`].
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum SendNotificationsError {
/// No open notifications substream exists with the provided combination of peer and protocol.
NoSubstream,
}

/// Messages sent from the `NetworkService` to the `NetworkWorker`.
///
/// Each entry corresponds to a method of `NetworkService`.