Skip to content

Commit

Permalink
Fix lookup disconnect peer (#5815)
Browse files Browse the repository at this point in the history
* Test lookup peer disconnect modes

* Fix lookup peer disconnected return early
  • Loading branch information
dapplion authored May 20, 2024
1 parent b5de925 commit 2a87016
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 83 deletions.
15 changes: 11 additions & 4 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.iter()
.find(|(_, l)| l.block_root() == block_to_drop)
{
for &peer_id in lookup.all_used_peers() {
for &peer_id in lookup.all_peers() {
cx.report_peer(
peer_id,
PeerAction::LowToleranceError,
Expand Down Expand Up @@ -387,8 +387,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
self.single_block_lookups.retain(|_, lookup| {
if lookup.remove_peer(peer_id) {
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?lookup.block_root());
lookup.remove_peer(peer_id);

// Note: this condition should be removed in the future. It's not strictly necessary to drop a
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
if lookup.has_no_peers() {
debug!(self.log,
"Dropping single lookup after peer disconnection";
"block_root" => ?lookup.block_root()
);
false
} else {
true
Expand Down Expand Up @@ -545,7 +552,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
lookup.continue_requests(cx)
}
Action::ParentUnknown { parent_root } => {
let peers = lookup.all_available_peers().cloned().collect::<Vec<_>>();
let peers = lookup.all_peers().copied().collect::<Vec<_>>();
lookup.set_awaiting_parent(parent_root);
debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root);
self.search_parent_of_child(parent_root, block_root, &peers, cx);
Expand Down
105 changes: 29 additions & 76 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::sync::network_context::{
};
use beacon_chain::BeaconChainTypes;
use derivative::Derivative;
use itertools::Itertools;
use rand::seq::IteratorRandom;
use std::collections::HashSet;
use std::fmt::Debug;
Expand Down Expand Up @@ -64,6 +63,9 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id,
pub block_request_state: BlockRequestState<T::EthSpec>,
pub blob_request_state: BlobRequestState<T::EthSpec>,
/// Peers that claim to have imported this set of block components
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
peers: HashSet<PeerId>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
created: Instant,
Expand All @@ -78,8 +80,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
) -> Self {
Self {
id,
block_request_state: BlockRequestState::new(requested_block_root, peers),
blob_request_state: BlobRequestState::new(requested_block_root, peers),
block_request_state: BlockRequestState::new(requested_block_root),
blob_request_state: BlobRequestState::new(requested_block_root),
peers: HashSet::from_iter(peers.iter().copied()),
block_root: requested_block_root,
awaiting_parent,
created: Instant::now(),
Expand Down Expand Up @@ -134,22 +137,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.block_root() == block_root
}

/// Get all unique used peers across block and blob requests.
pub fn all_used_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.block_request_state
.state
.get_used_peers()
.chain(self.blob_request_state.state.get_used_peers())
.unique()
}

/// Get all unique available peers across block and blob requests.
pub fn all_available_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.block_request_state
.state
.get_available_peers()
.chain(self.blob_request_state.state.get_available_peers())
.unique()
/// Get all unique peers that claim to have imported this set of block components
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter()
}

/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
Expand Down Expand Up @@ -198,7 +188,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}

let Some(peer_id) = request.get_state_mut().use_rand_available_peer() else {
let Some(peer_id) = self.use_rand_available_peer() else {
if awaiting_parent {
// Allow lookups awaiting for a parent to have zero peers. If when the parent
// resolve they still have zero peers the lookup will fail gracefully.
Expand All @@ -208,6 +198,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
};

let request = R::request_state_mut(self);
match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
LookupRequestResult::RequestSent(req_id) => {
request.get_state_mut().on_download_start(req_id)?
Expand Down Expand Up @@ -238,9 +229,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Add peer to all request states. The peer must be able to serve this request.
/// Returns true if the peer was newly inserted into some request state.
pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
let inserted_block = self.block_request_state.state.add_peer(&peer_id);
let inserted_blob = self.blob_request_state.state.add_peer(&peer_id);
inserted_block || inserted_blob
self.peers.insert(peer_id)
}

/// Returns true if the block has already been downloaded.
Expand All @@ -252,8 +241,17 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Remove peer from available peers. Return true if there are no more available peers and all
/// requests are not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id)
&& self.blob_request_state.state.remove_peer(peer_id)
self.peers.remove(peer_id)
}

/// Returns true if this lookup has zero peers
pub fn has_no_peers(&self) -> bool {
self.peers.is_empty()
}

/// Selects a random peer from available peers if any
fn use_rand_available_peer(&mut self) -> Option<PeerId> {
self.peers.iter().choose(&mut rand::thread_rng()).copied()
}
}

Expand All @@ -267,10 +265,10 @@ pub struct BlobRequestState<E: EthSpec> {
}

impl<E: EthSpec> BlobRequestState<E> {
pub fn new(block_root: Hash256, peer_source: &[PeerId]) -> Self {
pub fn new(block_root: Hash256) -> Self {
Self {
block_root,
state: SingleLookupRequestState::new(peer_source),
state: SingleLookupRequestState::new(),
}
}
}
Expand All @@ -285,10 +283,10 @@ pub struct BlockRequestState<E: EthSpec> {
}

impl<E: EthSpec> BlockRequestState<E> {
pub fn new(block_root: Hash256, peers: &[PeerId]) -> Self {
pub fn new(block_root: Hash256) -> Self {
Self {
requested_block_root: block_root,
state: SingleLookupRequestState::new(peers),
state: SingleLookupRequestState::new(),
}
}
}
Expand Down Expand Up @@ -318,29 +316,16 @@ pub enum State<T: Clone> {
pub struct SingleLookupRequestState<T: Clone> {
/// State of this request.
state: State<T>,
/// Peers that should have this block or blob.
#[derivative(Debug(format_with = "fmt_peer_set"))]
available_peers: HashSet<PeerId>,
/// Peers from which we have requested this block.
#[derivative(Debug = "ignore")]
used_peers: HashSet<PeerId>,
/// How many times have we attempted to process this block or blob.
failed_processing: u8,
/// How many times have we attempted to download this block or blob.
failed_downloading: u8,
}

impl<T: Clone> SingleLookupRequestState<T> {
pub fn new(peers: &[PeerId]) -> Self {
let mut available_peers = HashSet::default();
for peer in peers.iter().copied() {
available_peers.insert(peer);
}

pub fn new() -> Self {
Self {
state: State::AwaitingDownload,
available_peers,
used_peers: HashSet::default(),
failed_processing: 0,
failed_downloading: 0,
}
Expand Down Expand Up @@ -518,38 +503,6 @@ impl<T: Clone> SingleLookupRequestState<T> {
pub fn more_failed_processing_attempts(&self) -> bool {
self.failed_processing >= self.failed_downloading
}

/// Add peer to this request states. The peer must be able to serve this request.
/// Returns true if the peer is newly inserted.
pub fn add_peer(&mut self, peer_id: &PeerId) -> bool {
self.available_peers.insert(*peer_id)
}

/// Remove peer from available peers. Return true if there are no more available peers and the
/// request is not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) -> bool {
self.available_peers.remove(disconnected_peer_id);
self.available_peers.is_empty() && self.is_awaiting_download()
}

pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {
self.used_peers.iter()
}

pub fn get_available_peers(&self) -> impl Iterator<Item = &PeerId> {
self.available_peers.iter()
}

/// Selects a random peer from available peers if any, inserts it in used peers and returns it.
pub fn use_rand_available_peer(&mut self) -> Option<PeerId> {
let peer_id = self
.available_peers
.iter()
.choose(&mut rand::thread_rng())
.copied()?;
self.used_peers.insert(peer_id);
Some(peer_id)
}
}

// Display is used in the BadState assertions above
Expand All @@ -573,7 +526,7 @@ impl<T: Clone> std::fmt::Debug for State<T> {
}
}

fn fmt_peer_set(
fn fmt_peer_set_as_len(
peer_set: &HashSet<PeerId>,
f: &mut std::fmt::Formatter,
) -> Result<(), std::fmt::Error> {
Expand Down
22 changes: 19 additions & 3 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,10 @@ impl TestRig {

fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));
}

// Return RPCErrors for all active requests of peer
/// Return RPCErrors for all active requests of peer
fn rpc_error_all_active_requests(&mut self, disconnected_peer_id: PeerId) {
self.drain_network_rx();
while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
Expand Down Expand Up @@ -1265,27 +1267,41 @@ fn test_parent_lookup_too_deep() {
}

#[test]
fn test_parent_lookup_disconnection_no_peers_left() {
fn test_lookup_peer_disconnected_no_peers_left_while_request() {
let mut rig = TestRig::test_setup();
let peer_id = rig.new_connected_peer();
let trigger_block = rig.rand_block();
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
rig.peer_disconnected(peer_id);
rig.rpc_error_all_active_requests(peer_id);
rig.expect_no_active_lookups();
}

#[test]
fn test_lookup_peer_disconnected_no_peers_left_not_while_request() {
let mut rig = TestRig::test_setup();
let peer_id = rig.new_connected_peer();
let trigger_block = rig.rand_block();
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
rig.peer_disconnected(peer_id);
// Note: this test case may be removed in the future. It's not strictly necessary to drop a
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
rig.expect_no_active_lookups();
}

#[test]
fn test_lookup_disconnection_peer_left() {
let mut rig = TestRig::test_setup();
let peer_ids = (0..2).map(|_| rig.new_connected_peer()).collect::<Vec<_>>();
let disconnecting_peer = *peer_ids.first().unwrap();
let block_root = Hash256::random();
// lookup should have two peers associated with the same block
for peer_id in peer_ids.iter() {
rig.trigger_unknown_block_from_attestation(block_root, *peer_id);
}
// Disconnect the first peer only, which is the one handling the request
rig.peer_disconnected(*peer_ids.first().unwrap());
rig.peer_disconnected(disconnecting_peer);
rig.rpc_error_all_active_requests(disconnecting_peer);
rig.assert_single_lookups_count(1);
}

Expand Down

0 comments on commit 2a87016

Please sign in to comment.