Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Action in single_block_component_processed #5564

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 126 additions & 116 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ pub type DownloadedBlock<E> = (Hash256, RpcBlock<E>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;

enum Action {
Retry,
ParentUnknown { parent_root: Hash256, slot: Slot },
Drop,
Continue,
}

pub struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
Expand Down Expand Up @@ -773,7 +780,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return;
};

let root = lookup.block_root();
let block_root = lookup.block_root();
let request_state = R::request_state_mut(&mut lookup);

let peer_id = match request_state.get_state().processing_peer() {
Expand All @@ -787,28 +794,49 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.log,
"Block component processed for lookup";
"response_type" => ?R::response_type(),
"block_root" => ?root,
"block_root" => ?block_root,
"result" => ?result,
"id" => target_id,
);

match result {
BlockProcessingResult::Ok(status) => match status {
AvailabilityProcessingStatus::Imported(root) => {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
}
AvailabilityProcessingStatus::MissingComponents(_, _block_root) => {
match self.handle_missing_components::<R>(cx, &mut lookup) {
Ok(()) => {
self.single_block_lookups.insert(target_id, lookup);
}
Err(e) => {
// Drop with an additional error.
warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e);
}
}
let action = match result {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
// Successfully imported
trace!(self.log, "Single block processing succeeded"; "block" => %block_root);
Action::Drop
}

BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
_,
_block_root,
)) => {
// `on_processing_success` is called here to ensure the request state is updated prior to checking
// if both components have been processed.
if R::request_state_mut(&mut lookup)
.get_state_mut()
.on_processing_success()
.is_err()
{
warn!(
self.log,
"Single block processing state incorrect";
"action" => "dropping single block request"
);
Action::Drop
// If this was the result of a block request, we can't determined if the block peer did anything
// wrong. If we already had both a block and blobs response processed, we should penalize the
// blobs peer because they did not provide all blobs on the initial request.
} else if lookup.both_components_processed() {
lookup.penalize_blob_peer(cx);

// Try it again if possible.
lookup.blob_request_state.state.on_processing_failure();
Action::Retry
} else {
Action::Continue
}
},
}
BlockProcessingResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.
Expand All @@ -817,118 +845,88 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"Single block processing was ignored, cpu might be overloaded";
"action" => "dropping single block request"
);
Action::Drop
}
BlockProcessingResult::Err(e) => {
match self.handle_single_lookup_block_error(cx, lookup, peer_id, e) {
Ok(Some(lookup)) => {
self.single_block_lookups.insert(target_id, lookup);
let root = lookup.block_root();
trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e);
match e {
BlockError::BeaconChainError(e) => {
// Internal error
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
Action::Drop
}
Ok(None) => {
// Drop without an additional error.
BlockError::ParentUnknown(block) => {
let slot = block.slot();
let parent_root = block.parent_root();
lookup.add_child_components(block.into());
Action::ParentUnknown { parent_root, slot }
}
Err(e) => {
// Drop with an additional error.
warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e);
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
debug!(
self.log,
"Single block lookup failed. Execution layer is offline / unsynced / misconfigured";
"root" => %root,
"error" => ?e
);
Action::Drop
}
BlockError::AvailabilityCheck(e) => match e.category() {
AvailabilityCheckErrorCategory::Internal => {
warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.block_request_state.state.on_download_failure();
lookup.blob_request_state.state.on_download_failure();
Action::Retry
}
AvailabilityCheckErrorCategory::Malicious => {
warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.handle_availability_check_failure(cx);
Action::Retry
}
},
other => {
warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id);
if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() {
cx.report_peer(
block_peer,
PeerAction::MidToleranceError,
"single_block_failure",
);

lookup.block_request_state.state.on_processing_failure();
}
Action::Retry
}
}
}
};
}

