Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify database anchor #6397

Merged
merged 5 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,9 +905,6 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {

let block_root = get_block_header_root(block_header);

// Disallow blocks that conflict with the anchor (weak subjectivity checkpoint), if any.
check_block_against_anchor_slot(block.message(), chain)?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These checks were unnecessary because the anchor slot is always older than finalization, so any block older than the anchor is also older than finalization and will be caught by check_block_against_finalized_slot

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if checkpoint sync to a non-finalized checkpoint though? In that case, the checkpoint slot could be ahead of finalized slot.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, we force-initialize the fork-choice as if that checkpoint was finalized.


// Do not gossip a block from a finalized slot.
check_block_against_finalized_slot(block.message(), block_root, chain)?;

Expand Down Expand Up @@ -1138,9 +1135,6 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
.fork_name(&chain.spec)
.map_err(BlockError::InconsistentFork)?;

// Check the anchor slot before loading the parent, to avoid spurious lookups.
check_block_against_anchor_slot(block.message(), chain)?;

let (mut parent, block) = load_parent(block, chain)?;

let state = cheap_state_advance_to_obtain_committees::<_, BlockError>(
Expand Down Expand Up @@ -1775,19 +1769,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
}
}

/// Returns `Ok(())` if the block's slot is greater than the anchor block's slot (if any).
fn check_block_against_anchor_slot<T: BeaconChainTypes>(
block: BeaconBlockRef<'_, T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<(), BlockError> {
if let Some(anchor_slot) = chain.store.get_anchor_slot() {
if block.slot() <= anchor_slot {
return Err(BlockError::WeakSubjectivityConflict);
}
}
Ok(())
}

