
Commit

revert refactor
dapplion committed Aug 13, 2024
1 parent 8688065 commit 77e0cb6
Showing 7 changed files with 136 additions and 141 deletions.
94 changes: 61 additions & 33 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -23,9 +23,7 @@ use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
};
use crate::data_column_verification::{
DataColumnsSameBlock, GossipDataColumnError, GossipVerifiedDataColumn,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
@@ -2970,13 +2968,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Cache the data columns in the processing cache, process them, then evict them from the cache once they
/// are imported or have errored.
pub async fn process_gossip_data_column(
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let Ok((slot, block_root)) = data_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
.unique()
.exactly_one()
else {
return Err(BlockError::InternalError(
"Columns should be from the same block".to_string(),
));
};

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its samples again.
let block_root = data_column.block_root();
if self
.canonical_head
.fork_choice_read_lock()
@@ -2986,7 +2994,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

let r = self
.check_gossip_data_columns_availability_and_import(data_column)
.check_gossip_data_columns_availability_and_import(slot, block_root, data_columns)
.await;
self.remove_notified_custody_columns(&block_root, r)
}
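The batch entry point above starts with a guard that every column in the call references one and the same (slot, block_root) pair, using itertools' `unique()` followed by `exactly_one()`. Below is a minimal, self-contained sketch of that guard in isolation, assuming the itertools crate and a simplified Column stand-in rather than the verified column types from the diff:

use itertools::Itertools;

// Simplified stand-in for a verified data column; the real types carry full sidecars.
#[derive(Clone)]
struct Column {
    slot: u64,
    block_root: [u8; 32],
}

// Succeeds only if all columns agree on a single (slot, block_root) pair,
// mirroring the `let Ok((slot, block_root)) = ... else { ... }` guard above.
fn same_block(columns: &[Column]) -> Result<(u64, [u8; 32]), String> {
    columns
        .iter()
        .map(|c| (c.slot, c.block_root))
        .unique()
        .exactly_one()
        .map_err(|_| "Columns should be from the same block".to_string())
}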
@@ -3029,11 +3037,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// imported or errors.
pub async fn process_rpc_custody_columns(
self: &Arc<Self>,
custody_columns: DataColumnsSameBlock<T::EthSpec>,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let Ok((slot, block_root)) = custody_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
.unique()
.exactly_one()
else {
return Err(BlockError::InternalError(
"Columns should be from the same block".to_string(),
));
};

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its columns again.
let block_root = custody_columns.block_root();
if self
.canonical_head
.fork_choice_read_lock()
@@ -3045,7 +3063,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// TODO(das): custody column SSE event

let r = self
.check_rpc_custody_columns_availability_and_import(custody_columns)
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
self.remove_notified(&block_root, r)
}
@@ -3328,16 +3346,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// if so, otherwise caches the data column in the data availability checker.
async fn check_gossip_data_columns_availability_and_import(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
slot: Slot,
block_root: Hash256,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(data_column.signed_block_header());
for data_column in &data_columns {
slasher.accept_block_header(data_column.signed_block_header());
}
}

let slot = data_column.slot();
let availability = self
.data_availability_checker
.put_gossip_data_column(data_column)?;
let availability = self.data_availability_checker.put_gossip_data_columns(
slot,
block_root,
data_columns,
)?;

self.process_availability(slot, availability).await
}
@@ -3385,34 +3408,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// if so, otherwise caches the columns in the data availability checker.
async fn check_rpc_custody_columns_availability_and_import(
self: &Arc<Self>,
custody_columns: DataColumnsSameBlock<T::EthSpec>,
slot: Slot,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
let mut slashable_cache = self.observed_slashable.write();
let header = custody_columns.signed_block_header();
let block_root = custody_columns.block_root();
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header.clone());
// Assumes all items in custody_columns are for the same block_root
if let Some(column) = custody_columns.first() {
let header = &column.signed_block_header;
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header.clone());
}
}
}
}

// This slot value is purely informative for the consumers of
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
let slot = custody_columns.slot();
let availability = self
.data_availability_checker
.put_rpc_custody_columns(custody_columns)?;
let availability = self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
custody_columns,
)?;

self.process_availability(slot, availability).await
}
42 changes: 29 additions & 13 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -16,14 +16,15 @@ use task_executor::TaskExecutor;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock,
Slot,
};

mod error;
mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_column_verification::{
DataColumnsSameBlock, GossipVerifiedDataColumn, KzgVerifiedDataColumnsSameBlock,
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
};
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize;
@@ -187,18 +188,31 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
#[allow(clippy::type_complexity)]
pub fn put_rpc_custody_columns(
&self,
custody_columns: DataColumnsSameBlock<T::EthSpec>,
block_root: Hash256,
epoch: Epoch,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};

// TODO(das): report which column is invalid for proper peer scoring
// TODO(das): batch KZG verification here
let verified_custody_columns = custody_columns.verify(kzg)?;
let verified_custody_columns = custody_columns
.iter()
.map(|column| {
Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::new(column.clone(), kzg)
.map_err(AvailabilityCheckError::Kzg)?,
))
})
.collect::<Result<Vec<_>, AvailabilityCheckError>>()?;

self.availability_cache
.put_kzg_verified_data_columns(verified_custody_columns)
self.availability_cache.put_kzg_verified_data_columns(
block_root,
epoch,
verified_custody_columns,
)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
@@ -217,18 +231,20 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
)
}

pub fn put_gossip_data_column(
pub fn put_gossip_data_columns(
&self,
gossip_data_column: GossipVerifiedDataColumn<T>,
slot: Slot,
block_root: Hash256,
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let custody_column = gossip_data_column.into_inner();

// Will never error as there's exactly one column
let custody_columns_same_block = KzgVerifiedDataColumnsSameBlock::new(vec![custody_column])
.expect("exactly one column is always in same block");
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let custody_columns = gossip_data_columns
.into_iter()
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
.collect::<Vec<_>>();

self.availability_cache
.put_kzg_verified_data_columns(custody_columns_same_block)
.put_kzg_verified_data_columns(block_root, epoch, custody_columns)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
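In `put_rpc_custody_columns` above, each column is KZG-verified on its own and the results are gathered with `collect::<Result<Vec<_>, _>>()`, so the first failing column aborts the whole batch (the TODOs note that batch verification and per-column peer scoring are still pending). A hedged, standalone sketch of that collect-into-Result idiom, using placeholder `verify_column`/`VerifyError` names instead of the real KZG types:

#[derive(Debug)]
struct VerifyError(String);

struct Column(u64);
struct VerifiedColumn(u64);

// Placeholder standing in for the per-column KZG proof check.
fn verify_column(column: &Column) -> Result<VerifiedColumn, VerifyError> {
    if column.0 % 2 == 0 {
        Ok(VerifiedColumn(column.0))
    } else {
        Err(VerifyError(format!("bad proof for column {}", column.0)))
    }
}

// Collecting an iterator of Result<T, E> into Result<Vec<T>, E> stops at the
// first Err, mirroring the early return via AvailabilityCheckError::Kzg above.
fn verify_all(columns: &[Column]) -> Result<Vec<VerifiedColumn>, VerifyError> {
    columns.iter().map(verify_column).collect()
}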
beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -5,7 +5,7 @@ use crate::block_verification_types::{
AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock,
};
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
use crate::data_column_verification::{KzgVerifiedDataColumn, KzgVerifiedDataColumnsSameBlock};
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::RwLock;
@@ -24,7 +24,7 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
pub struct PendingComponents<E: EthSpec> {
pub block_root: Hash256,
pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<E>>, E::MaxBlobsPerBlock>,
pub verified_data_columns: Vec<KzgVerifiedDataColumn<E>>,
pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
}

@@ -50,7 +50,7 @@ impl<E: EthSpec> PendingComponents<E> {
pub fn get_cached_data_column(
&self,
data_column_index: u64,
) -> Option<&KzgVerifiedDataColumn<E>> {
) -> Option<&KzgVerifiedCustodyDataColumn<E>> {
self.verified_data_columns
.iter()
.find(|d| d.index() == data_column_index)
@@ -157,7 +157,10 @@ impl<E: EthSpec> PendingComponents<E> {
}

/// Merges a given set of data columns into the cache.
fn merge_data_columns(&mut self, kzg_verified_data_columns: &[KzgVerifiedDataColumn<E>]) {
fn merge_data_columns<I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<E>>>(
&mut self,
kzg_verified_data_columns: I,
) {
for data_column in kzg_verified_data_columns {
if !self.data_column_exists(data_column.index()) {
self.verified_data_columns.push(data_column.clone());
@@ -258,7 +261,7 @@ impl<E: EthSpec> PendingComponents<E> {
BlockImportRequirement::CustodyColumns(_) => {
let verified_data_columns = verified_data_columns
.into_iter()
.map(|d| d.to_data_column())
.map(|d| d.into_inner())
.collect();
(None, Some(verified_data_columns))
}
@@ -427,13 +430,15 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
}

pub fn put_kzg_verified_data_columns(
pub fn put_kzg_verified_data_columns<
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
>(
&self,
data_columns: KzgVerifiedDataColumnsSameBlock<T::EthSpec>,
block_root: Hash256,
epoch: Epoch,
kzg_verified_data_columns: I,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut write_lock = self.critical.write();
let block_root = data_columns.block_root();
let epoch = data_columns.slot().epoch(T::EthSpec::slots_per_epoch());

// Grab existing entry or create a new entry.
let mut pending_components = write_lock
@@ -442,7 +447,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
.unwrap_or_else(|| PendingComponents::empty(block_root));

// Merge in the data columns.
pending_components.merge_data_columns(data_columns.columns());
pending_components.merge_data_columns(kzg_verified_data_columns);

let block_import_requirement = self.block_import_requirement(epoch)?;
if pending_components.is_available(&block_import_requirement) {
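With the wrapper type reverted, `merge_data_columns` and `put_kzg_verified_data_columns` accept a generic `I: IntoIterator<Item = ...>` plus an explicit block_root and epoch, so callers can hand over a Vec, an array, or any other iterator of verified columns without an extra wrapper allocation. A minimal sketch of that generic merge-with-dedup pattern, using an illustrative Column/index shape rather than the Lighthouse types:

struct Column {
    index: u64,
}

#[derive(Default)]
struct PendingColumns {
    verified: Vec<Column>,
}

impl PendingColumns {
    // Accept anything iterable over columns; skip indices already present,
    // as merge_data_columns does via its existence check.
    fn merge<I: IntoIterator<Item = Column>>(&mut self, columns: I) {
        for column in columns {
            if !self.verified.iter().any(|c| c.index == column.index) {
                self.verified.push(column);
            }
        }
    }
}

fn main() {
    let mut pending = PendingColumns::default();
    pending.merge(vec![Column { index: 1 }, Column { index: 2 }]);
    pending.merge([Column { index: 2 }, Column { index: 3 }]); // index 2 is deduplicated
    assert_eq!(pending.verified.len(), 3);
}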