Skip to content

Commit

Permalink
Use signed peer records on handle prune
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Mar 10, 2023
1 parent 12b785e commit d1d468c
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 25 deletions.
2 changes: 1 addition & 1 deletion core/src/signed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use unsigned_varint::encode::usize_buffer;
/// A signed envelope contains an arbitrary byte string payload, a signature of the payload, and the public key that can be used to verify the signature.
///
/// For more details see libp2p RFC0002: <https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md>
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SignedEnvelope {
key: PublicKey,
payload_type: Vec<u8>,
Expand Down
29 changes: 23 additions & 6 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use rand::{seq::SliceRandom, thread_rng};

use libp2p_core::{
identity::Keypair, multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Endpoint, Multiaddr,
PeerId,
PeerId, PeerRecord,
};
use libp2p_swarm::{
behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
Expand Down Expand Up @@ -1073,7 +1073,13 @@ where
|p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
)
.into_iter()
.map(|p| PeerInfo { peer_id: Some(p) })
.map(|p| PeerInfo {
peer_id: Some(p),
// TODO: Retrieve signed_peer_record from Registrations store
// go-gossipsub uses a global store https://github.com/libp2p/go-libp2p-pubsub/blob/829f9026a3dcf12b268efad6a140dd99446cf17b/gossipsub.go#L1874
// rust-libp2p collects records with the server behaviour of rendezvous protocol
signed_peer_record: None,
})
.collect()
} else {
Vec::new()
Expand Down Expand Up @@ -1623,15 +1629,26 @@ where
}

for p in px {
// TODO: Once signed records are spec'd: extract signed peer record if given and handle
// it, see https://github.com/libp2p/specs/pull/217
if let Some(peer_id) = p.peer_id {
let dial_opts = if let Some(signed_peer_record) = p.signed_peer_record {
// If signed envelop is invalid, return None and ignore peer exchange
if let Ok(peer_record) = PeerRecord::from_signed_envelope(signed_peer_record) {
Some((peer_record.peer_id(), peer_record.addresses().to_vec()))
} else {
None
}
} else if let Some(peer_id) = p.peer_id {
Some::<(PeerId, Vec<Multiaddr>)>((peer_id, vec![]))
} else {
None
};

if let Some((peer_id, addresses)) = dial_opts {
// mark as px peer
self.px_peers.insert(peer_id);

// dial peer
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(peer_id).build(),
opts: DialOpts::peer_id(peer_id).addresses(addresses).build(),
});
}
}
Expand Down
72 changes: 67 additions & 5 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,10 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc {
.filter_map(|info| {
info.peer_id
.and_then(|id| PeerId::from_bytes(&id).ok())
.map(|peer_id|
//TODO signedPeerRecord, see https://github.com/libp2p/specs/pull/217
PeerInfo {
peer_id: Some(peer_id),
})
.map(|peer_id| PeerInfo {
peer_id: Some(peer_id),
signed_peer_record: None,
})
})
.collect::<Vec<PeerInfo>>();

Expand Down Expand Up @@ -1814,6 +1813,7 @@ fn test_connect_to_px_peers_on_handle_prune() {
for _ in 0..config.prune_peers() + 5 {
px.push(PeerInfo {
peer_id: Some(PeerId::random()),
signed_peer_record: None,
});
}

Expand Down Expand Up @@ -1852,6 +1852,65 @@ fn test_connect_to_px_peers_on_handle_prune() {
));
}

