Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): use signed peer records on handle prune #3579

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/signed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use unsigned_varint::encode::usize_buffer;
/// A signed envelope contains an arbitrary byte string payload, a signature of the payload, and the public key that can be used to verify the signature.
///
/// For more details see libp2p RFC0002: <https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md>
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SignedEnvelope {
key: PublicKey,
payload_type: Vec<u8>,
Expand Down
62 changes: 35 additions & 27 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use log::{debug, error, trace, warn};
use prometheus_client::registry::Registry;
use rand::{seq::SliceRandom, thread_rng};

use libp2p_core::{multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Endpoint, Multiaddr};
use libp2p_core::{
multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Endpoint, Multiaddr, PeerRecord,
};
use libp2p_identity::Keypair;
use libp2p_identity::PeerId;
use libp2p_swarm::{
Expand Down Expand Up @@ -1075,7 +1077,12 @@ where
|p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
)
.into_iter()
.map(|p| PeerInfo { peer_id: Some(p) })
.map(|p| PeerInfo {
peer_id: Some(p),
// TODO: Gossipsub must have access to a signed_peer_record store to serve them
// For context, see https://github.com/libp2p/rust-libp2p/issues/2398
signed_peer_record: None,
})
.collect()
} else {
Vec::new()
Expand Down Expand Up @@ -1583,8 +1590,8 @@ where
self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune);