/// Returns `Ok(())` if the block is later than the finalized slot on `chain`.
///
/// Returns an error if the block is earlier or equal to the finalized slot, or there was an error
Expand Down
9 changes: 2 additions & 7 deletions beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ pub enum HistoricalBlockError {
InvalidSignature,
/// Transitory error, caller should retry with the same blocks.
ValidatorPubkeyCacheTimeout,
/// No historical sync needed.
NoAnchorInfo,
/// Logic error: should never occur.
IndexOutOfBounds,
}
Expand Down Expand Up @@ -62,10 +60,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
mut blocks: Vec<AvailableBlock<T::EthSpec>>,
) -> Result<usize, Error> {
let anchor_info = self
.store
.get_anchor_info()
.ok_or(HistoricalBlockError::NoAnchorInfo)?;
let anchor_info = self.store.get_anchor_info();
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
let blob_info = self.store.get_blob_info();
let data_column_info = self.store.get_data_column_info();

Expand Down Expand Up @@ -263,7 +258,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot);
anchor_and_blob_batch.push(
self.store
.compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?,
.compare_and_set_anchor_info(anchor_info, new_anchor)?,
);
self.store.hot_db.do_atomically(anchor_and_blob_batch)?;

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// Schedule another reconstruction batch if required and we have access to the
// channel for requeueing.
if let Some(tx) = opt_tx {
if db.get_anchor_info().is_some() {
if !db.get_anchor_info().all_historic_states_stored() {
if let Err(e) = tx.send(Notification::Reconstruction) {
error!(
log,
Expand Down
37 changes: 17 additions & 20 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use store::chunked_iter::ChunkedVectorIter;
use store::{
chunked_vector::BlockRootsChunked,
get_key_for_col,
metadata::{SchemaVersion, STATE_UPPER_LIMIT_NO_RETAIN},
metadata::{
SchemaVersion, ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_UNINITIALIZED, STATE_UPPER_LIMIT_NO_RETAIN,
},
partial_beacon_state::PartialBeaconState,
AnchorInfo, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp,
};
Expand Down Expand Up @@ -43,7 +45,14 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(
) -> Result<(), Error> {
info!(log, "Upgrading from v21 to v22");

let old_anchor = db.get_anchor_info();
let mut old_anchor = db.get_anchor_info();

// If the anchor was uninitialized in the old schema (`None`), this represents a full archive
// node.
if old_anchor == ANCHOR_UNINITIALIZED {
old_anchor = ANCHOR_FOR_ARCHIVE_NODE;
}

let split_slot = db.get_split_slot();
let genesis_state_root = genesis_state_root.ok_or(Error::GenesisStateUnknown)?;

Expand All @@ -70,9 +79,7 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(

// Write the block roots in the new format in a new column. Similar to above, we do this
// separately from deleting the old format block roots so that this is crash safe.
let oldest_block_slot = old_anchor
.as_ref()
.map_or(Slot::new(0), |a| a.oldest_block_slot);
let oldest_block_slot = old_anchor.oldest_block_slot;
write_new_schema_block_roots::<T>(
&db,
genesis_block_root,
Expand All @@ -90,22 +97,12 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(
// If we crash after commiting this change, then there will be some leftover cruft left in the
// freezer database, but no corruption because all the new-format data has already been written
// above.
let new_anchor = if let Some(old_anchor) = &old_anchor {
AnchorInfo {
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
..old_anchor.clone()
}
} else {
AnchorInfo {
anchor_slot: Slot::new(0),
oldest_block_slot: Slot::new(0),
oldest_block_parent: Hash256::ZERO,
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
}
let new_anchor = AnchorInfo {
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
..old_anchor.clone()
};
let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, Some(new_anchor))?];
let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, new_anchor)?];
db.store_schema_version_atomically(SchemaVersion(22), hot_ops)?;

// Finally, clean up the old-format data from the freezer database.
Expand Down
157 changes: 10 additions & 147 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,8 @@ async fn light_client_bootstrap_test() {
return;
};

let checkpoint_slot = Slot::new(E::slots_per_epoch() * 6);
let db_path = tempdir().unwrap();
let log = test_logger();

let seconds_per_slot = spec.seconds_per_slot;
let store = get_store_generic(&db_path, StoreConfig::default(), test_spec::<E>());
let store = get_store_generic(&db_path, StoreConfig::default(), spec.clone());
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
let num_initial_slots = E::slots_per_epoch() * 7;
Expand All @@ -132,79 +128,15 @@ async fn light_client_bootstrap_test() {
)
.await;

let wss_block_root = harness
.chain
.block_root_at_slot(checkpoint_slot, WhenSlotSkipped::Prev)
.unwrap()
.unwrap();
let wss_state_root = harness
.chain
.state_root_at_slot(checkpoint_slot)
.unwrap()
.unwrap();
let wss_block = harness
.chain
.store
.get_full_block(&wss_block_root)
.unwrap()
.unwrap();
let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap();
let wss_state = store
.get_state(&wss_state_root, Some(checkpoint_slot))
.unwrap()
.unwrap();

let kzg = spec.deneb_fork_epoch.map(|_| KZG.clone());

let mock =
mock_execution_layer_from_parts(&harness.spec, harness.runtime.task_executor.clone());

// Initialise a new beacon chain from the finalized checkpoint.
// The slot clock must be set to a time ahead of the checkpoint state.
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(harness.chain.genesis_time),
Duration::from_secs(seconds_per_slot),
);
slot_clock.set_slot(harness.get_current_slot().as_u64());

let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1);

let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec)
.store(store.clone())
.custom_spec(test_spec::<E>())
.task_executor(harness.chain.task_executor.clone())
.logger(log.clone())
.weak_subjectivity_state(
wss_state,
wss_block.clone(),
wss_blobs_opt.clone(),
genesis_state,
)
.unwrap()
.store_migrator_config(MigratorConfig::default().blocking())
.dummy_eth1_backend()
.expect("should build dummy backend")
.slot_clock(slot_clock)
.shutdown_sender(shutdown_tx)
.chain_config(ChainConfig::default())
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
log.clone(),
1,
)))
.execution_layer(Some(mock.el))
.kzg(kzg)
.build()
.expect("should build");

let current_state = harness.get_current_state();

if ForkName::Electra == current_state.fork_name_unchecked() {
// TODO(electra) fix beacon state `compute_merkle_proof`
return;
}

let finalized_checkpoint = beacon_chain
let finalized_checkpoint = harness
.chain
.canonical_head
.cached_head()
.finalized_checkpoint();
Expand Down Expand Up @@ -239,11 +171,7 @@ async fn light_client_updates_test() {
};

