From d292ea66cdd8d9250e456622f4786fcc5cce7641 Mon Sep 17 00:00:00 2001 From: Diva M Date: Mon, 27 Dec 2021 14:52:15 -0500 Subject: [PATCH 1/6] include unsub backoff in gossipsub's config --- protocols/gossipsub/src/config.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index bbf19c99b0f..a80d459f088 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -77,6 +77,7 @@ pub struct GossipsubConfig { do_px: bool, prune_peers: usize, prune_backoff: Duration, + unsubscribe_backoff: Duration, backoff_slack: u32, flood_publish: bool, graft_flood_threshold: Duration, @@ -276,6 +277,15 @@ impl GossipsubConfig { self.prune_backoff } + /// Controls the backoff time when unsubscribing from a topic. + /// + /// This is how long to wait before resubscribing to the topic. A short backoff period in case + /// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default + /// is 10 seconds. + pub fn unsubscribe_backoff(&self) -> Duration { + self.unsubscribe_backoff + } + /// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait /// at least backoff_slack heartbeats after a backoff is over before we try to graft. This /// solves problems occuring through high latencies. In particular if @@ -421,6 +431,7 @@ impl Default for GossipsubConfigBuilder { do_px: false, prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented. prune_backoff: Duration::from_secs(60), + unsubscribe_backoff: Duration::from_secs(10), backoff_slack: 1, flood_publish: true, graft_flood_threshold: Duration::from_secs(10), From 8689fe7c0e402c16d11a1cf75c85b7ee759feb4d Mon Sep 17 00:00:00 2001 From: Diva M Date: Mon, 27 Dec 2021 15:03:08 -0500 Subject: [PATCH 2/6] include unsub backoff in gossipsub's config builder --- protocols/gossipsub/src/config.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index a80d459f088..1f1b7901ef9 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -647,6 +647,16 @@ impl GossipsubConfigBuilder { self } + /// Controls the backoff time when unsubscribing from a topic. + /// + /// This is how long to wait before resubscribing to the topic. A short backoff period in case + /// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default + /// is 10 seconds. + pub fn unsubscribe_backoff(&mut self, unsubscribe_backoff: Duration) -> &mut Self { + self.config.unsubscribe_backoff = unsubscribe_backoff; + self + } + /// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait /// at least backoff_slack heartbeats after a backoff is over before we try to graft. This /// solves problems occuring through high latencies. In particular if @@ -788,6 +798,11 @@ impl GossipsubConfigBuilder { "The following inequality doesn't hold mesh_outbound_min <= self.config.mesh_n / 2", ); } + + if self.config.unsubscribe_backoff.as_secs() < 1 { + return Err("The unsubscribe_backoff parameter should be at least one second."); + } + Ok(self.config.clone()) } } From f080b019ea1fe2367aab3d97f573f47f5f4d9b7b Mon Sep 17 00:00:00 2001 From: Diva M Date: Mon, 27 Dec 2021 18:11:00 -0500 Subject: [PATCH 3/6] use unsubscribe backoff where appropriate --- protocols/gossipsub/src/behaviour.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index ed803c6d59a..ba268855c1f 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1049,6 +1049,7 @@ where topic_hash: &TopicHash, peer: &PeerId, do_px: bool, + on_unsubscribe: bool, ) -> GossipsubControlAction { if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.prune(peer, topic_hash.clone()); @@ -1092,10 +1093,15 @@ where self.backoffs .update_backoff(topic_hash, peer, self.config.prune_backoff()); + let backoff = if on_unsubscribe { + self.config.unsubscribe_backoff() + } else { + self.config.prune_backoff() + }; GossipsubControlAction::Prune { topic_hash: topic_hash.clone(), peers, - backoff: Some(self.config.prune_backoff().as_secs()), + backoff: Some(backoff.as_secs()), } } @@ -1111,7 +1117,9 @@ where for peer in peers { // Send a PRUNE control message debug!("LEAVE: Sending PRUNE to peer: {:?}", peer); - let control = self.make_prune(topic_hash, &peer, self.config.do_px()); + let on_unsubscribe = true; + let control = + self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe); Self::control_pool_add(&mut self.control_pool, peer, control); // If the peer did not previously exist in any mesh, inform the handler @@ -1487,9 +1495,10 @@ where if !to_prune_topics.is_empty() { // build the prune messages to send + let on_unsubscribe = false; let prune_messages = to_prune_topics .iter() - .map(|t| self.make_prune(t, peer_id, do_px)) + .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) .collect(); // Send the prune messages to the peer debug!( @@ -2598,6 +2607,9 @@ where // NOTE: In this case a peer has been added to a topic mesh, and removed from another. // It therefore must be in at least one mesh and we do not need to inform the handler // of its removal from another. + + // The following prunes are not due to unsubscribing. + let on_unsubscribe = false; if let Some(topics) = to_prune.remove(&peer) { let mut prunes = topics .iter() @@ -2606,6 +2618,7 @@ where topic_hash, &peer, self.config.do_px() && !no_px.contains(&peer), + on_unsubscribe, ) }) .collect::>(); @@ -2630,6 +2643,8 @@ where } // handle the remaining prunes + // The following prunes are not due to unsubscribing. + let on_unsubscribe = false; for (peer, topics) in to_prune.iter() { let mut remaining_prunes = Vec::new(); for topic_hash in topics { @@ -2637,6 +2652,7 @@ where topic_hash, peer, self.config.do_px() && !no_px.contains(peer), + on_unsubscribe, ); remaining_prunes.push(prune); // inform the handler From ddc2077d1e5ce3e2def85a872fcabba4c607caaf Mon Sep 17 00:00:00 2001 From: Diva M Date: Mon, 27 Dec 2021 18:16:43 -0500 Subject: [PATCH 4/6] add changelog entry --- protocols/gossipsub/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index abaeb2d7cd6..39b6d18a2f3 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -9,9 +9,13 @@ - Improve bandwidth performance by tracking IWANTs and reducing duplicate sends (see [PR 2327]). +- Implement Unsubscribe backoff as per [libp2p specs PR 383]. + [PR 2346]: https://github.com/libp2p/rust-libp2p/pull/2346 [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2327]: https://github.com/libp2p/rust-libp2p/pull/2327 +[PR 2403]: https://github.com/libp2p/rust-libp2p/pull/2403 +[libp2p specs PR 383]: https://github.com/libp2p/specs/pull/383 # 0.34.0 [2021-11-16] From 497e056623c3c8f2adc439cb752aa63ba6ad843e Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 14 Jan 2022 15:06:18 -0500 Subject: [PATCH 5/6] add unsub backoff test --- protocols/gossipsub/src/behaviour.rs | 12 ++-- protocols/gossipsub/src/behaviour/tests.rs | 71 ++++++++++++++++++++++ protocols/gossipsub/src/config.rs | 8 +-- 3 files changed, 81 insertions(+), 10 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index ba268855c1f..e1f3a582b94 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1089,15 +1089,15 @@ where Vec::new() }; - // update backoff - self.backoffs - .update_backoff(topic_hash, peer, self.config.prune_backoff()); - let backoff = if on_unsubscribe { self.config.unsubscribe_backoff() } else { self.config.prune_backoff() }; + + // update backoff + self.backoffs.update_backoff(topic_hash, peer, backoff); + GossipsubControlAction::Prune { topic_hash: topic_hash.clone(), peers, @@ -1404,8 +1404,8 @@ where { if backoff_time > now { warn!( - "[Penalty] Peer attempted graft within backoff time, penalizing {}", - peer_id + "[Penalty] Peer attempted graft within backoff time, penalizing {} {remaining:?}", + peer_id, remaining = backoff_time - now ); // add behavioural penalty if let Some((peer_score, ..)) = &mut self.peer_score { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 7e921c3e425..ad68f5f8e28 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -2032,6 +2032,77 @@ mod tests { ); } + #[test] + fn test_unsubscribe_backoff() { + const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100); + let config = GossipsubConfigBuilder::default() + .backoff_slack(1) + // ensure a prune_backoff > unsubscribe_backoff + .prune_backoff(Duration::from_secs(5)) + .unsubscribe_backoff(1) + .heartbeat_interval(HEARTBEAT_INTERVAL) + .build() + .unwrap(); + + let topic = String::from("test"); + // only one peer => mesh too small and will try to regraft as early as possible + let (mut gs, _, topics) = inject_nodes1() + .peer_no(1) + .topics(vec![topic.clone()]) + .to_subscribe(true) + .gs_config(config) + .create_network(); + + let _ = gs.unsubscribe(&Topic::new(topic.clone())); + + assert_eq!( + count_control_msgs(&gs, |_, m| match m { + GossipsubControlAction::Prune { backoff, .. } => backoff == &Some(1), + _ => false, + }), + 1, + "Peer should be pruned with `unsubscribe_backoff`." + ); + + let _ = gs.subscribe(&Topic::new(topics[0].to_string())); + + // forget all events until now + flush_events(&mut gs); + + // call heartbeat + gs.heartbeat(); + + // Sleep for one second and apply 10 regular heartbeats (interval = 100ms). + for _ in 0..10 { + sleep(HEARTBEAT_INTERVAL); + gs.heartbeat(); + } + + // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat + // is needed). + assert_eq!( + count_control_msgs(&gs, |_, m| match m { + GossipsubControlAction::Graft { .. } => true, + _ => false, + }), + 0, + "Graft message created too early within backoff period" + ); + + // Heartbeat one more time this should graft now + sleep(HEARTBEAT_INTERVAL); + gs.heartbeat(); + + // check that graft got created + assert!( + count_control_msgs(&gs, |_, m| match m { + GossipsubControlAction::Graft { .. } => true, + _ => false, + }) > 0, + "No graft message was created after backoff period" + ); + } + #[test] fn test_flood_publish() { let config: GossipsubConfig = GossipsubConfig::default(); diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 19428dce60d..e2758cd45da 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -652,8 +652,8 @@ impl GossipsubConfigBuilder { /// This is how long to wait before resubscribing to the topic. A short backoff period in case /// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default /// is 10 seconds. - pub fn unsubscribe_backoff(&mut self, unsubscribe_backoff: Duration) -> &mut Self { - self.config.unsubscribe_backoff = unsubscribe_backoff; + pub fn unsubscribe_backoff(&mut self, unsubscribe_backoff: u64) -> &mut Self { + self.config.unsubscribe_backoff = Duration::from_secs(unsubscribe_backoff); self } @@ -799,8 +799,8 @@ impl GossipsubConfigBuilder { ); } - if self.config.unsubscribe_backoff.as_secs() < 1 { - return Err("The unsubscribe_backoff parameter should be at least one second."); + if self.config.unsubscribe_backoff.as_millis() == 0 { + return Err("The unsubscribe_backoff parameter should be positive."); } Ok(self.config.clone()) From b637f20cce9956c02405b093adfb61cbb60f423e Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 14 Jan 2022 16:12:23 -0500 Subject: [PATCH 6/6] cleanup --- protocols/gossipsub/src/behaviour.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index e1f3a582b94..34fe368ef40 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1404,8 +1404,8 @@ where { if backoff_time > now { warn!( - "[Penalty] Peer attempted graft within backoff time, penalizing {} {remaining:?}", - peer_id, remaining = backoff_time - now + "[Penalty] Peer attempted graft within backoff time, penalizing {}", + peer_id ); // add behavioural penalty if let Some((peer_score, ..)) = &mut self.peer_score {