#[test]
fn test_connect_to_px_peers_with_peer_record_on_handle_prune() {
let config: Config = Config::default();

let (mut gs, peers, topics) = inject_nodes1()
.peer_no(1)
.topics(vec!["test".into()])
.to_subscribe(true)
.create_network();

// handle prune from single peer with px peers

let mut px_peer_ids = Vec::new();
let mut px = Vec::new();
// propose more px peers than config.prune_peers()
for i in 0..config.prune_peers() + 5 {
let key = Keypair::generate_ed25519();
let address: Multiaddr = format!("/ip4/1.2.3.4/tcp/{i}").try_into().unwrap();
let peer_record = PeerRecord::new(&key, vec![address]).unwrap();

px_peer_ids.push(key.public().to_peer_id());
px.push(PeerInfo {
peer_id: None,
signed_peer_record: Some(peer_record.into_signed_envelope()),
});
}

gs.handle_prune(
&peers[0],
vec![(
topics[0].clone(),
px.clone(),
Some(config.prune_backoff().as_secs()),
)],
);

// Check DialPeer events for px peers
let dials: Vec<_> = gs
.events
.iter()
.filter_map(|e| match e {
// TODO: How to extract addresses from DialOpts to assert them in the test?
NetworkBehaviourAction::Dial { opts } => opts.get_peer_id(),
_ => None,
})
.collect();

// Exactly config.prune_peers() many random peers should be dialled
assert_eq!(dials.len(), config.prune_peers());

let dials_set: HashSet<_> = dials.into_iter().collect();

// No duplicates
assert_eq!(dials_set.len(), config.prune_peers());

// all dial peers must be in px
assert!(dials_set.is_subset(&px_peer_ids.iter().map(|i| *i).collect::<HashSet<_>>()));
}

#[test]
fn test_send_px_and_backoff_in_prune() {
let config: Config = Config::default();
Expand Down Expand Up @@ -2474,6 +2533,7 @@ fn test_ignore_px_from_negative_scored_peer() {
//handle prune from single peer with px peers
let px = vec![PeerInfo {
peer_id: Some(PeerId::random()),
signed_peer_record: None,
}];

gs.handle_prune(
Expand Down Expand Up @@ -3067,6 +3127,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() {
// Handle prune from peer peers[0] with px peers
let px = vec![PeerInfo {
peer_id: Some(PeerId::random()),
signed_peer_record: None,
}];
gs.handle_prune(
&peers[0],
Expand All @@ -3089,6 +3150,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() {
//handle prune from peer peers[1] with px peers
let px = vec![PeerInfo {
peer_id: Some(PeerId::random()),
signed_peer_record: None,
}];
gs.handle_prune(
&peers[1],
Expand Down
25 changes: 18 additions & 7 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use byteorder::{BigEndian, ByteOrder};
use bytes::BytesMut;
use futures::future;
use futures::prelude::*;
use libp2p_core::SignedEnvelope;
use libp2p_core::{
identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo,
};
Expand Down Expand Up @@ -509,14 +510,24 @@ impl Decoder for GossipsubCodec {
.peers
.into_iter()
.filter_map(|info| {
info.peer_id
let peer_id = info
.peer_id
.as_ref()
.and_then(|id| PeerId::from_bytes(id).ok())
.map(|peer_id|
//TODO signedPeerRecord, see https://github.com/libp2p/specs/pull/217
PeerInfo {
peer_id: Some(peer_id),
})
.and_then(|peer_id| PeerId::from_bytes(peer_id).ok());

let signed_peer_record = info
.signed_peer_record
.as_ref()
.and_then(|spr| SignedEnvelope::from_protobuf_encoding(spr).ok());

if peer_id.is_none() && signed_peer_record.is_none() {
None
} else {
Some(PeerInfo {
peer_id,
signed_peer_record,
})
}
})
.collect::<Vec<PeerInfo>>();

Expand Down
11 changes: 5 additions & 6 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

//! A collection of types using the Gossipsub system.
use crate::TopicHash;
use libp2p_core::PeerId;
use libp2p_core::{PeerId, SignedEnvelope};
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
use quick_protobuf::MessageWrite;
Expand Down Expand Up @@ -200,9 +200,7 @@ pub enum SubscriptionAction {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PeerInfo {
pub peer_id: Option<PeerId>,
//TODO add this when RFC: Signed Address Records got added to the spec (see pull request
// https://github.com/libp2p/specs/pull/217)
//pub signed_peer_record: ?,
pub signed_peer_record: Option<SignedEnvelope>,
}

/// A Control message received by the gossipsub system.
Expand Down Expand Up @@ -330,8 +328,9 @@ impl From<Rpc> for proto::RPC {
.into_iter()
.map(|info| proto::PeerInfo {
peer_id: info.peer_id.map(|id| id.to_bytes()),
/// TODO, see https://github.com/libp2p/specs/pull/217
signed_peer_record: None,
signed_peer_record: info
.signed_peer_record
.map(|spr| spr.into_protobuf_encoding()),
})
.collect(),
backoff,
Expand Down

0 comments on commit d1d468c

Please sign in to comment.