Skip to content

Commit

Permalink
protocols/gossipsub: Implement unsub backoff spec changes (#2403)
Browse files Browse the repository at this point in the history
Implements the changes specified by
libp2p/specs#383.

Co-authored-by: Max Inden <[email protected]>
  • Loading branch information
divagant-martian and mxinden authored Jan 17, 2022
1 parent 96dbfcd commit 30fc882
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 5 deletions.
4 changes: 4 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

- Fix `GossipsubConfigBuilder::build()` requiring `&self` to live for `'static` (see [PR 2409])

- Implement Unsubscribe backoff as per [libp2p specs PR 383] (see [PR 2403]).

[PR 2346]: https://github.com/libp2p/rust-libp2p/pull/2346
[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
[PR 2327]: https://github.com/libp2p/rust-libp2p/pull/2327
[PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408
[PR 2409]: https://github.com/libp2p/rust-libp2p/pull/2409
[PR 2403]: https://github.com/libp2p/rust-libp2p/pull/2403
[libp2p specs PR 383]: https://github.com/libp2p/specs/pull/383

# 0.34.0 [2021-11-16]

Expand Down
26 changes: 21 additions & 5 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,7 @@ where
topic_hash: &TopicHash,
peer: &PeerId,
do_px: bool,
on_unsubscribe: bool,
) -> GossipsubControlAction {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer, topic_hash.clone());
Expand Down Expand Up @@ -1088,14 +1089,19 @@ where
Vec::new()
};

let backoff = if on_unsubscribe {
self.config.unsubscribe_backoff()
} else {
self.config.prune_backoff()
};

// update backoff
self.backoffs
.update_backoff(topic_hash, peer, self.config.prune_backoff());
self.backoffs.update_backoff(topic_hash, peer, backoff);

GossipsubControlAction::Prune {
topic_hash: topic_hash.clone(),
peers,
backoff: Some(self.config.prune_backoff().as_secs()),
backoff: Some(backoff.as_secs()),
}
}

Expand All @@ -1111,7 +1117,9 @@ where
for peer in peers {
// Send a PRUNE control message
debug!("LEAVE: Sending PRUNE to peer: {:?}", peer);
let control = self.make_prune(topic_hash, &peer, self.config.do_px());
let on_unsubscribe = true;
let control =
self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
Self::control_pool_add(&mut self.control_pool, peer, control);

// If the peer did not previously exist in any mesh, inform the handler
Expand Down Expand Up @@ -1487,9 +1495,10 @@ where

if !to_prune_topics.is_empty() {
// build the prune messages to send
let on_unsubscribe = false;
let prune_messages = to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px))
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
.collect();
// Send the prune messages to the peer
debug!(
Expand Down Expand Up @@ -2598,6 +2607,9 @@ where
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
// It therefore must be in at least one mesh and we do not need to inform the handler
// of its removal from another.

// The following prunes are not due to unsubscribing.
let on_unsubscribe = false;
if let Some(topics) = to_prune.remove(&peer) {
let mut prunes = topics
.iter()
Expand All @@ -2606,6 +2618,7 @@ where
topic_hash,
&peer,
self.config.do_px() && !no_px.contains(&peer),
on_unsubscribe,
)
})
.collect::<Vec<_>>();
Expand All @@ -2630,13 +2643,16 @@ where
}

