Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nest lookup type into request id SingleBlock and SingleBlob #5562

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,10 +493,7 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. }
| SyncId::SingleBlob { .. }
| SyncId::ParentLookup { .. }
| SyncId::ParentLookupBlob { .. } => {
SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } => {
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
return;
}
Expand Down Expand Up @@ -561,15 +558,15 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
id @ SyncId::SingleBlock { .. } => id,
SyncId::BackFillBlocks { .. }
| SyncId::RangeBlocks { .. }
| SyncId::RangeBlockAndBlobs { .. }
| SyncId::BackFillBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. } => {
SyncId::SingleBlob { .. } => {
crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
return;
}
Expand Down Expand Up @@ -602,8 +599,8 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlob { .. } | SyncId::ParentLookupBlob { .. }) => id,
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
id @ SyncId::SingleBlob { .. } => id,
SyncId::SingleBlock { .. } => {
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
return;
}
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum ResponseType {
Blob,
}

#[derive(Debug, Copy, Clone)]
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum LookupType {
Current,
Parent,
Expand Down Expand Up @@ -119,6 +119,7 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
let id = SingleLookupReqId {
id,
req_counter: self.get_state().req_counter,
lookup_type: L::lookup_type(),
};
Self::make_request(id, peer_id, request, cx)
}
Expand Down Expand Up @@ -265,7 +266,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.block_lookup_request(id, peer_id, request, L::lookup_type())
cx.block_lookup_request(id, peer_id, request)
.map_err(LookupRequestError::SendFailed)
}

Expand Down Expand Up @@ -365,7 +366,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.blob_lookup_request(id, peer_id, request, L::lookup_type())
cx.blob_lookup_request(id, peer_id, request)
.map_err(LookupRequestError::SendFailed)
}

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn parent_lookup_failed<R: RequestState<Parent, T>>(
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
peer_id: &PeerId,
cx: &SyncNetworkContext<T>,
error: RPCError,
) {
Expand Down
48 changes: 29 additions & 19 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl TestRig {
beacon_block: Option<Arc<SignedBeaconBlock<E>>>,
) {
self.send_sync_message(SyncMessage::RpcBlock {
request_id: SyncRequestId::ParentLookup { id },
request_id: SyncRequestId::SingleBlock { id },
peer_id,
beacon_block,
seen_timestamp: D,
Expand Down Expand Up @@ -324,7 +324,7 @@ impl TestRig {
blob_sidecar: Option<Arc<BlobSidecar<E>>>,
) {
self.send_sync_message(SyncMessage::RpcBlob {
request_id: SyncRequestId::ParentLookupBlob { id },
request_id: SyncRequestId::SingleBlob { id },
peer_id,
blob_sidecar,
seen_timestamp: D,
Expand All @@ -348,7 +348,7 @@ impl TestRig {
fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) {
self.send_sync_message(SyncMessage::RpcError {
peer_id,
request_id: SyncRequestId::ParentLookup { id },
request_id: SyncRequestId::SingleBlock { id },
error,
})
}
Expand Down Expand Up @@ -409,7 +409,11 @@ impl TestRig {
peer_id: _,
request: Request::BlocksByRoot(request),
request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }),
} if request.block_roots().to_vec().contains(&for_block) => Some(*id),
} if id.lookup_type == LookupType::Current
&& request.block_roots().to_vec().contains(&for_block) =>
{
Some(*id)
}
_ => None,
})
.unwrap_or_else(|e| panic!("Expected block request for {for_block:?}: {e}"))
Expand All @@ -422,11 +426,12 @@ impl TestRig {
peer_id: _,
request: Request::BlobsByRoot(request),
request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }),
} if request
.blob_ids
.to_vec()
.iter()
.any(|r| r.block_root == for_block) =>
} if id.lookup_type == LookupType::Current
&& request
.blob_ids
.to_vec()
.iter()
.any(|r| r.block_root == for_block) =>
{
Some(*id)
}
Expand All @@ -441,8 +446,12 @@ impl TestRig {
NetworkMessage::SendRequest {
peer_id: _,
request: Request::BlocksByRoot(request),
request_id: RequestId::Sync(SyncRequestId::ParentLookup { id }),
} if request.block_roots().to_vec().contains(&for_block) => Some(*id),
request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }),
} if id.lookup_type == LookupType::Parent
&& request.block_roots().to_vec().contains(&for_block) =>
{
Some(*id)
}
_ => None,
})
.unwrap_or_else(|e| panic!("Expected block parent request for {for_block:?}: {e}"))
Expand All @@ -454,12 +463,13 @@ impl TestRig {
NetworkMessage::SendRequest {
peer_id: _,
request: Request::BlobsByRoot(request),
request_id: RequestId::Sync(SyncRequestId::ParentLookupBlob { id }),
} if request
.blob_ids
.to_vec()
.iter()
.all(|r| r.block_root == for_block) =>
request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }),
} if id.lookup_type == LookupType::Parent
&& request
.blob_ids
.to_vec()
.iter()
.all(|r| r.block_root == for_block) =>
{
Some(*id)
}
Expand Down Expand Up @@ -1974,15 +1984,15 @@ mod deneb_only {
return;
};
let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(2));
let parent_root = block.parent_root();
let block_root = block.canonical_root();
let blob_0 = blobs[0].clone();
let blob_1 = blobs[1].clone();
let peer_a = r.new_connected_peer();
let peer_b = r.new_connected_peer();
// Send unknown parent block lookup
r.trigger_unknown_parent_block(peer_a, block.into());
// Expect network request for blobs
let id = r.expect_blob_parent_request(parent_root);
let id = r.expect_blob_lookup_request(block_root);
// Peer responses with blob 0
r.single_lookup_blob_response(id, peer_a, Some(blob_0.into()));
// Blob 1 is received via gossip unknown parent blob from a different peer
Expand Down
131 changes: 62 additions & 69 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
//! search for the block and subsequently search for parents if needed.

