Skip to content

Commit

Permalink
Reconstruct data columns without blocking processing and import (#5990)
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit 7d2d826
Author: Jimmy Chen <[email protected]>
Date:   Mon Jul 1 13:44:45 2024 +1000

    Send import results to sync after reconstruction. Add more logging and metrics.

commit 4b30ebe
Merge: f93e2b5 7206909
Author: Jimmy Chen <[email protected]>
Date:   Fri Jun 28 17:23:22 2024 +1000

    Merge branch 'das' into fork/reconstruct-without-blocking-import

commit f93e2b5
Author: Jimmy Chen <[email protected]>
Date:   Fri Jun 28 14:42:04 2024 +1000

    Code cleanup: add type aliases and update comments.

commit 6ac055d
Author: Jimmy Chen <[email protected]>
Date:   Fri Jun 28 14:26:40 2024 +1000

    Revert reconstruction behaviour to always go ahead rather than allowing one at a time. Address other review comments.

commit 1e3964e
Author: Jimmy Chen <[email protected]>
Date:   Tue Jun 25 00:02:19 2024 +1000

    Reconstruct columns without blocking processing and import.
  • Loading branch information
jimmygchen committed Jul 1, 2024
1 parent 7206909 commit b06c23e
Show file tree
Hide file tree
Showing 11 changed files with 321 additions and 227 deletions.
103 changes: 59 additions & 44 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnsToPublish,
};
use crate::data_column_verification::{
CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumn,
Expand Down Expand Up @@ -3057,13 +3056,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let Ok(block_root) = data_columns
.iter()
.map(|c| c.block_root())
Expand Down Expand Up @@ -3131,13 +3124,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its columns again.
if self
Expand All @@ -3162,6 +3149,52 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified_custody_columns(&block_root, r)
}

/// Attempt to reconstruct the full set of data columns for `block_root` and run
/// availability processing on the result.
///
/// Returns `Ok(None)` when reconstruction is skipped or yields nothing: either
/// the block is already in fork choice, or the availability checker produced no
/// reconstruction output for this root. On success, returns the availability
/// processing status together with the reconstructed columns so the caller can
/// publish them.
pub async fn reconstruct_data_columns(
    self: &Arc<Self>,
    block_root: Hash256,
) -> Result<
    Option<(
        AvailabilityProcessingStatus,
        DataColumnSidecarVec<T::EthSpec>,
    )>,
    BlockError<T::EthSpec>,
> {
    // As of now we only reconstruct data columns on supernodes, so if the block is already
    // available on a supernode, there's no need to reconstruct as the node must already have
    // all columns.
    if self
        .canonical_head
        .fork_choice_read_lock()
        .contains_block(&block_root)
    {
        return Ok(None);
    }

    // Reconstruction may produce nothing (e.g. not enough columns cached yet);
    // that is not an error, just nothing to do.
    let Some((availability, data_column_to_publish)) = self
        .data_availability_checker
        .reconstruct_data_columns(block_root)?
    else {
        return Ok(None);
    };

    // All reconstructed columns belong to one block, so they must share a
    // single slot; anything else indicates an internal invariant violation.
    let Ok(slot) = data_column_to_publish
        .iter()
        .map(|c| c.slot())
        .unique()
        .exactly_one()
    else {
        return Err(BlockError::InternalError(
            "Columns for the same block should have matching slot".to_string(),
        ));
    };

    // Process availability, then drop the pre-import cache entry if the block
    // no longer has missing components, preserving the result either way.
    let r = self.process_availability(slot, availability).await;
    self.remove_notified_custody_columns(&block_root, r)
        .map(|availability_processing_status| {
            Some((availability_processing_status, data_column_to_publish))
        })
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
Expand All @@ -3179,15 +3212,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified_custody_columns<P>(
fn remove_notified_custody_columns(
&self,
block_root: &Hash256,
r: Result<(AvailabilityProcessingStatus, P), BlockError<T::EthSpec>>,
) -> Result<(AvailabilityProcessingStatus, P), BlockError<T::EthSpec>> {
let has_missing_components = matches!(
r,
Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _))
);
r: Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
Expand Down Expand Up @@ -3436,13 +3467,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
async fn check_gossip_data_columns_availability_and_import(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Some(slasher) = self.slasher.as_ref() {
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
Expand All @@ -3455,13 +3480,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
));
};