// handle the remaining prunes
// The following prunes are not due to unsubscribing.
let on_unsubscribe = false;
for (peer, topics) in to_prune.iter() {
let mut remaining_prunes = Vec::new();
for topic_hash in topics {
let prune = self.make_prune(
topic_hash,
peer,
self.config.do_px() && !no_px.contains(peer),
on_unsubscribe,
);
remaining_prunes.push(prune);
// inform the handler
Expand Down
71 changes: 71 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2037,6 +2037,77 @@ mod tests {
);
}

#[test]
fn test_unsubscribe_backoff() {
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100);
let config = GossipsubConfigBuilder::default()
.backoff_slack(1)
// ensure a prune_backoff > unsubscribe_backoff
.prune_backoff(Duration::from_secs(5))
.unsubscribe_backoff(1)
.heartbeat_interval(HEARTBEAT_INTERVAL)
.build()
.unwrap();

let topic = String::from("test");
// only one peer => mesh too small and will try to regraft as early as possible
let (mut gs, _, topics) = inject_nodes1()
.peer_no(1)
.topics(vec![topic.clone()])
.to_subscribe(true)
.gs_config(config)
.create_network();

let _ = gs.unsubscribe(&Topic::new(topic.clone()));

assert_eq!(
count_control_msgs(&gs, |_, m| match m {
GossipsubControlAction::Prune { backoff, .. } => backoff == &Some(1),
_ => false,
}),
1,
"Peer should be pruned with `unsubscribe_backoff`."
);

let _ = gs.subscribe(&Topic::new(topics[0].to_string()));

// forget all events until now
flush_events(&mut gs);

// call heartbeat
gs.heartbeat();

// Sleep for one second and apply 10 regular heartbeats (interval = 100ms).
for _ in 0..10 {
sleep(HEARTBEAT_INTERVAL);
gs.heartbeat();
}

// Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat
// is needed).
assert_eq!(
count_control_msgs(&gs, |_, m| match m {
GossipsubControlAction::Graft { .. } => true,
_ => false,
}),
0,
"Graft message created too early within backoff period"
);

// Heartbeat one more time this should graft now
sleep(HEARTBEAT_INTERVAL);
gs.heartbeat();

// check that graft got created
assert!(
count_control_msgs(&gs, |_, m| match m {
GossipsubControlAction::Graft { .. } => true,
_ => false,
}) > 0,
"No graft message was created after backoff period"
);
}

#[test]
fn test_flood_publish() {
let config: GossipsubConfig = GossipsubConfig::default();
Expand Down
26 changes: 26 additions & 0 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub struct GossipsubConfig {
do_px: bool,
prune_peers: usize,
prune_backoff: Duration,
unsubscribe_backoff: Duration,
backoff_slack: u32,
flood_publish: bool,
graft_flood_threshold: Duration,
Expand Down Expand Up @@ -276,6 +277,15 @@ impl GossipsubConfig {
self.prune_backoff
}

/// Controls the backoff time when unsubscribing from a topic.
///
/// This is how long to wait before resubscribing to the topic. A short backoff period in case
/// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
/// is 10 seconds.
pub fn unsubscribe_backoff(&self) -> Duration {
self.unsubscribe_backoff
}

/// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
/// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
/// solves problems occuring through high latencies. In particular if
Expand Down Expand Up @@ -421,6 +431,7 @@ impl Default for GossipsubConfigBuilder {
do_px: false,
prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented.
prune_backoff: Duration::from_secs(60),
unsubscribe_backoff: Duration::from_secs(10),
backoff_slack: 1,
flood_publish: true,
graft_flood_threshold: Duration::from_secs(10),
Expand Down Expand Up @@ -636,6 +647,16 @@ impl GossipsubConfigBuilder {
self
}

/// Controls the backoff time when unsubscribing from a topic.
///
/// This is how long to wait before resubscribing to the topic. A short backoff period in case
/// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
/// is 10 seconds.
pub fn unsubscribe_backoff(&mut self, unsubscribe_backoff: u64) -> &mut Self {
self.config.unsubscribe_backoff = Duration::from_secs(unsubscribe_backoff);
self
}

/// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
/// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
/// solves problems occuring through high latencies. In particular if
Expand Down Expand Up @@ -777,6 +798,11 @@ impl GossipsubConfigBuilder {
"The following inequality doesn't hold mesh_outbound_min <= self.config.mesh_n / 2",
);
}

if self.config.unsubscribe_backoff.as_millis() == 0 {
return Err("The unsubscribe_backoff parameter should be positive.");
}

Ok(self.config.clone())
}
}
Expand Down

0 comments on commit 30fc882

Please sign in to comment.