use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::common::LookupType;
use super::block_lookups::BlockLookups;
use super::network_context::{BlockOrBlob, SyncNetworkContext};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
Expand Down Expand Up @@ -80,6 +81,7 @@ pub type Id = u32;
pub struct SingleLookupReqId {
pub id: Id,
pub req_counter: Id,
pub lookup_type: LookupType,
}

/// Id of rpc requests sent by sync to the network.
Expand All @@ -89,12 +91,6 @@ pub enum RequestId {
SingleBlock { id: SingleLookupReqId },
/// Request searching for a set of blobs given a hash.
SingleBlob { id: SingleLookupReqId },
/// Request searching for a block's parent. The id is the chain, share with the corresponding
/// blob id.
ParentLookup { id: SingleLookupReqId },
/// Request searching for a block's parent blobs. The id is the chain, shared with the corresponding
/// block id.
ParentLookupBlob { id: SingleLookupReqId },
/// Request was from the backfill sync algorithm.
BackFillBlocks { id: Id },
/// Backfill request that is composed by both a block range request and a blob range request.
Expand Down Expand Up @@ -331,42 +327,42 @@ impl<T: BeaconChainTypes> SyncManager<T> {
fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
trace!(self.log, "Sync manager received a failed RPC");
match request_id {
RequestId::SingleBlock { id } => {
self.block_lookups
RequestId::SingleBlock { id } => match id.lookup_type {
LookupType::Current => self
.block_lookups
.single_block_lookup_failed::<BlockRequestState<Current>>(
id,
&peer_id,
&self.network,
error,
);
}
RequestId::SingleBlob { id } => {
self.block_lookups
.single_block_lookup_failed::<BlobRequestState<Current, T::EthSpec>>(
),
LookupType::Parent => self
.block_lookups
.parent_lookup_failed::<BlockRequestState<Parent>>(
id,
&peer_id,
&self.network,
error,
);
}
RequestId::ParentLookup { id } => {
self.block_lookups
.parent_lookup_failed::<BlockRequestState<Parent>>(
),
},
RequestId::SingleBlob { id } => match id.lookup_type {
LookupType::Current => self
.block_lookups
.single_block_lookup_failed::<BlobRequestState<Current, T::EthSpec>>(
id,
peer_id,
&peer_id,
&self.network,
error,
);
}
RequestId::ParentLookupBlob { id } => {
self.block_lookups
),
LookupType::Parent => self
.block_lookups
.parent_lookup_failed::<BlobRequestState<Parent, T::EthSpec>>(
id,
peer_id,
&peer_id,
&self.network,
error,
);
}
),
},
RequestId::BackFillBlocks { id } => {
if let Some(batch_id) = self
.network
Expand Down Expand Up @@ -879,30 +875,29 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match request_id {
RequestId::SingleBlock { id } => self
.block_lookups
.single_lookup_response::<BlockRequestState<Current>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
RequestId::SingleBlock { id } => match id.lookup_type {
LookupType::Current => self
.block_lookups
.single_lookup_response::<BlockRequestState<Current>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
LookupType::Parent => self
.block_lookups
.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
},
RequestId::SingleBlob { .. } => {
crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
}
RequestId::ParentLookup { id } => self
.block_lookups
.parent_lookup_response::<BlockRequestState<Parent>>(
id,
peer_id,
block,
seen_timestamp,
&self.network,
),
RequestId::ParentLookupBlob { id: _ } => {
crit!(self.log, "Block received during parent blob request"; "peer_id" => %peer_id );
}
RequestId::BackFillBlocks { id } => {
let is_stream_terminator = block.is_none();
if let Some(batch_id) = self
Expand Down Expand Up @@ -963,28 +958,26 @@ impl<T: BeaconChainTypes> SyncManager<T> {
RequestId::SingleBlock { .. } => {
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
}
RequestId::SingleBlob { id } => self
.block_lookups
.single_lookup_response::<BlobRequestState<Current, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&self.network,
),

RequestId::ParentLookup { id: _ } => {
crit!(self.log, "Single blob received during parent block request"; "peer_id" => %peer_id );
}
RequestId::ParentLookupBlob { id } => self
.block_lookups
.parent_lookup_response::<BlobRequestState<Parent, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&self.network,
),
RequestId::SingleBlob { id } => match id.lookup_type {
LookupType::Current => self
.block_lookups
.single_lookup_response::<BlobRequestState<Current, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&self.network,
),
LookupType::Parent => self
.block_lookups
.parent_lookup_response::<BlobRequestState<Parent, T::EthSpec>>(
id,
peer_id,
blob,
seen_timestamp,
&self.network,
),
},
RequestId::BackFillBlocks { id: _ } => {
crit!(self.log, "Blob received during backfill block request"; "peer_id" => %peer_id );
}
Expand Down
Loading
Loading