if self.mesh.contains_key(&topic_hash) {
//connect to px peers
if !px.is_empty() {
// connect to px peers
if self.config.do_px() && !px.is_empty() {
// we ignore PX from peers with insufficient score
if below_threshold {
debug!(
Expand All @@ -1595,45 +1602,46 @@ where
continue;
}

// NOTE: We cannot dial any peers from PX currently as we typically will not
// know their multiaddr. Until SignedRecords are spec'd this
// remains a stub. By default `config.prune_peers()` is set to zero and
// this is skipped. If the user modifies this, this will only be able to
// dial already known peers (from an external discovery mechanism for
// example).
if self.config.prune_peers() > 0 {
self.px_connect(px);
// mesh count already discounted prune's sender
// eagerly connect only if there's capacity in the mesh
let max_px_to_connect = self.config.mesh_n_high()
- self.mesh.get(&topic_hash).map(|x| x.len()).unwrap_or(0);
if max_px_to_connect > 0 {
self.px_connect(px, max_px_to_connect);
}
}
}
}
debug!("Completed PRUNE handling for peer: {}", peer_id.to_string());
}

fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
let n = self.config.prune_peers();
// Ignore peerInfo with no ID
//
//TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
// signed peer record?
px.retain(|p| p.peer_id.is_some());
if px.len() > n {
// only use at most prune_peers many random peers
fn px_connect(&mut self, mut px: Vec<PeerInfo>, max_px_to_connect: usize) {
if px.len() > max_px_to_connect {
let mut rng = thread_rng();
px.partial_shuffle(&mut rng, n);
px = px.into_iter().take(n).collect();
px.partial_shuffle(&mut rng, max_px_to_connect);
px = px.into_iter().take(max_px_to_connect).collect();
}

for p in px {
dapplion marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Once signed records are spec'd: extract signed peer record if given and handle
// it, see https://github.com/libp2p/specs/pull/217
if let Some(peer_id) = p.peer_id {
let dial_opts = if let Some(signed_peer_record) = p.signed_peer_record {
// If signed envelop is invalid, return None and ignore this and next peer exchanges
match PeerRecord::from_signed_envelope(signed_peer_record) {
Ok(peer_record) => {
Some((peer_record.peer_id(), peer_record.addresses().to_vec()))
}
Err(_) => return,
}
} else {
p.peer_id.map(|peer_id| (peer_id, vec![]))
};

if let Some((peer_id, addresses)) = dial_opts {
// mark as px peer
self.px_peers.insert(peer_id);

// dial peer
self.events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id).build(),
opts: DialOpts::peer_id(peer_id).addresses(addresses).build(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this should have been done earlier, probably on line 1617.

I think we probably don't want to try to connect to all the peers, we probably only want to try and get up to mesh_n_high. I'd suggest we set n on line 1617 to be the min of self.config.prune_peers() or self.mesh.get(&topic_hash).map(|x| x.len()).unwrap_or(0) or something. Which may mean calculating this in handle_prune() because we don't have the topic hash here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with the needed change in general.

Note that prune_peers according to our docs configure the number of peers we send to others, not the number of peers we connect to. Either code or docs need to be adjusted. But using the current docs, this should probably not be used here (or in handle_prune)

It's also not clear to me if handling the prune is the best place to connect to those peers. We already graft peers in the heartbeat when below mesh_n_low. We might want to connect to these peers to accelerate the process, or we might want to prioritize already connected peers since they are somewhat more trusted. In any case, both mesh_n_high and mesh_n_low should probably be used to select the number of peers from this set we want to try.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PeerExchange is meant to give you more peers in case you are only connected to a few, right?

Perhaps it would be better to just add those to some kind of list and let the grafting logic upon heartbeats do the connecting?

For the case where we don't have many peers, that would otherwise "fail" in that we can't connect to as many peers as we'd like.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are good points, and path forward is opinionated. Maintainers and @divagant-martian @AgeManning could you comment on what makes more sense?

IMO connecting eagerly instead of accumulating keep it simpler + limit only up to mesh needs. For reference Go impl eagerly schedules all peer exchanges to the connect channel (with max cap MaxPendingConnections)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've not touched gossipsub much so I'll leave it to @mxinden and @AgeManning to make a decision here.

For reference Go impl eagerly schedules all peer exchanges to the connect channel (with max cap MaxPendingConnections)

That is an interesting data point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the sake of unblocking this PR we can go with eager connection, since as Age mentioned, this needs to be explicitly turned on.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AgeManning Should do_px be used to not consume px by default? Or add a new config param like use_px to split provision and consumption of px

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a more explicit config. I think we should opt for enable_peer_exchange() which enables both sending and receiving. I dont think users really need granularity over one or the other (we can add it later if its requested).

I'd suggest renaming do_px to enable_peer_exchange. Have it off my default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dapplion to achieve this you technically need to change nothing. This is already the case. Age's suggestion is a nice to have, but would change the public api, so the version should be updated accordingly. I'm fine with doing or not doing this change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do it in a non-breaking way by adding enable_peer_exchange on Config and deprecating the do_px function. Internally, the field can be renamed without issues.

});
}
}
Expand Down
78 changes: 72 additions & 6 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,10 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc {
.filter_map(|info| {
info.peer_id
.and_then(|id| PeerId::from_bytes(&id).ok())
.map(|peer_id|
//TODO signedPeerRecord, see https://github.com/libp2p/specs/pull/217
PeerInfo {
peer_id: Some(peer_id),
})
.map(|peer_id| PeerInfo {
peer_id: Some(peer_id),
signed_peer_record: None,
})
})
.collect::<Vec<PeerInfo>>();

Expand Down Expand Up @@ -1813,6 +1812,7 @@ fn test_connect_to_px_peers_on_handle_prune() {
for _ in 0..config.prune_peers() + 5 {
px.push(PeerInfo {
peer_id: Some(PeerId::random()),
signed_peer_record: None,
});
}

Expand Down Expand Up @@ -1851,6 +1851,65 @@ fn test_connect_to_px_peers_on_handle_prune() {
));
}

#[test]
fn test_connect_to_px_peers_with_peer_record_on_handle_prune() {
let config: Config = Config::default();

let (mut gs, peers, topics) = inject_nodes1()
.peer_no(1)
.topics(vec!["test".into()])
.to_subscribe(true)
.create_network();

// handle prune from single peer with px peers

let mut px_peer_ids = Vec::new();
let mut px = Vec::new();
// propose more px peers than config.mesh_n_high()
for i in 0..config.mesh_n_high() + 5 {
let key = Keypair::generate_ed25519();
let address: Multiaddr = format!("/ip4/1.2.3.4/tcp/{i}").try_into().unwrap();
let peer_record = PeerRecord::new(&key, vec![address]).unwrap();

px_peer_ids.push(key.public().to_peer_id());
px.push(PeerInfo {
peer_id: None,
signed_peer_record: Some(peer_record.into_signed_envelope()),
});
}

gs.handle_prune(
&peers[0],
vec![(
topics[0].clone(),
px.clone(),
Some(config.prune_backoff().as_secs()),
)],
);

// Check DialPeer events for px peers
let dials: Vec<_> = gs
.events
.iter()
.filter_map(|e| match e {
// TODO: How to extract addresses from DialOpts to assert them in the test?
ToSwarm::Dial { opts } => opts.get_peer_id(),
_ => None,
})
.collect();

// Exactly config.prune_peers() many random peers should be dialled
assert_eq!(dials.len(), config.prune_peers());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likely to fail?


let dials_set: HashSet<_> = dials.into_iter().collect();

// No duplicates
assert_eq!(dials_set.len(), config.prune_peers());

// all dial peers must be in px
assert!(dials_set.is_subset(&px_peer_ids.iter().copied().collect::<HashSet<_>>()));
Comment on lines +1906 to +1910
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to assert the presence of the addresses too!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestion on how to retrieve the addresses from DialOpts struct?

// TODO: How to extract addresses from DialOpts to assert them in the test?
NetworkBehaviourAction::Dial { opts } => opts.get_peer_id(),

Or should I make this fn public?

pub(crate) fn get_addresses(&self) -> Vec<Multiaddr> {
self.addresses.clone()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not really a fan of the style of the tests in gossipsub anyway. We should only be testing behaviours by running them in a Swarm and not call these lifecycle functions manually.

We now have a new crate, libp2p-swarm-test which makes it easy to set up swarms for testing.

How about we do the following:

  • Implement "sending" of signed peer records by keeping a hashmap of records in the behaviour where we perform a lookup upon prune
  • At the moment, we don't have a way of adding records to this hashmap so in reality, there won't ever be any
  • Within the test, we can add a record (by accessing the field directly)
  • We can then write a test using libp2p-swarm-test that ends up sending a Prune to a node which will then contain the SignedPeerRecord
  • We can then assert that the Swarm establishes a connection to the node specified in the prune message

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

going this way seems reasonable to me

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes tho if all tests in this package are written with a consistent style, should this PR stick to that style? And then refactor them into using swarm-test in another PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes tho if all tests in this package are written with a consistent style, should this PR stick to that style? And then refactor them into using swarm-test in another PR

Eventually yes but that is a massive effort. I don't mind mixing styles in a transition period if we get a better, more feature-oriented test in exchange.

}

#[test]
fn test_send_px_and_backoff_in_prune() {
let config: Config = Config::default();
Expand Down Expand Up @@ -2473,6 +2532,7 @@ fn test_ignore_px_from_negative_scored_peer() {
//handle prune from single peer with px peers
let px = vec![PeerInfo {
peer_id: Some(PeerId::random()),
signed_peer_record: None,
}];

gs.handle_prune(
Expand Down Expand Up @@ -3040,7 +3100,11 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

#[test]
fn test_ignore_px_from_peers_below_accept_px_threshold() {
let config = ConfigBuilder::default().prune_peers(16).build().unwrap();
let config = ConfigBuilder::default()
.prune_peers(16)
.do_px()
.build()
.unwrap();
let peer_score_params = PeerScoreParams::default();
let peer_score_thresholds = PeerScoreThresholds {
accept_px_threshold: peer_score_params.app_specific_weight,
Expand All @@ -3064,6 +3128,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() {
// Handle prune from peer peers[0] with px peers
let px = vec![PeerInfo {
peer_id: Some(PeerId::random()),
signed_peer_record: None,
}];
gs.handle_prune(
&peers[0],
Expand All @@ -3086,6 +3151,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() {
//handle prune from peer peers[1] with px peers
let px = vec![PeerInfo {
peer_id: Some(PeerId::random()),
signed_peer_record: None,
}];
gs.handle_prune(
&peers[1],
Expand Down
26 changes: 18 additions & 8 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use byteorder::{BigEndian, ByteOrder};
use bytes::BytesMut;
use futures::future;
use futures::prelude::*;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, ProtocolName, SignedEnvelope, UpgradeInfo};
use libp2p_identity::{PeerId, PublicKey};
use log::{debug, warn};
use quick_protobuf::Writer;
Expand Down Expand Up @@ -508,14 +508,24 @@ impl Decoder for GossipsubCodec {
.peers
.into_iter()
.filter_map(|info| {
info.peer_id
let peer_id = info
.peer_id
.as_ref()
.and_then(|id| PeerId::from_bytes(id).ok())
.map(|peer_id|
//TODO signedPeerRecord, see https://github.com/libp2p/specs/pull/217
PeerInfo {
peer_id: Some(peer_id),
})
.and_then(|peer_id| PeerId::from_bytes(peer_id).ok());

let signed_peer_record = info
.signed_peer_record
.as_ref()
.and_then(|spr| SignedEnvelope::from_protobuf_encoding(spr).ok());

if peer_id.is_none() && signed_peer_record.is_none() {
None
} else {
Some(PeerInfo {
peer_id,
signed_peer_record,
})
}
})
.collect::<Vec<PeerInfo>>();

Expand Down
10 changes: 5 additions & 5 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

//! A collection of types using the Gossipsub system.
use crate::TopicHash;
use libp2p_core::SignedEnvelope;
use libp2p_identity::PeerId;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
Expand Down Expand Up @@ -200,9 +201,7 @@ pub enum SubscriptionAction {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PeerInfo {
pub peer_id: Option<PeerId>,
//TODO add this when RFC: Signed Address Records got added to the spec (see pull request
// https://github.com/libp2p/specs/pull/217)
//pub signed_peer_record: ?,
pub signed_peer_record: Option<SignedEnvelope>,
}

/// A Control message received by the gossipsub system.
Expand Down Expand Up @@ -330,8 +329,9 @@ impl From<Rpc> for proto::RPC {
.into_iter()
.map(|info| proto::PeerInfo {
peer_id: info.peer_id.map(|id| id.to_bytes()),
/// TODO, see https://github.com/libp2p/specs/pull/217
signed_peer_record: None,
signed_peer_record: info
.signed_peer_record
.map(|spr| spr.into_protobuf_encoding()),
})
.collect(),
backoff,
Expand Down