Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Send Status message on all newly-opened legacy substreams #6593

Merged
3 commits merged into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,22 @@ impl<B: BlockT> BlockAnnouncesHandshake<B> {
}
}

/// Builds a SCALE-encoded "Status" message to send as handshake for the legacy protocol.
fn build_status_message<B: BlockT>(protocol_config: &ProtocolConfig, chain: &Arc<dyn Client<B>>) -> Vec<u8> {
let info = chain.info();
let status = message::generic::Status {
version: CURRENT_VERSION,
min_supported_version: MIN_VERSION,
genesis_hash: info.genesis_hash,
roles: protocol_config.roles.into(),
best_number: info.best_number,
best_hash: info.best_hash,
chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
};

Message::<B>::Status(status).encode()
}

/// Fallback mechanism to use to send a notification if no substream is open.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Fallback {
Expand Down Expand Up @@ -403,6 +419,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
queue_size_report
);
Expand Down Expand Up @@ -547,6 +564,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub fn update_chain(&mut self) {
let info = self.context_data.chain.info();
self.sync.update_chain_info(&info.best_hash, info.best_number);
self.behaviour.set_legacy_handshake_message(build_status_message(&self.config, &self.context_data.chain));
self.behaviour.set_notif_protocol_handshake(
&self.block_announces_protocol,
BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode()
);
}

/// Inform sync about an own imported block.
Expand Down Expand Up @@ -683,7 +705,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub fn on_peer_connected(&mut self, who: PeerId) {
trace!(target: "sync", "Connecting {}", who);
self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: Instant::now() });
self.send_status(who);
}

/// Called by peer when it is disconnecting
Expand Down Expand Up @@ -1329,22 +1350,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

/// Send Status message
fn send_status(&mut self, who: PeerId) {
let info = self.context_data.chain.info();
let status = message::generic::Status {
version: CURRENT_VERSION,
min_supported_version: MIN_VERSION,
genesis_hash: info.genesis_hash,
roles: self.config.roles,
best_number: info.best_number,
best_hash: info.best_hash,
chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
};

self.send_message(&who, None, GenericMessage::Status(status))
}