let num_final_blocks = E::slots_per_epoch() * 2;
let checkpoint_slot = Slot::new(E::slots_per_epoch() * 9);
let db_path = tempdir().unwrap();
let log = test_logger();

let seconds_per_slot = spec.seconds_per_slot;
let store = get_store_generic(&db_path, StoreConfig::default(), test_spec::<E>());
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
Expand All @@ -260,33 +188,6 @@ async fn light_client_updates_test() {
)
.await;

let wss_block_root = harness
.chain
.block_root_at_slot(checkpoint_slot, WhenSlotSkipped::Prev)
.unwrap()
.unwrap();
let wss_state_root = harness
.chain
.state_root_at_slot(checkpoint_slot)
.unwrap()
.unwrap();
let wss_block = harness
.chain
.store
.get_full_block(&wss_block_root)
.unwrap()
.unwrap();
let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap();
let wss_state = store
.get_state(&wss_state_root, Some(checkpoint_slot))
.unwrap()
.unwrap();

let kzg = spec.deneb_fork_epoch.map(|_| KZG.clone());

let mock =
mock_execution_layer_from_parts(&harness.spec, harness.runtime.task_executor.clone());

harness.advance_slot();
harness
.extend_chain_with_light_client_data(
Expand All @@ -296,46 +197,6 @@ async fn light_client_updates_test() {
)
.await;

// Initialise a new beacon chain from the finalized checkpoint.
// The slot clock must be set to a time ahead of the checkpoint state.
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(harness.chain.genesis_time),
Duration::from_secs(seconds_per_slot),
);
slot_clock.set_slot(harness.get_current_slot().as_u64());

let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1);

let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec)
.store(store.clone())
.custom_spec(test_spec::<E>())
.task_executor(harness.chain.task_executor.clone())
.logger(log.clone())
.weak_subjectivity_state(
wss_state,
wss_block.clone(),
wss_blobs_opt.clone(),
genesis_state,
)
.unwrap()
.store_migrator_config(MigratorConfig::default().blocking())
.dummy_eth1_backend()
.expect("should build dummy backend")
.slot_clock(slot_clock)
.shutdown_sender(shutdown_tx)
.chain_config(ChainConfig::default())
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
log.clone(),
1,
)))
.execution_layer(Some(mock.el))
.kzg(kzg)
.build()
.expect("should build");

let beacon_chain = Arc::new(beacon_chain);

let current_state = harness.get_current_state();

if ForkName::Electra == current_state.fork_name_unchecked() {
Expand All @@ -351,7 +212,8 @@ async fn light_client_updates_test() {

// fetch a range of light client updates. right now there should only be one light client update
// in the db.
let lc_updates = beacon_chain
let lc_updates = harness
.chain
.get_light_client_updates(sync_period, 100)
.unwrap();

Expand All @@ -371,7 +233,8 @@ async fn light_client_updates_test() {
.await;

// we should now have two light client updates in the db
let lc_updates = beacon_chain
let lc_updates = harness
.chain
.get_light_client_updates(sync_period, 100)
.unwrap();

Expand Down Expand Up @@ -2646,11 +2509,11 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
}

// Anchor slot is still set to the slot of the checkpoint block.
assert_eq!(store.get_anchor_slot(), Some(wss_block.slot()));
assert_eq!(store.get_anchor_info().anchor_slot, wss_block.slot());

// Reconstruct states.
store.clone().reconstruct_historic_states(None).unwrap();
assert_eq!(store.get_anchor_slot(), None);
assert_eq!(store.get_anchor_info().anchor_slot, 0);
}

/// Test that blocks and attestations that refer to states around an unaligned split state are
Expand Down Expand Up @@ -3489,7 +3352,7 @@ async fn prune_historic_states() {
.unwrap();

// Check that anchor info is updated.
let anchor_info = store.get_anchor_info().unwrap();
let anchor_info = store.get_anchor_info();
assert_eq!(anchor_info.state_lower_limit, 0);
assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN);

Expand Down
Loading
Loading