/// Handles a `MissingComponents` block processing error. Handles peer scoring and retries.
///
/// If this was the result of a block request, we can't determined if the block peer did anything
/// wrong. If we already had both a block and blobs response processed, we should penalize the
/// blobs peer because they did not provide all blobs on the initial request.
fn handle_missing_components<R: RequestState<Current, T>>(
&self,
cx: &SyncNetworkContext<T>,
lookup: &mut SingleBlockLookup<Current, T>,
) -> Result<(), LookupRequestError> {
let request_state = R::request_state_mut(lookup);

request_state
.get_state_mut()
.on_processing_success()
.map_err(LookupRequestError::BadState)?;
if lookup.both_components_processed() {
lookup.penalize_blob_peer(cx);

// Try it again if possible.
lookup.blob_request_state.state.on_processing_failure();
lookup.request_block_and_blobs(cx)?;
}
Ok(())
}

/// Handles peer scoring and retries related to a `BlockError` in response to a single block
/// or blob lookup processing result.
fn handle_single_lookup_block_error(
&mut self,
cx: &mut SyncNetworkContext<T>,
mut lookup: SingleBlockLookup<Current, T>,
peer_id: PeerId,
e: BlockError<T::EthSpec>,
) -> Result<Option<SingleBlockLookup<Current, T>>, LookupRequestError> {
let root = lookup.block_root();
trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e);
match e {
BlockError::BlockIsAlreadyKnown(_) => {
// No error here
return Ok(None);
}
BlockError::BeaconChainError(e) => {
// Internal error
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
return Ok(None);
match action {
Action::Retry => {
if let Err(e) = lookup.request_block_and_blobs(cx) {
warn!(self.log, "Single block lookup failed"; "block_root" => %block_root, "error" => ?e);
// Failed with too many retries, drop with noop
self.update_metrics();
} else {
self.single_block_lookups.insert(target_id, lookup);
}
}
BlockError::ParentUnknown(block) => {
let slot = block.slot();
let parent_root = block.parent_root();
lookup.add_child_components(block.into());
lookup.request_block_and_blobs(cx)?;
self.search_parent(slot, root, parent_root, peer_id, cx);
Action::ParentUnknown { parent_root, slot } => {
// TODO: Consider including all peers from the lookup, claiming to know this block, not
// just the one that sent this specific block
self.search_parent(slot, block_root, parent_root, peer_id, cx);
self.single_block_lookups.insert(target_id, lookup);
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
debug!(
self.log,
"Single block lookup failed. Execution layer is offline / unsynced / misconfigured";
"root" => %root,
"error" => ?e
);
return Ok(None);
Action::Drop => {
// drop with noop
self.update_metrics();
}
BlockError::AvailabilityCheck(e) => match e.category() {
AvailabilityCheckErrorCategory::Internal => {
warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.block_request_state.state.on_download_failure();
lookup.blob_request_state.state.on_download_failure();
lookup.request_block_and_blobs(cx)?
}
AvailabilityCheckErrorCategory::Malicious => {
warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.handle_availability_check_failure(cx);
lookup.request_block_and_blobs(cx)?
}
},
other => {
warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id);
if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() {
cx.report_peer(
block_peer,
PeerAction::MidToleranceError,
"single_block_failure",
);

// Try it again if possible.
lookup.block_request_state.state.on_processing_failure();
lookup.request_block_and_blobs(cx)?
}
Action::Continue => {
self.single_block_lookups.insert(target_id, lookup);
}
}
Ok(Some(lookup))
}

pub fn parent_block_processed(
Expand Down Expand Up @@ -1369,4 +1367,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn drop_parent_chain_requests(&mut self) -> usize {
self.parent_lookups.drain(..).len()
}

pub fn update_metrics(&self) {
metrics::set_gauge(
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
self.single_block_lookups.len() as i64,
);

metrics::set_gauge(
&metrics::SYNC_PARENT_BLOCK_LOOKUPS,
self.parent_lookups.len() as i64,
);
}
}
Loading