fn on_block_announce(
&mut self,
who: PeerId,
Expand Down Expand Up @@ -1498,6 +1503,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
});
if let Some((best_num, best_hash)) = new_best {
self.sync.update_chain_info(&best_hash, best_num);
self.behaviour.set_legacy_handshake_message(build_status_message(&self.config, &self.context_data.chain));
self.behaviour.set_notif_protocol_handshake(
&self.block_announces_protocol,
BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode()
Expand Down
48 changes: 15 additions & 33 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ use libp2p::swarm::{
PollParameters
};
use log::{debug, error, trace, warn};
use parking_lot::RwLock;
use prometheus_endpoint::HistogramVec;
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use std::task::{Context, Poll};
use std::{borrow::Cow, cmp, collections::{hash_map::Entry, VecDeque}};
use std::{error, mem, pin::Pin, str, time::Duration};
use std::{error, mem, pin::Pin, str, sync::Arc, time::Duration};
use wasm_timer::Instant;

/// Network behaviour that handles opening substreams for custom protocols with other peers.
Expand Down Expand Up @@ -118,7 +119,7 @@ pub struct GenericProto {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols: Vec<(Cow<'static, [u8]>, Vec<u8>)>,
notif_protocols: Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>,

/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,
Expand Down Expand Up @@ -220,20 +221,6 @@ enum PeerState {
}

impl PeerState {
/// True if there exists any established connection to the peer.
fn is_connected(&self) -> bool {
match self {
PeerState::Disabled { .. } |
PeerState::DisabledPendingEnable { .. } |
PeerState::Enabled { .. } |
PeerState::PendingRequest { .. } |
PeerState::Requested |
PeerState::Incoming { .. } => true,
PeerState::Poisoned |
PeerState::Banned { .. } => false,
}
}

/// True if there exists an established connection to the peer
/// that is open for custom protocol traffic.
fn is_open(&self) -> bool {
Expand Down Expand Up @@ -343,10 +330,12 @@ impl GenericProto {
local_peer_id: PeerId,
protocol: impl Into<ProtocolId>,
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
queue_size_report: Option<HistogramVec>,
) -> Self {
let legacy_protocol = RegisteredProtocol::new(protocol, versions);
let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);

GenericProto {
local_peer_id,
Expand All @@ -372,7 +361,7 @@ impl GenericProto {
protocol_name: impl Into<Cow<'static, [u8]>>,
handshake_msg: impl Into<Vec<u8>>
) {
self.notif_protocols.push((protocol_name.into(), handshake_msg.into()));
self.notif_protocols.push((protocol_name.into(), Arc::new(RwLock::new(handshake_msg.into()))));
}

/// Modifies the handshake of the given notifications protocol.
Expand All @@ -383,24 +372,17 @@ impl GenericProto {
protocol_name: &[u8],
handshake_message: impl Into<Vec<u8>>
) {
let handshake_message = handshake_message.into();
if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) {
protocol.1 = handshake_message.clone();
} else {
return;
*protocol.1.write() = handshake_message.into();
}
}

// Send an event to all the peers we're connected to, updating the handshake message.
for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::UpdateHandshake {
protocol_name: Cow::Owned(protocol_name.to_owned()),
handshake_message: handshake_message.clone(),
},
});
}
/// Modifies the handshake of the legacy protocol.
pub fn set_legacy_handshake_message(
&mut self,
handshake_message: impl Into<Vec<u8>>
) {
*self.legacy_protocol.handshake_message().write() = handshake_message.into();
}

/// Returns the number of discovered nodes that we keep in memory.
Expand Down
54 changes: 20 additions & 34 deletions client/network/src/protocol/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ use libp2p::swarm::{
NegotiatedSubstream,
};
use log::{debug, error};
use parking_lot::RwLock;
use prometheus_endpoint::HistogramVec;
use std::{borrow::Cow, error, io, str, task::{Context, Poll}};
use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}};

/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
Expand All @@ -77,10 +78,10 @@ use std::{borrow::Cow, error, io, str, task::{Context, Poll}};
pub struct NotifsHandlerProto {
/// Prototypes for handlers for inbound substreams, and the message we respond with in the
/// handshake.
in_handlers: Vec<(NotifsInHandlerProto, Vec<u8>)>,
in_handlers: Vec<(NotifsInHandlerProto, Arc<RwLock<Vec<u8>>>)>,

/// Prototypes for handlers for outbound substreams, and the initial handshake message we send.
out_handlers: Vec<(NotifsOutHandlerProto, Vec<u8>)>,
out_handlers: Vec<(NotifsOutHandlerProto, Arc<RwLock<Vec<u8>>>)>,

/// Prototype for handler for backwards-compatibility.
legacy: LegacyProtoHandlerProto,
Expand All @@ -91,10 +92,10 @@ pub struct NotifsHandlerProto {
/// See the documentation at the module level for more information.
pub struct NotifsHandler {
/// Handlers for inbound substreams, and the message we respond with in the handshake.
in_handlers: Vec<(NotifsInHandler, Vec<u8>)>,
in_handlers: Vec<(NotifsInHandler, Arc<RwLock<Vec<u8>>>)>,

/// Handlers for outbound substreams, and the initial handshake message we send.
out_handlers: Vec<(NotifsOutHandler, Vec<u8>)>,
out_handlers: Vec<(NotifsOutHandler, Arc<RwLock<Vec<u8>>>)>,

/// Handler for backwards-compatibility.
legacy: LegacyProtoHandler,
Expand Down Expand Up @@ -161,18 +162,6 @@ pub enum NotifsHandlerIn {
message: Vec<u8>,
},

/// Modifies the handshake message of a notifications protocol.
UpdateHandshake {
/// Name of the protocol for the message.
///
/// Must match one of the registered protocols.
protocol_name: Cow<'static, [u8]>,

/// The new handshake message to send if we open a substream or if the remote opens a
/// substream towards us.
handshake_message: Vec<u8>,
},

/// Sends a notifications message.
SendNotification {
/// Name of the protocol for the message.
Expand Down Expand Up @@ -253,7 +242,7 @@ impl NotifsHandlerProto {
/// messages queue. If passed, it must have one label for the protocol name.
pub fn new(
legacy: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, [u8]>, Vec<u8>)>>,
list: impl Into<Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>>,
queue_size_report: Option<HistogramVec>
) -> Self {
let list = list.into();
Expand Down Expand Up @@ -346,12 +335,17 @@ impl ProtocolsHandler for NotifsHandler {
self.enabled = EnabledState::Enabled;
self.legacy.inject_event(LegacyProtoHandlerIn::Enable);
for (handler, initial_message) in &mut self.out_handlers {
// We create `initial_message` on a separate line to be sure that the lock
// is released as soon as possible.
let initial_message = initial_message.read().clone();
handler.inject_event(NotifsOutHandlerIn::Enable {
initial_message: initial_message.clone(),
initial_message,
});
}
for num in self.pending_in.drain(..) {
let handshake_message = self.in_handlers[num].1.clone();
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = self.in_handlers[num].1.read().clone();
self.in_handlers[num].0
.inject_event(NotifsInHandlerIn::Accept(handshake_message));
}
Expand All @@ -375,18 +369,6 @@ impl ProtocolsHandler for NotifsHandler {
},
NotifsHandlerIn::SendLegacy { message } =>
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }),
NotifsHandlerIn::UpdateHandshake { protocol_name, handshake_message } => {
for (handler, current_handshake) in &mut self.in_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
for (handler, current_handshake) in &mut self.out_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
}
NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => {
for (handler, _) in &mut self.out_handlers {
if handler.protocol_name() != &protocol_name[..] {
Expand Down Expand Up @@ -524,8 +506,12 @@ impl ProtocolsHandler for NotifsHandler {
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
match self.enabled {
EnabledState::Initial => self.pending_in.push(handler_num),
EnabledState::Enabled =>
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message.clone())),
EnabledState::Enabled => {
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = handshake_message.read().clone();
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
},
EnabledState::Disabled =>
handler.inject_event(NotifsInHandlerIn::Refuse),
},
Expand Down
10 changes: 9 additions & 1 deletion client/network/src/protocol/generic_proto/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
});

let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], peerset, None),
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset, None),
addrs: addrs
.iter()
.enumerate()
Expand Down Expand Up @@ -241,6 +241,8 @@ fn two_nodes_transfer_lots_of_packets() {
);
}
},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
_ => panic!(),
}
}
Expand All @@ -251,6 +253,8 @@ fn two_nodes_transfer_lots_of_packets() {
loop {
match ready!(service2.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
Some(GenericProtoOut::LegacyMessage { message, .. }) => {
match Message::<Block>::decode(&mut &message[..]).unwrap() {
Message::<Block>::BlockResponse(BlockResponse { id: _, blocks }) => {
Expand Down Expand Up @@ -312,6 +316,8 @@ fn basic_two_nodes_requests_in_parallel() {
service1.send_packet(&peer_id, msg.encode());
}
},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
_ => panic!(),
}
}
Expand All @@ -321,6 +327,8 @@ fn basic_two_nodes_requests_in_parallel() {
loop {
match ready!(service2.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
Some(GenericProtoOut::LegacyMessage { message, .. }) => {
let pos = to_receive.iter().position(|m| m.encode() == message).unwrap();
to_receive.remove(pos);
Expand Down
Loading