Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-process early sampling requests #5569

Merged
merged 5 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
use types::data_column_sidecar::DataColumnSidecarList;
use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier, DataColumnSidecarList};
use types::payload::BlockProductionVersion;
use types::*;

Expand Down Expand Up @@ -541,6 +541,12 @@ pub struct BeaconBlockResponse<E: EthSpec, Payload: AbstractExecPayload<E>> {
pub consensus_block_value: u64,
}

pub enum BlockImportStatus<E: EthSpec> {
PendingImport(Arc<SignedBeaconBlock<E>>),
Imported,
Unknown,
}

impl FinalizationAndCanonicity {
pub fn is_finalized(self) -> bool {
self.slot_is_finalized && self.canonical
Expand Down Expand Up @@ -1166,6 +1172,66 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_data_columns(block_root), Ok)
}

pub fn get_selected_data_columns_checking_all_caches(
&self,
block_root: Hash256,
indices: &[ColumnIndex],
) -> Result<Vec<Arc<DataColumnSidecar<T::EthSpec>>>, Error> {
let columns_from_availability_cache = indices
.iter()
.copied()
.filter_map(|index| {
self.data_availability_checker
.get_data_column(&DataColumnIdentifier { block_root, index })
.transpose()
})
.collect::<Result<Vec<_>, _>>()?;
// Existance of a column in the data availability cache and downstream caches is exclusive.
// If there's a single match in the availability cache we can safely skip other sources.
if !columns_from_availability_cache.is_empty() {
return Ok(columns_from_availability_cache);
}

Ok(self
.early_attester_cache
.get_data_columns(block_root)
.map_or_else(|| self.get_data_columns(&block_root), Ok)?
.into_iter()
.filter(|dc| indices.contains(&dc.index))
.collect())
}

/// Returns the import status of block checking (in order) pre-import caches, fork-choice, db store
pub fn get_block_import_status(&self, block_root: &Hash256) -> BlockImportStatus<T::EthSpec> {
if let Some(block) = self
.reqresp_pre_import_cache
.read()
.get(block_root)
.map(|block| {
metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
block.clone()
})
{
return BlockImportStatus::PendingImport(block);
}
// Check fork-choice before early_attester_cache as the latter is pruned lazily
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(block_root)
{
return BlockImportStatus::Imported;
}
if let Some(block) = self.early_attester_cache.get_block(*block_root) {
return BlockImportStatus::PendingImport(block);
}
if let Ok(true) = self.store.has_block(block_root) {
BlockImportStatus::Imported
} else {
BlockImportStatus::Unknown
}
}

/// Returns the block at the given root, if any.
///
/// ## Errors
Expand Down
6 changes: 5 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use task_executor::JoinHandle;
use tree_hash::TreeHash;
use types::data_column_sidecar::DataColumnSidecarError;
use types::{BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot};
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, DataColumnSidecar,
DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork,
PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
use types::{BlobSidecar, ExecPayload};

pub const POS_PANDA_BANNER: &str = r#"
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub use self::chain_config::ChainConfig;
pub use self::errors::{BeaconChainError, BlockProductionError};
pub use self::historical_blocks::HistoricalBlockError;
pub use attestation_verification::Error as AttestationError;
pub use beacon_chain::BlockImportStatus;
pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError};
pub use block_verification::{
get_block_root, BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock,
Expand Down
48 changes: 36 additions & 12 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId};
use types::{EthSpec, Slot};
use work_reprocessing_queue::IgnoredRpcBlock;
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork,
};
use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest};

mod metrics;
pub mod work_reprocessing_queue;
Expand Down Expand Up @@ -141,6 +141,10 @@ const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024;
/// for reprocessing before we start dropping them.
const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128;

/// The maximum number of queued retries of ReqResp sampling requests from the reprocess queue that
/// will be stored before dropping them.
const MAX_UNKNOWN_BLOCK_SAMPLING_REQUEST_QUEUE_LEN: usize = 16_384;

/// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping
/// them.
const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048;
Expand Down Expand Up @@ -272,6 +276,7 @@ pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST: &str = "light_client_optimisti
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const UNKNOWN_BLOCK_SAMPLING_REQUEST: &str = "unknown_block_sampling_request";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
pub const API_REQUEST_P0: &str = "api_request_p0";
pub const API_REQUEST_P1: &str = "api_request_p1";
Expand Down Expand Up @@ -527,6 +532,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
process_fn,
},
},
ReadyWork::SamplingRequest(QueuedSamplingRequest { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockSamplingRequest { process_fn },
},
ReadyWork::BackfillSync(QueuedBackfillBatch(process_fn)) => Self {
drop_during_sync: false,
work: Work::ChainSegmentBackfill(process_fn),
Expand Down Expand Up @@ -610,6 +619,9 @@ pub enum Work<E: EthSpec> {
parent_root: Hash256,
process_fn: BlockingFn,
},
UnknownBlockSamplingRequest {
process_fn: BlockingFn,
},
GossipAggregateBatch {
aggregates: Vec<GossipAggregatePackage<E>>,
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
Expand Down Expand Up @@ -699,8 +711,9 @@ impl<E: EthSpec> Work<E> {
Work::LightClientFinalityUpdateRequest(_) => LIGHT_CLIENT_FINALITY_UPDATE_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::UnknownBlockSamplingRequest { .. } => UNKNOWN_BLOCK_SAMPLING_REQUEST,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::ApiRequestP0 { .. } => API_REQUEST_P0,
Work::ApiRequestP1 { .. } => API_REQUEST_P1,
}
Expand Down Expand Up @@ -839,6 +852,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN);
let mut unknown_light_client_update_queue =
FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN);
let mut unknown_block_sampling_request_queue =
FifoQueue::new(MAX_UNKNOWN_BLOCK_SAMPLING_REQUEST_QUEUE_LEN);

// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
Expand Down Expand Up @@ -1158,16 +1173,22 @@ impl<E: EthSpec> BeaconProcessor<E> {
// and BlocksByRoot)
} else if let Some(item) = status_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = bbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
// Prioritize by_root requests over by_range as the former are time
// sensitive for recovery
} else if let Some(item) = bbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = blbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = blbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = dcbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
// Prioritize sampling requests after block syncing requests
} else if let Some(item) = unknown_block_sampling_request_queue.pop() {
self.spawn_worker(item, idle_tx);
// by_range sync after sampling
} else if let Some(item) = bbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = blbrange_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check slashings after all other consensus messages so we prioritize
// following head.
//
Expand Down Expand Up @@ -1344,6 +1365,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
Work::UnknownBlockSamplingRequest { .. } => {
unknown_block_sampling_request_queue.push(work, work_id, &self.log)
}
Work::ApiRequestP0 { .. } => {
api_request_p0_queue.push(work, work_id, &self.log)
}
Expand Down Expand Up @@ -1530,12 +1554,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::ChainSegment(process_fn) => task_spawner.spawn_async(async move {
process_fn.await;
}),
Work::UnknownBlockAttestation { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::UnknownBlockAggregate { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::UnknownLightClientOptimisticUpdate {
parent_root: _,
process_fn,
} => task_spawner.spawn_blocking(process_fn),
Work::UnknownBlockAttestation { process_fn }
| Work::UnknownBlockAggregate { process_fn }
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. }
| Work::UnknownBlockSamplingRequest { process_fn } => {
task_spawner.spawn_blocking(process_fn)
}
Work::DelayedImportBlock {
beacon_block_slot: _,
beacon_block_root: _,
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/beacon_processor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ lazy_static::lazy_static! {
"beacon_processor_reprocessing_queue_matched_optimistic_updates",
"Number of queued light client optimistic updates where as matching block has been imported."
);
// TODO: This should be labeled instead of N single metrics
pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_SAMPLING_REQUESTS: Result<IntCounter> = try_create_int_counter(
"beacon_processor_reprocessing_queue_matches_sampling_requests",
"Number of queued sampling requests where as matching block has been imported."
);

/// Errors and Debugging Stats
pub static ref BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE: Result<IntCounterVec> =
Expand Down
Loading
Loading