From 8688065fcba7f5c918cb0558272756aa22223dc4 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 6 Aug 2024 13:28:03 +0200 Subject: [PATCH] Add PeerDAS RPC import boilerplate --- beacon_node/beacon_chain/src/beacon_chain.rs | 100 +++++++++++++----- .../src/data_availability_checker.rs | 42 +++++--- .../overflow_lru_cache.rs | 29 ++--- .../src/data_column_verification.rs | 98 +++++++++++++---- beacon_node/beacon_processor/src/lib.rs | 17 ++- .../gossip_methods.rs | 2 +- .../src/network_beacon_processor/mod.rs | 19 ++++ .../network_beacon_processor/sync_methods.rs | 55 ++++++++++ 8 files changed, 282 insertions(+), 80 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3bf75284779..5eb86d8699b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -23,7 +23,9 @@ use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, }; -use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use crate::data_column_verification::{ + DataColumnsSameBlock, GossipDataColumnError, GossipVerifiedDataColumn, +}; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; @@ -2968,23 +2970,13 @@ impl BeaconChain { /// Cache the data columns in the processing cache, process it, then evict it from the cache if it was /// imported or errors. - pub async fn process_gossip_data_columns( + pub async fn process_gossip_data_column( self: &Arc, - data_columns: Vec>, + data_column: GossipVerifiedDataColumn, ) -> Result> { - let Ok((slot, block_root)) = data_columns - .iter() - .map(|c| (c.slot(), c.block_root())) - .unique() - .exactly_one() - else { - return Err(BlockError::InternalError( - "Columns should be from the same block".to_string(), - )); - }; - // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its samples again. + let block_root = data_column.block_root(); if self .canonical_head .fork_choice_read_lock() @@ -2994,7 +2986,7 @@ impl BeaconChain { } let r = self - .check_gossip_data_columns_availability_and_import(slot, block_root, data_columns) + .check_gossip_data_columns_availability_and_import(data_column) .await; self.remove_notified_custody_columns(&block_root, r) } @@ -3033,6 +3025,31 @@ impl BeaconChain { self.remove_notified(&block_root, r) } + /// Cache the columns in the processing cache, process it, then evict it from the cache if it was + /// imported or errors. + pub async fn process_rpc_custody_columns( + self: &Arc, + custody_columns: DataColumnsSameBlock, + ) -> Result> { + // If this block has already been imported to forkchoice it must have been available, so + // we don't need to process its columns again. + let block_root = custody_columns.block_root(); + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Err(BlockError::BlockIsAlreadyKnown(block_root)); + } + + // TODO(das): custody column SSE event + + let r = self + .check_rpc_custody_columns_availability_and_import(custody_columns) + .await; + self.remove_notified(&block_root, r) + } + /// Remove any block components from the *processing cache* if we no longer require them. If the /// block was imported full or erred, we no longer require them. fn remove_notified( @@ -3311,21 +3328,16 @@ impl BeaconChain { /// if so, otherwise caches the data column in the data availability checker. async fn check_gossip_data_columns_availability_and_import( self: &Arc, - slot: Slot, - block_root: Hash256, - data_columns: Vec>, + data_column: GossipVerifiedDataColumn, ) -> Result> { if let Some(slasher) = self.slasher.as_ref() { - for data_colum in &data_columns { - slasher.accept_block_header(data_colum.signed_block_header()); - } + slasher.accept_block_header(data_column.signed_block_header()); } - let availability = self.data_availability_checker.put_gossip_data_columns( - slot, - block_root, - data_columns, - )?; + let slot = data_column.slot(); + let availability = self + .data_availability_checker + .put_gossip_data_column(data_column)?; self.process_availability(slot, availability).await } @@ -3369,6 +3381,42 @@ impl BeaconChain { self.process_availability(slot, availability).await } + /// Checks if the provided columns can make any cached blocks available, and imports immediately + /// if so, otherwise caches the columns in the data availability checker. + async fn check_rpc_custody_columns_availability_and_import( + self: &Arc, + custody_columns: DataColumnsSameBlock, + ) -> Result> { + // Need to scope this to ensure the lock is dropped before calling `process_availability` + // Even an explicit drop is not enough to convince the borrow checker. + { + let mut slashable_cache = self.observed_slashable.write(); + let header = custody_columns.signed_block_header(); + let block_root = custody_columns.block_root(); + if verify_header_signature::>(self, header).is_ok() { + slashable_cache + .observe_slashable( + header.message.slot, + header.message.proposer_index, + block_root, + ) + .map_err(|e| BlockError::BeaconChainError(e.into()))?; + if let Some(slasher) = self.slasher.as_ref() { + slasher.accept_block_header(header.clone()); + } + } + } + + // This slot value is purely informative for the consumers of + // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. + let slot = custody_columns.slot(); + let availability = self + .data_availability_checker + .put_rpc_custody_columns(custody_columns)?; + + self.process_availability(slot, availability).await + } + /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` /// /// An error is returned if the block was unable to be imported. It may be partially imported diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index b4336a054e2..ece483ce8fc 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -16,14 +16,15 @@ use task_executor::TaskExecutor; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, - Slot, }; mod error; mod overflow_lru_cache; mod state_lru_cache; -use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn}; +use crate::data_column_verification::{ + DataColumnsSameBlock, GossipVerifiedDataColumn, KzgVerifiedDataColumnsSameBlock, +}; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; @@ -181,6 +182,25 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(block_root, epoch, verified_blobs) } + /// Put a list of custody columns received via RPC into the availability cache. This performs KZG + /// verification on the blobs in the list. + #[allow(clippy::type_complexity)] + pub fn put_rpc_custody_columns( + &self, + custody_columns: DataColumnsSameBlock, + ) -> Result, AvailabilityCheckError> { + let Some(kzg) = self.kzg.as_ref() else { + return Err(AvailabilityCheckError::KzgNotInitialized); + }; + + // TODO(das): report which column is invalid for proper peer scoring + // TODO(das): batch KZG verification here + let verified_custody_columns = custody_columns.verify(kzg)?; + + self.availability_cache + .put_kzg_verified_data_columns(verified_custody_columns) + } + /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the blob sidecar. @@ -197,20 +217,18 @@ impl DataAvailabilityChecker { ) } - pub fn put_gossip_data_columns( + pub fn put_gossip_data_column( &self, - slot: Slot, - block_root: Hash256, - gossip_data_columns: Vec>, + gossip_data_column: GossipVerifiedDataColumn, ) -> Result, AvailabilityCheckError> { - let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let custody_columns = gossip_data_columns - .into_iter() - .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) - .collect::>(); + let custody_column = gossip_data_column.into_inner(); + + // Will never error as there's exactly one column + let custody_columns_same_block = KzgVerifiedDataColumnsSameBlock::new(vec![custody_column]) + .expect("exactly one column is always in same block"); self.availability_cache - .put_kzg_verified_data_columns(block_root, epoch, custody_columns) + .put_kzg_verified_data_columns(custody_columns_same_block) } /// Check if we have all the blobs for a block. Returns `Availability` which has information diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 6c9964bdf86..d681159eaff 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -5,7 +5,7 @@ use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock, }; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; -use crate::data_column_verification::KzgVerifiedCustodyDataColumn; +use crate::data_column_verification::{KzgVerifiedDataColumn, KzgVerifiedDataColumnsSameBlock}; use crate::BeaconChainTypes; use lru::LruCache; use parking_lot::RwLock; @@ -24,7 +24,7 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, E::MaxBlobsPerBlock>, - pub verified_data_columns: Vec>, + pub verified_data_columns: Vec>, pub executed_block: Option>, } @@ -50,7 +50,7 @@ impl PendingComponents { pub fn get_cached_data_column( &self, data_column_index: u64, - ) -> Option<&KzgVerifiedCustodyDataColumn> { + ) -> Option<&KzgVerifiedDataColumn> { self.verified_data_columns .iter() .find(|d| d.index() == data_column_index) @@ -157,13 +157,10 @@ impl PendingComponents { } /// Merges a given set of data columns into the cache. - fn merge_data_columns>>( - &mut self, - kzg_verified_data_columns: I, - ) { + fn merge_data_columns(&mut self, kzg_verified_data_columns: &[KzgVerifiedDataColumn]) { for data_column in kzg_verified_data_columns { if !self.data_column_exists(data_column.index()) { - self.verified_data_columns.push(data_column); + self.verified_data_columns.push(data_column.clone()); } } } @@ -261,7 +258,7 @@ impl PendingComponents { BlockImportRequirement::CustodyColumns(_) => { let verified_data_columns = verified_data_columns .into_iter() - .map(|d| d.into_inner()) + .map(|d| d.to_data_column()) .collect(); (None, Some(verified_data_columns)) } @@ -430,17 +427,13 @@ impl DataAvailabilityCheckerInner { } } - // TODO(das): rpc code paths to be implemented. - #[allow(dead_code)] - pub fn put_kzg_verified_data_columns< - I: IntoIterator>, - >( + pub fn put_kzg_verified_data_columns( &self, - block_root: Hash256, - epoch: Epoch, - kzg_verified_data_columns: I, + data_columns: KzgVerifiedDataColumnsSameBlock, ) -> Result, AvailabilityCheckError> { let mut write_lock = self.critical.write(); + let block_root = data_columns.block_root(); + let epoch = data_columns.slot().epoch(T::EthSpec::slots_per_epoch()); // Grab existing entry or create a new entry. let mut pending_components = write_lock @@ -449,7 +442,7 @@ impl DataAvailabilityCheckerInner { .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the data columns. - pending_components.merge_data_columns(kzg_verified_data_columns); + pending_components.merge_data_columns(data_columns.columns()); let block_import_requirement = self.block_import_requirement(epoch)?; if pending_components.is_available(&block_import_requirement) { diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index da639e3695e..7d85007d9f1 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -14,10 +14,11 @@ use slot_clock::SlotClock; use ssz_derive::{Decode, Encode}; use std::iter; use std::sync::Arc; +use tree_hash::TreeHash; use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; use types::{ - BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, - RuntimeVariableList, SignedBeaconBlockHeader, Slot, + BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSidecarList, DataColumnSubnetId, + EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlockHeader, Slot, }; /// An error occurred while validating a gossip data column. @@ -177,7 +178,7 @@ impl GossipVerifiedDataColumn { pub fn id(&self) -> DataColumnIdentifier { DataColumnIdentifier { block_root: self.block_root, - index: self.data_column.data_column_index(), + index: self.data_column.index(), } } @@ -221,34 +222,91 @@ impl KzgVerifiedDataColumn { self.data.clone() } - pub fn data_column_index(&self) -> u64 { + pub fn index(&self) -> ColumnIndex { self.data.index } } -/// Data column that we must custody and has completed kzg verification -#[derive(Debug, Derivative, Clone, Encode, Decode)] -#[derivative(PartialEq, Eq)] -#[ssz(struct_behaviour = "transparent")] -pub struct KzgVerifiedCustodyDataColumn { - data: Arc>, +/// Collection of data columns for the same block root +pub struct DataColumnsSameBlock { + block_root: Hash256, + signed_block_header: SignedBeaconBlockHeader, + columns: DataColumnSidecarList, } -impl KzgVerifiedCustodyDataColumn { - /// Mark a column as custody column. Caller must ensure that our current custody requirements - /// include this column - pub fn from_asserted_custody(kzg_verified: KzgVerifiedDataColumn) -> Self { - Self { - data: kzg_verified.to_data_column(), +impl DataColumnsSameBlock { + pub fn new(columns: DataColumnSidecarList) -> Result { + let first_column = columns.first().ok_or("empty columns")?; + let signed_block_header = first_column.signed_block_header.clone(); + for column in columns.iter().skip(1) { + if column.signed_block_header != signed_block_header { + return Err("no same block"); + } } + Ok(Self { + block_root: signed_block_header.message.tree_hash_root(), + signed_block_header, + columns, + }) } - pub fn index(&self) -> ColumnIndex { - self.data.index + pub fn verify(self, kzg: &Kzg) -> Result, KzgError> { + Ok(KzgVerifiedDataColumnsSameBlock { + block_root: self.block_root, + signed_block_header: self.signed_block_header, + columns: self + .columns + .into_iter() + .map(|column| KzgVerifiedDataColumn::new(column, kzg)) + .collect::, _>>()?, + }) } - pub fn into_inner(self) -> Arc> { - self.data + pub fn block_root(&self) -> Hash256 { + self.block_root + } + pub fn slot(&self) -> Slot { + self.signed_block_header.message.slot + } + pub fn signed_block_header(&self) -> &SignedBeaconBlockHeader { + &self.signed_block_header + } + pub fn columns(&self) -> &DataColumnSidecarList { + &self.columns + } +} + +/// Collection of KZG verified data columns for the same block root +pub struct KzgVerifiedDataColumnsSameBlock { + block_root: Hash256, + signed_block_header: SignedBeaconBlockHeader, + columns: Vec>, +} + +impl KzgVerifiedDataColumnsSameBlock { + pub fn new(columns: Vec>) -> Result { + let first_column = columns.first().ok_or("empty columns")?; + let signed_block_header = first_column.as_data_column().signed_block_header.clone(); + for column in columns.iter().skip(1) { + if column.as_data_column().signed_block_header != signed_block_header { + return Err("no same block"); + } + } + Ok(Self { + block_root: signed_block_header.message.tree_hash_root(), + signed_block_header, + columns, + }) + } + + pub fn block_root(&self) -> Hash256 { + self.block_root + } + pub fn slot(&self) -> Slot { + self.signed_block_header.message.slot + } + pub fn columns(&self) -> &[KzgVerifiedDataColumn] { + &self.columns } } diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 68c33e99baf..6ce3b64acfe 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -108,6 +108,7 @@ pub struct BeaconProcessorQueueLengths { unknown_light_client_update_queue: usize, rpc_block_queue: usize, rpc_blob_queue: usize, + rpc_custody_column_queue: usize, chain_segment_queue: usize, backfill_chain_segment: usize, gossip_block_queue: usize, @@ -163,6 +164,7 @@ impl BeaconProcessorQueueLengths { unknown_light_client_update_queue: 128, rpc_block_queue: 1024, rpc_blob_queue: 1024, + rpc_custody_column_queue: 1024, chain_segment_queue: 64, backfill_chain_segment: 64, gossip_block_queue: 1024, @@ -228,6 +230,7 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic pub const RPC_BLOCK: &str = "rpc_block"; pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block"; pub const RPC_BLOBS: &str = "rpc_blob"; +pub const RPC_CUSTODY_COLUMN: &str = "rpc_custody_column"; pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill"; pub const STATUS_PROCESSING: &str = "status_processing"; @@ -606,6 +609,7 @@ pub enum Work { RpcBlobs { process_fn: AsyncFn, }, + RpcCustodyColumn(AsyncFn), IgnoredRpcBlock { process_fn: BlockingFn, }, @@ -653,6 +657,7 @@ impl Work { Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, Work::RpcBlobs { .. } => RPC_BLOBS, + Work::RpcCustodyColumn { .. } => RPC_CUSTODY_COLUMN, Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK, Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL, @@ -815,6 +820,7 @@ impl BeaconProcessor { // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); + let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); @@ -970,6 +976,8 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = rpc_blob_queue.pop() { self.spawn_worker(item, idle_tx); + } else if let Some(item) = rpc_custody_column_queue.pop() { + self.spawn_worker(item, idle_tx); // Check delayed blocks before gossip blocks, the gossip blocks might rely // on the delayed ones. } else if let Some(item) = delayed_block_queue.pop() { @@ -1262,6 +1270,9 @@ impl BeaconProcessor { rpc_block_queue.push(work, work_id, &self.log) } Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log), + Work::RpcCustodyColumn { .. } => { + rpc_custody_column_queue.push(work, work_id, &self.log) + } Work::ChainSegment { .. } => { chain_segment_queue.push(work, work_id, &self.log) } @@ -1497,9 +1508,9 @@ impl BeaconProcessor { beacon_block_root: _, process_fn, } => task_spawner.spawn_async(process_fn), - Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => { - task_spawner.spawn_async(process_fn) - } + Work::RpcBlock { process_fn } + | Work::RpcBlobs { process_fn } + | Work::RpcCustodyColumn(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::GossipBlock(work) | Work::GossipBlobSidecar(work) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 4c5c34bfd83..c8007f60dfa 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -988,7 +988,7 @@ impl NetworkBeaconProcessor { match self .chain - .process_gossip_data_columns(vec![verified_data_column]) + .process_gossip_data_column(verified_data_column) .await { Ok(availability) => { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 9fb14fdcb8c..7ca68be82ba 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,6 +1,7 @@ use crate::sync::manager::BlockProcessType; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_column_verification::DataColumnsSameBlock; use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain}; use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; use beacon_processor::{ @@ -476,6 +477,24 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some custody columns. `process_rpc_custody_columns` reports + /// the result back to sync. + pub fn send_rpc_custody_columns( + self: &Arc, + custody_columns: DataColumnsSameBlock, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let s = self.clone(); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcCustodyColumn(Box::pin(async move { + s.process_rpc_custody_columns(custody_columns, seen_timestamp, process_type) + .await; + })), + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 68bd6745144..59120af324e 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -8,6 +8,7 @@ use crate::sync::{ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; +use beacon_chain::data_column_verification::DataColumnsSameBlock; use beacon_chain::{ validator_monitor::get_slot_delay_ms, AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, @@ -307,6 +308,60 @@ impl NetworkBeaconProcessor { }); } + pub async fn process_rpc_custody_columns( + self: Arc>, + custody_columns: DataColumnsSameBlock, + _seen_timestamp: Duration, + process_type: BlockProcessType, + ) { + let block_root = custody_columns.block_root(); + let result = self + .chain + .process_rpc_custody_columns(custody_columns) + .await; + + match &result { + Ok(availability) => match availability { + AvailabilityProcessingStatus::Imported(hash) => { + debug!( + self.log, + "Block components retrieved"; + "result" => "imported block and custody columns", + "block_hash" => %hash, + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + debug!( + self.log, + "Missing components over rpc"; + "block_hash" => %block_root, + ); + } + }, + Err(BlockError::BlockIsAlreadyKnown(_)) => { + debug!( + self.log, + "Custody columns have already been imported"; + "block_hash" => %block_root, + ); + } + Err(e) => { + warn!( + self.log, + "Error when importing rpc custody columns"; + "error" => ?e, + "block_hash" => %block_root, + ); + } + } + + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: result.into(), + }); + } + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub async fn process_chain_segment(