Simplify database anchor #6397

Merged · 5 commits · Sep 19, 2024
Changes from 1 commit
19 changes: 0 additions & 19 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -905,9 +905,6 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {

let block_root = get_block_header_root(block_header);

// Disallow blocks that conflict with the anchor (weak subjectivity checkpoint), if any.
check_block_against_anchor_slot(block.message(), chain)?;
Member Author:

These checks were unnecessary: the anchor slot is never ahead of the finalized slot, so any block at or before the anchor is also at or before finalization and will be caught by check_block_against_finalized_slot.
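
For concreteness, a minimal, runnable sketch of that argument in plain slot arithmetic (the helper functions are illustrative, not Lighthouse APIs); it assumes the invariant stated above, i.e. anchor_slot <= finalized_slot:

// Illustrative sketch only; assumes anchor_slot <= finalized_slot as described above.
fn anchor_check_rejects(block_slot: u64, anchor_slot: u64) -> bool {
    block_slot <= anchor_slot
}

fn finalized_check_rejects(block_slot: u64, finalized_slot: u64) -> bool {
    block_slot <= finalized_slot
}

fn main() {
    let (anchor_slot, finalized_slot) = (96u64, 128u64);
    for block_slot in [0u64, 64, 96, 97, 128, 129] {
        // If the anchor check rejects, then block_slot <= anchor_slot <= finalized_slot,
        // so the finalized-slot check rejects as well; the anchor check adds nothing.
        if anchor_check_rejects(block_slot, anchor_slot) {
            assert!(finalized_check_rejects(block_slot, finalized_slot));
        }
    }
}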

Member:

What if we checkpoint sync to a non-finalized checkpoint, though? In that case the checkpoint slot could be ahead of the finalized slot.

Collaborator:

In that case, we force-initialize fork choice as if that checkpoint were finalized.
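
A toy model of that reply, using hypothetical types rather than Lighthouse's fork-choice API: seeding fork choice with the sync checkpoint as its finalized checkpoint means the node-local finalized slot starts at the anchor slot and only moves forward, which is the invariant the redundancy argument above relies on.

// Hypothetical toy model, not Lighthouse code: fork choice seeded from a
// checkpoint-sync anchor that is treated as finalized from the start.
struct ToyForkChoice {
    finalized_slot: u64,
}

impl ToyForkChoice {
    /// Force-initialize with the anchor checkpoint as the finalized checkpoint.
    fn from_anchor(anchor_slot: u64) -> Self {
        Self { finalized_slot: anchor_slot }
    }

    /// Later finality updates can only move the finalized slot forward.
    fn update_finalized(&mut self, new_finalized_slot: u64) {
        self.finalized_slot = self.finalized_slot.max(new_finalized_slot);
    }
}

fn main() {
    let anchor_slot = 12_345u64;
    let mut fc = ToyForkChoice::from_anchor(anchor_slot);
    assert!(fc.finalized_slot >= anchor_slot);
    fc.update_finalized(12_000); // a stale update cannot move finality backwards
    fc.update_finalized(13_000);
    // The invariant anchor_slot <= finalized_slot holds throughout.
    assert!(fc.finalized_slot >= anchor_slot);
}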


// Do not gossip a block from a finalized slot.
check_block_against_finalized_slot(block.message(), block_root, chain)?;

@@ -1138,9 +1135,6 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
.fork_name(&chain.spec)
.map_err(BlockError::InconsistentFork)?;

// Check the anchor slot before loading the parent, to avoid spurious lookups.
check_block_against_anchor_slot(block.message(), chain)?;

let (mut parent, block) = load_parent(block, chain)?;

let state = cheap_state_advance_to_obtain_committees::<_, BlockError>(
@@ -1775,19 +1769,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
}
}

/// Returns `Ok(())` if the block's slot is greater than the anchor block's slot (if any).
fn check_block_against_anchor_slot<T: BeaconChainTypes>(
block: BeaconBlockRef<'_, T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<(), BlockError> {
if let Some(anchor_slot) = chain.store.get_anchor_slot() {
if block.slot() <= anchor_slot {
return Err(BlockError::WeakSubjectivityConflict);
}
}
Ok(())
}

/// Returns `Ok(())` if the block is later than the finalized slot on `chain`.
///
/// Returns an error if the block is earlier or equal to the finalized slot, or there was an error
9 changes: 2 additions & 7 deletions beacon_node/beacon_chain/src/historical_blocks.rs
@@ -33,8 +33,6 @@ pub enum HistoricalBlockError {
InvalidSignature,
/// Transitory error, caller should retry with the same blocks.
ValidatorPubkeyCacheTimeout,
/// No historical sync needed.
NoAnchorInfo,
/// Logic error: should never occur.
IndexOutOfBounds,
}
@@ -62,10 +60,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
mut blocks: Vec<AvailableBlock<T::EthSpec>>,
) -> Result<usize, Error> {
let anchor_info = self
.store
.get_anchor_info()
.ok_or(HistoricalBlockError::NoAnchorInfo)?;
let anchor_info = self.store.get_anchor_info();
pawanjay176 marked this conversation as resolved.
let blob_info = self.store.get_blob_info();
let data_column_info = self.store.get_data_column_info();

@@ -263,7 +258,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot);
anchor_and_blob_batch.push(
self.store
.compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?,
.compare_and_set_anchor_info(anchor_info, new_anchor)?,
);
self.store.hot_db.do_atomically(anchor_and_blob_batch)?;

2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/migrate.rs
@@ -216,7 +216,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// Schedule another reconstruction batch if required and we have access to the
// channel for requeueing.
if let Some(tx) = opt_tx {
if db.get_anchor_info().is_some() {
if !db.get_anchor_info().all_historic_states_stored() {
if let Err(e) = tx.send(Notification::Reconstruction) {
error!(
log,
24 changes: 6 additions & 18 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
@@ -70,9 +70,7 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(

// Write the block roots in the new format in a new column. Similar to above, we do this
// separately from deleting the old format block roots so that this is crash safe.
let oldest_block_slot = old_anchor
.as_ref()
.map_or(Slot::new(0), |a| a.oldest_block_slot);
let oldest_block_slot = old_anchor.oldest_block_slot;
write_new_schema_block_roots::<T>(
&db,
genesis_block_root,
@@ -90,22 +88,12 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(
// If we crash after committing this change, then there will be some leftover cruft left in the
// freezer database, but no corruption because all the new-format data has already been written
// above.
let new_anchor = if let Some(old_anchor) = &old_anchor {
AnchorInfo {
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
..old_anchor.clone()
}
} else {
AnchorInfo {
anchor_slot: Slot::new(0),
oldest_block_slot: Slot::new(0),
oldest_block_parent: Hash256::ZERO,
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
}
let new_anchor = AnchorInfo {
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
..old_anchor.clone()
};
let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, Some(new_anchor))?];
let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, new_anchor)?];
db.store_schema_version_atomically(SchemaVersion(22), hot_ops)?;

// Finally, clean up the old-format data from the freezer database.
36 changes: 14 additions & 22 deletions beacon_node/client/src/notifier.rs
@@ -45,10 +45,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let mut current_sync_state = network.sync_state();

// Store info if we are required to do a backfill sync.
let original_anchor_slot = beacon_chain
.store
.get_anchor_info()
.map(|ai| ai.oldest_block_slot);
let original_oldest_block_slot = beacon_chain.store.get_anchor_info().oldest_block_slot;

let interval_future = async move {
// Perform pre-genesis logging.
@@ -141,22 +138,17 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
match current_sync_state {
SyncState::BackFillSyncing { .. } => {
// Observe backfilling sync info.
if let Some(oldest_slot) = original_anchor_slot {
if let Some(current_anchor_slot) = beacon_chain
.store
.get_anchor_info()
.map(|ai| ai.oldest_block_slot)
{
sync_distance = current_anchor_slot
.saturating_sub(beacon_chain.genesis_backfill_slot);
speedo
// For backfill sync use a fake slot which is the distance we've progressed from the starting `oldest_block_slot`.
.observe(
oldest_slot.saturating_sub(current_anchor_slot),
Instant::now(),
);
}
}
let current_oldest_block_slot =
beacon_chain.store.get_anchor_info().oldest_block_slot;
sync_distance = current_oldest_block_slot
.saturating_sub(beacon_chain.genesis_backfill_slot);
speedo
// For backfill sync use a fake slot which is the distance we've progressed
// from the starting `original_oldest_block_slot`.
.observe(
original_oldest_block_slot.saturating_sub(current_oldest_block_slot),
Instant::now(),
);
}
SyncState::SyncingFinalized { .. }
| SyncState::SyncingHead { .. }
@@ -213,14 +205,14 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
"Downloading historical blocks";
"distance" => distance,
"speed" => sync_speed_pretty(speed),
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_anchor_slot.unwrap_or(current_slot).saturating_sub(beacon_chain.genesis_backfill_slot))),
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_oldest_block_slot.saturating_sub(beacon_chain.genesis_backfill_slot))),
);
} else {
info!(
log,
"Downloading historical blocks";
"distance" => distance,
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_anchor_slot.unwrap_or(current_slot).saturating_sub(beacon_chain.genesis_backfill_slot))),
"est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_oldest_block_slot.saturating_sub(beacon_chain.genesis_backfill_slot))),
);
}
} else if !is_backfilling && last_backfill_log_slot.is_some() {
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/types/globals.rs
@@ -72,7 +72,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
peers: RwLock::new(PeerDB::new(trusted_peers, disable_peer_scoring, log)),
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::NotRequired),
backfill_state: RwLock::new(BackFillState::Paused),
pawanjay176 marked this conversation as resolved.
custody_subnets,
custody_columns,
spec,
2 changes: 0 additions & 2 deletions beacon_node/lighthouse_network/src/types/sync_state.rs
@@ -35,8 +35,6 @@ pub enum BackFillState {
Syncing,
/// A backfill sync has completed.
Completed,
/// A backfill sync is not required.
NotRequired,
/// Too many failed attempts at backfilling. Consider it failed.
Failed,
}
10 changes: 0 additions & 10 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -656,16 +656,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
peer_action: None,
}
}
HistoricalBlockError::NoAnchorInfo => {
warn!(self.log, "Backfill not required");

ChainSegmentFailed {
message: String::from("no_anchor_info"),
// There is no need to do a historical sync, this is not a fault of
// the peer.
peer_action: None,
}
}
HistoricalBlockError::IndexOutOfBounds => {
error!(
self.log,
94 changes: 34 additions & 60 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -163,21 +163,18 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// If, for some reason a backfill has already been completed (or we've used a trusted
// genesis root) then backfill has been completed.

let (state, current_start) = match beacon_chain.store.get_anchor_info() {
Some(anchor_info) => {
if anchor_info.block_backfill_complete(beacon_chain.genesis_backfill_slot) {
(BackFillState::Completed, Epoch::new(0))
} else {
(
BackFillState::Paused,
anchor_info
.oldest_block_slot
.epoch(T::EthSpec::slots_per_epoch()),
)
}
}
None => (BackFillState::NotRequired, Epoch::new(0)),
};
let anchor_info = beacon_chain.store.get_anchor_info();
let (state, current_start) =
if anchor_info.block_backfill_complete(beacon_chain.genesis_backfill_slot) {
(BackFillState::Completed, Epoch::new(0))
} else {
(
BackFillState::Paused,
anchor_info
.oldest_block_slot
.epoch(T::EthSpec::slots_per_epoch()),
)
};

let bfs = BackFillSync {
batches: BTreeMap::new(),
@@ -253,35 +250,21 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
self.set_state(BackFillState::Syncing);

// Obtain a new start slot, from the beacon chain and handle possible errors.
match self.reset_start_epoch() {
Err(ResetEpochError::SyncCompleted) => {
error!(self.log, "Backfill sync completed whilst in failed status");
self.set_state(BackFillState::Completed);
return Err(BackFillError::InvalidSyncState(String::from(
"chain completed",
)));
}
Err(ResetEpochError::NotRequired) => {
error!(
self.log,
"Backfill sync not required whilst in failed status"
);
self.set_state(BackFillState::NotRequired);
return Err(BackFillError::InvalidSyncState(String::from(
"backfill not required",
)));
}
Ok(_) => {}
if let Err(e) = self.reset_start_epoch() {
let ResetEpochError::SyncCompleted = e;
error!(self.log, "Backfill sync completed whilst in failed status");
self.set_state(BackFillState::Completed);
return Err(BackFillError::InvalidSyncState(String::from(
"chain completed",
)));
}

debug!(self.log, "Resuming a failed backfill sync"; "start_epoch" => self.current_start);

// begin requesting blocks from the peer pool, until all peers are exhausted.
self.request_batches(network)?;
}
BackFillState::Completed | BackFillState::NotRequired => {
return Ok(SyncStart::NotSyncing)
}
BackFillState::Completed => return Ok(SyncStart::NotSyncing),
}

Ok(SyncStart::Syncing {
@@ -313,10 +296,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> {
if matches!(
self.state(),
BackFillState::Failed | BackFillState::NotRequired
) {
if matches!(self.state(), BackFillState::Failed) {
return Ok(());
}

@@ -1142,31 +1122,27 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// This errors if the beacon chain indicates that backfill sync has already completed or is
/// not required.
fn reset_start_epoch(&mut self) -> Result<(), ResetEpochError> {
if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() {
if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) {
Err(ResetEpochError::SyncCompleted)
} else {
self.current_start = anchor_info
.oldest_block_slot
.epoch(T::EthSpec::slots_per_epoch());
Ok(())
}
let anchor_info = self.beacon_chain.store.get_anchor_info();
if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) {
Err(ResetEpochError::SyncCompleted)
} else {
Err(ResetEpochError::NotRequired)
self.current_start = anchor_info
.oldest_block_slot
.epoch(T::EthSpec::slots_per_epoch());
Ok(())
}
}

/// Checks with the beacon chain if backfill sync has completed.
fn check_completed(&mut self) -> bool {
if self.would_complete(self.current_start) {
// Check that the beacon chain agrees
if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() {
// Conditions that we have completed a backfill sync
if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) {
return true;
} else {
error!(self.log, "Backfill out of sync with beacon chain");
}
let anchor_info = self.beacon_chain.store.get_anchor_info();
// Conditions that we have completed a backfill sync
if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) {
return true;
} else {
error!(self.log, "Backfill out of sync with beacon chain");
}
}
false
@@ -1195,6 +1171,4 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
enum ResetEpochError {
/// The chain has already completed.
SyncCompleted,
/// Backfill is not required.
NotRequired,
}