Skip to content

Commit

Permalink
Keep PendingComponents in da_checker during import_block
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 24, 2024
1 parent 7136412 commit 7c125b8
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 8 deletions.
14 changes: 14 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3328,6 +3328,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"payload_verification_handle",
)
.await??;

// Remove block components from da_checker AFTER completing block import. Then we can assert
// the following invariant:
// > A valid unfinalized block is either in fork-choice or da_checker.
//
// If we remove the block when it becomes available, there's some time window during
// `import_block` where the block is nowhere. Consumers of the da_checker can handle the
// extend time a block may exist in the da_checker.
//
// If `import_block` errors (only errors with internal errors), the pending components will
// be pruned on data_availability_checker maintenance as finality advances.
self.data_availability_checker
.remove_pending_components(block_root);

Ok(AvailabilityProcessingStatus::Imported(block_root))
}

Expand Down
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_pending_executed_block(executed_block)
}

pub fn remove_pending_components(&self, block_root: Hash256) {
self.availability_cache
.remove_pending_components(block_root)
}

/// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may
/// include the fully available block.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,18 +475,18 @@ impl<T: BeaconChainTypes> Critical<T> {
Ok(())
}

/// Removes and returns the pending_components corresponding to
/// the `block_root` or `None` if it does not exist
pub fn pop_pending_components(
/// Returns the pending_components corresponding to the `block_root` or `None` if it does not
/// exist
pub fn get_pending_components(
&mut self,
block_root: Hash256,
store: &OverflowStore<T>,
) -> Result<Option<PendingComponents<T::EthSpec>>, AvailabilityCheckError> {
match self.in_memory.pop_entry(&block_root) {
Some((_, pending_components)) => Ok(Some(pending_components)),
match self.in_memory.get(&block_root) {
Some(pending_components) => Ok(Some(pending_components.clone())),
None => {
// not in memory, is it in the store?
if self.store_keys.remove(&block_root) {
if self.store_keys.contains(&block_root) {
// We don't need to remove the data from the store as we have removed it from
// `store_keys` so we won't go looking for it on disk. The maintenance thread
// will remove it from disk the next time it runs.
Expand All @@ -498,6 +498,21 @@ impl<T: BeaconChainTypes> Critical<T> {
}
}

/// Removes and returns the pending_components corresponding to
/// the `block_root` or `None` if it does not exist
pub fn remove_pending_components(&mut self, block_root: Hash256) {
match self.in_memory.pop_entry(&block_root) {
Some { .. } => {}
None => {
// not in memory, is it in the store?
// We don't need to remove the data from the store as we have removed it from
// `store_keys` so we won't go looking for it on disk. The maintenance thread
// will remove it from disk the next time it runs.
self.store_keys.remove(&block_root);
}
}
}

/// Returns the number of pending component entries in memory.
pub fn num_blocks(&self) -> usize {
self.in_memory.len()
Expand Down Expand Up @@ -600,13 +615,18 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {

// Grab existing entry or create a new entry.
let mut pending_components = write_lock
.pop_pending_components(block_root, &self.overflow_store)?
.get_pending_components(block_root, &self.overflow_store)?
.unwrap_or_else(|| PendingComponents::empty(block_root));

// Merge in the blobs.
pending_components.merge_blobs(fixed_blobs);

if pending_components.is_available() {
write_lock.put_pending_components(
block_root,
pending_components.clone(),
&self.overflow_store,
)?;
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
Expand Down Expand Up @@ -638,14 +658,19 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {

// Grab existing entry or create a new entry.
let mut pending_components = write_lock
.pop_pending_components(block_root, &self.overflow_store)?
.get_pending_components(block_root, &self.overflow_store)?
.unwrap_or_else(|| PendingComponents::empty(block_root));

// Merge in the block.
pending_components.merge_block(diet_executed_block);

// Check if we have all components and entire set is consistent.
if pending_components.is_available() {
write_lock.put_pending_components(
block_root,
pending_components.clone(),
&self.overflow_store,
)?;
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
Expand All @@ -661,6 +686,10 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}
}

pub fn remove_pending_components(&self, block_root: Hash256) {
self.critical.write().remove_pending_components(block_root);
}

/// write all in memory objects to disk
pub fn write_all_to_disk(&self) -> Result<(), AvailabilityCheckError> {
let maintenance_lock = self.maintenance_lock.lock();
Expand Down

0 comments on commit 7c125b8

Please sign in to comment.