let (availability, data_columns_to_publish) = self
let availability = self
.data_availability_checker
.put_gossip_data_columns(data_columns)?;

self.process_availability(slot, availability)
.await
.map(|result| (result, data_columns_to_publish))
self.process_availability(slot, availability).await
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
Expand Down Expand Up @@ -3509,13 +3532,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
Expand All @@ -3539,13 +3556,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
}
let (availability, data_columns_to_publish) = self
let availability = self
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;

self.process_availability(slot, availability)
.await
.map(|result| (result, data_columns_to_publish))
self.process_availability(slot, availability).await
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
Expand Down
39 changes: 15 additions & 24 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::block_verification::BlockError;
use crate::data_availability_checker::AvailabilityCheckError;
pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock};
use crate::data_column_verification::{
CustodyDataColumn, CustodyDataColumnList, GossipDataColumnError, GossipVerifiedDataColumnList,
CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumnList,
};
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome};
Expand All @@ -15,8 +15,8 @@ use std::sync::Arc;
use types::blob_sidecar::{self, BlobIdentifier, FixedBlobSidecarList};
use types::data_column_sidecar::{self};
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

/// A block that has been received over RPC. It has 2 internal variants:
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<E: EthSpec> RpcBlock<E> {
}
}

pub fn custody_columns(&self) -> Option<&CustodyDataColumnList<E>> {
pub fn custody_columns(&self) -> Option<&Vec<CustodyDataColumn<E>>> {
match &self.block {
RpcBlockInner::Block(_) => None,
RpcBlockInner::BlockAndBlobs(_, _) => None,
Expand All @@ -96,7 +96,7 @@ enum RpcBlockInner<E: EthSpec> {
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, BlobSidecarList<E>),
/// This variant is used with parent lookups and by-range responses. It should have all
/// requested data columns, all block roots matching for this block.
BlockAndCustodyColumns(Arc<SignedBeaconBlock<E>>, CustodyDataColumnList<E>),
BlockAndCustodyColumns(Arc<SignedBeaconBlock<E>>, Vec<CustodyDataColumn<E>>),
}

impl<E: EthSpec> RpcBlock<E> {
Expand Down Expand Up @@ -158,7 +158,6 @@ impl<E: EthSpec> RpcBlock<E> {
block_root: Option<Hash256>,
block: Arc<SignedBeaconBlock<E>>,
custody_columns: Vec<CustodyDataColumn<E>>,
spec: &ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
let block_root = block_root.unwrap_or_else(|| get_block_root(&block));

Expand All @@ -168,10 +167,7 @@ impl<E: EthSpec> RpcBlock<E> {
}
// Treat empty data column lists as if they are missing.
let inner = if !custody_columns.is_empty() {
RpcBlockInner::BlockAndCustodyColumns(
block,
RuntimeVariableList::new(custody_columns, spec.number_of_columns)?,
)
RpcBlockInner::BlockAndCustodyColumns(block, custody_columns)
} else {
RpcBlockInner::Block(block)
};
Expand Down Expand Up @@ -205,7 +201,7 @@ impl<E: EthSpec> RpcBlock<E> {
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<CustodyDataColumnList<E>>,
Option<Vec<CustodyDataColumn<E>>>,
) {
let block_root = self.block_root();
match self.block {
Expand Down Expand Up @@ -596,7 +592,6 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
}

fn into_rpc_block(self) -> RpcBlock<E> {
let number_of_columns = self.spec.number_of_columns;
let (block_root, block, blobs_opt, data_columns_opt) = self.deconstruct();
// Circumvent the constructor here, because an Available block will have already had
// consistency checks performed.
Expand All @@ -605,18 +600,14 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
(Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs),
(_, Some(data_columns)) => RpcBlockInner::BlockAndCustodyColumns(
block,
RuntimeVariableList::new(
data_columns
.into_iter()
// TODO(das): This is an ugly hack that should be removed. After updating
// store types to handle custody data columns this should not be required.
// It's okay-ish because available blocks must have all the required custody
// columns.
.map(|d| CustodyDataColumn::from_asserted_custody(d))
.collect(),
number_of_columns,
)
.expect("data column list is within bounds"),
// TODO(das): This is an ugly hack that should be removed. After updating
// store types to handle custody data columns this should not be required.
// It's okay-ish because available blocks must have all the required custody
// columns.
data_columns
.into_iter()
.map(CustodyDataColumn::from_asserted_custody)
.collect(),
),
};
RpcBlock {
Expand Down
43 changes: 21 additions & 22 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg
use crate::block_verification_types::{
AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
};
use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache;
use crate::data_availability_checker::overflow_lru_cache::{
AvailabilityAndReconstructedColumns, OverflowLRUCache,
};
use crate::{BeaconChain, BeaconChainTypes, BeaconStore};
use kzg::Kzg;
use slog::{debug, error, o, Logger};
Expand All @@ -23,6 +25,7 @@ mod error;
mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_availability_checker::error::Error;
use crate::data_column_verification::{
verify_kzg_for_data_column_list, CustodyDataColumn, GossipVerifiedDataColumn,
KzgVerifiedCustodyDataColumn,
Expand All @@ -31,8 +34,6 @@ pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCh
use types::data_column_sidecar::DataColumnIdentifier;
use types::non_zero_usize::new_non_zero_usize;

pub use self::overflow_lru_cache::DataColumnsToPublish;

/// The LRU Cache stores `PendingComponents` which can store up to
/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
Expand Down Expand Up @@ -159,6 +160,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_data_column(data_column_id)
}

/// Attempt to reconstruct all data columns for `block_root` from the columns
/// currently held in the availability cache.
///
/// Returns `Ok(None)` when reconstruction is not (yet) possible for this root.
///
/// # Errors
///
/// Returns `AvailabilityCheckError::KzgNotInitialized` if no KZG context is
/// configured; otherwise propagates errors from the availability cache.
pub fn reconstruct_data_columns(
    &self,
    block_root: Hash256,
) -> Result<Option<AvailabilityAndReconstructedColumns<T::EthSpec>>, Error> {
    // KZG is required to recompute the missing columns and their proofs.
    let Some(kzg) = self.kzg.as_ref() else {
        return Err(AvailabilityCheckError::KzgNotInitialized);
    };
    self.availability_cache
        .reconstruct_data_columns(kzg, block_root)
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
Expand Down Expand Up @@ -190,8 +202,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
Expand All @@ -203,11 +214,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map(|c| KzgVerifiedCustodyDataColumn::new(c, kzg))
.collect::<Result<Vec<_>, _>>()?;

self.availability_cache.put_kzg_verified_data_columns(
kzg,
block_root,
verified_custody_columns,
)
self.availability_cache
.put_kzg_verified_data_columns(block_root, verified_custody_columns)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
Expand All @@ -232,11 +240,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_gossip_data_columns(
&self,
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let block_root = gossip_data_columns
.first()
.ok_or(AvailabilityCheckError::MissingCustodyColumns)?
Expand All @@ -248,7 +252,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.collect::<Vec<_>>();

self.availability_cache
.put_kzg_verified_data_columns(kzg, block_root, custody_columns)
.put_kzg_verified_data_columns(block_root, custody_columns)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
Expand Down Expand Up @@ -314,12 +318,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block,
blobs: None,
blobs_available_timestamp: None,
data_columns: Some(
data_column_list
.into_iter()
.map(|d| d.clone_arc())
.collect(),
),
data_columns: Some(data_column_list.iter().map(|d| d.clone_arc()).collect()),
spec: self.spec.clone(),
}))
} else {
Expand Down
Loading

0 comments on commit b06c23e

Please sign in to comment.