Skip to content

Commit

Permalink
Implement proposer boost re-orging
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Dec 17, 2021
1 parent 10dac51 commit 647622c
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 7 deletions.
87 changes: 85 additions & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::snapshot_cache::SnapshotCache;
use crate::snapshot_cache::{BlockProductionPreState, SnapshotCache};
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
Expand Down Expand Up @@ -101,6 +101,9 @@ pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// validator pubkey cache.
pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);

/// The latest delay from the start of the slot at which to attempt a 1-slot re-org.
const MAX_RE_ORG_SLOT_DELAY: Duration = Duration::from_secs(2);

// These keys are all zero because they get stored in different columns, see `DBColumn` type.
pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::zero();
pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero();
Expand Down Expand Up @@ -2723,8 +2726,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.head_info()
.map_err(BlockProductionError::UnableToGetHeadInfo)?;
let (state, state_root_opt) = if head_info.slot < slot {
// Attempt an aggressive re-org if configured and the conditions are right.
if let Some(re_org_state) = self.get_state_for_re_org(slot, &head_info)? {
info!(
self.log,
"Proposing block to re-org current head";
"slot" => slot,
"head" => %head_info.block_root,
);
(re_org_state.pre_state, re_org_state.state_root)
}
// Normal case: proposing a block atop the current head. Use the snapshot cache.
if let Some(pre_state) = self
else if let Some(pre_state) = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
Expand Down Expand Up @@ -2769,6 +2782,76 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
}

fn get_state_for_re_org(
&self,
slot: Slot,
head_info: &HeadInfo,
) -> Result<Option<BlockProductionPreState<T::EthSpec>>, BlockProductionError> {
if let Some(re_org_threshold) = self.config.re_org_threshold {
let canonical_head = head_info.block_root;
let slot_delay = self
.slot_clock
.seconds_from_current_slot_start(self.spec.seconds_per_slot)
.ok_or(BlockProductionError::UnableToReadSlot)?;

// Check that we're producing a block one slot after the current head, and early enough
// in the slot to be able to propagate widely.
if head_info.slot + 1 == slot && slot_delay < MAX_RE_ORG_SLOT_DELAY {
// Is the current head weak and appropriate for re-orging?
let proposer_head = self.fork_choice.write().get_proposer_head(
slot,
canonical_head,
re_org_threshold,
)?;
if let Some(re_org_head) = proposer_head.re_org_head {
// Only attempt a re-org if we hit the snapshot cache.
if let Some(pre_state) = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(re_org_head)
})
{
debug!(
self.log,
"Attempting re-org due to weak head";
"head" => ?canonical_head,
"re_org_head" => ?re_org_head,
"head_weight" => ?proposer_head.canonical_head_weight,
"re_org_weight" => ?proposer_head.re_org_weight_threshold,
);
return Ok(Some(pre_state));
} else {
debug!(
self.log,
"Not attempting re-org due to cache miss";
"head" => ?canonical_head,
"re_org_head" => ?re_org_head,
"head_weight" => ?proposer_head.canonical_head_weight,
"re_org_weight" => ?proposer_head.re_org_weight_threshold,
);
}
} else {
debug!(
self.log,
"Not attempting re-org due to strong head";
"head" => ?canonical_head,
"head_weight" => ?proposer_head.canonical_head_weight,
"re_org_weight" => ?proposer_head.re_org_weight_threshold,
);
}
} else {
debug!(
self.log,
"Not attempting re-org due to slot distance";
"head" => ?canonical_head,
);
}
}

Ok(None)
}

/// Produce a block for some `slot` upon the given `state`.
///
/// Typically the `self.produce_block()` function should be used, instead of calling this
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use serde_derive::{Deserialize, Serialize};
use types::Checkpoint;

pub const DEFAULT_RE_ORG_THRESHOLD: u64 = 10;

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
pub struct ChainConfig {
/// Maximum number of slots to skip when importing a consensus message (e.g., block,
Expand All @@ -18,6 +20,8 @@ pub struct ChainConfig {
pub enable_lock_timeouts: bool,
/// The max size of a message that can be sent over the network.
pub max_network_size: usize,
/// Maximum percentage of weight at which to attempt re-orging the canonical head.
pub re_org_threshold: Option<u64>,
}

impl Default for ChainConfig {
Expand All @@ -28,6 +32,7 @@ impl Default for ChainConfig {
reconstruct_historic_states: false,
enable_lock_timeouts: true,
max_network_size: 10 * 1_048_576, // 10M
re_org_threshold: None,
}
}
}
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ pub enum BlockProductionError {
UnableToProduceAtSlot(Slot),
SlotProcessingError(SlotProcessingError),
BlockProcessingError(BlockProcessingError),
ForkChoiceError(ForkChoiceError),
Eth1ChainError(Eth1ChainError),
BeaconStateError(BeaconStateError),
StateAdvanceError(StateAdvanceError),
Expand All @@ -194,3 +195,4 @@ easy_from_to!(BeaconStateError, BlockProductionError);
easy_from_to!(SlotProcessingError, BlockProductionError);
easy_from_to!(Eth1ChainError, BlockProductionError);
easy_from_to!(StateAdvanceError, BlockProductionError);
easy_from_to!(ForkChoiceError, BlockProductionError);
14 changes: 14 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,4 +625,18 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
experimental as it may obscure performance issues.")
.takes_value(false)
)
.arg(
Arg::with_name("enable-proposer-re-orgs")
.long("enable-proposer-re-orgs")
.help("Attempt to re-org out weak/late blocks from other proposers \
(dangerous, experimental)")
.takes_value(true)
)
.arg(
Arg::with_name("proposer-re-org-fraction")
.long("proposer-re-org-fraction")
.help("Percentage of vote weight below which to attempt a proposer re-org")
.requires("enable-proposer-re-orgs")
.takes_value(true)
)
}
12 changes: 12 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use beacon_chain::chain_config::DEFAULT_RE_ORG_THRESHOLD;
use clap::ArgMatches;
use clap_utils::flags::DISABLE_MALLOC_TUNING_FLAG;
use client::{ClientConfig, ClientGenesis};
Expand Down Expand Up @@ -562,6 +563,17 @@ pub fn get_config<E: EthSpec>(
client_config.chain.enable_lock_timeouts = false;
}

if let Some(enable_re_orgs) = clap_utils::parse_optional(cli_args, "enable-proposer-re-orgs")? {
if enable_re_orgs {
client_config.chain.re_org_threshold = Some(
clap_utils::parse_optional(cli_args, "proposer-re-org-fraction")?
.unwrap_or(DEFAULT_RE_ORG_THRESHOLD),
);
} else {
client_config.chain.re_org_threshold = None;
}
}

Ok(client_config)
}

Expand Down
21 changes: 20 additions & 1 deletion consensus/fork_choice/src/fork_choice.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::ForkChoiceStore;
use proto_array::{Block as ProtoBlock, ExecutionStatus, ProtoArrayForkChoice};
use proto_array::{Block as ProtoBlock, ExecutionStatus, ProposerHead, ProtoArrayForkChoice};
use ssz_derive::{Decode, Encode};
use std::cmp::Ordering;
use std::marker::PhantomData;
Expand Down Expand Up @@ -412,6 +412,25 @@ where
.map_err(Into::into)
}

pub fn get_proposer_head(
&mut self,
current_slot: Slot,
canonical_head: Hash256,
re_org_threshold: u64,
) -> Result<ProposerHead, Error<T::Error>> {
// Calling `update_time` is essential, as it needs to dequeue attestations from the previous
// slot so we can see how many attesters voted for the canonical head.
self.update_time(current_slot)?;

self.proto_array
.get_proposer_head::<E>(
self.fc_store.justified_balances(),
canonical_head,
re_org_threshold,
)
.map_err(Into::into)
}

/// Returns `true` if the given `store` should be updated to set
/// `state.current_justified_checkpoint` its `justified_checkpoint`.
///
Expand Down
1 change: 1 addition & 0 deletions consensus/proto_array/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum Error {
InvalidNodeDelta(usize),
DeltaOverflow(usize),
ProposerBoostOverflow(usize),
UniqueWeightOverflow(Hash256),
IndexOverflow(&'static str),
InvalidDeltaLen {
deltas: usize,
Expand Down
4 changes: 3 additions & 1 deletion consensus/proto_array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ mod proto_array;
mod proto_array_fork_choice;
mod ssz_container;

pub use crate::proto_array_fork_choice::{Block, ExecutionStatus, ProtoArrayForkChoice};
pub use crate::proto_array_fork_choice::{
Block, ExecutionStatus, ProposerHead, ProtoArrayForkChoice,
};
pub use error::Error;

pub mod core {
Expand Down
2 changes: 1 addition & 1 deletion consensus/proto_array/src/proto_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ impl ProtoArray {
/// Returns `None` if there is an overflow or underflow when calculating the score.
///
/// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#get_latest_attesting_balance
fn calculate_proposer_boost<E: EthSpec>(
pub fn calculate_proposer_boost<E: EthSpec>(
validator_balances: &[u64],
proposer_score_boost: u64,
) -> Option<u64> {
Expand Down
84 changes: 83 additions & 1 deletion consensus/proto_array/src/proto_array_fork_choice.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::Error;
use crate::proto_array::{ProposerBoost, ProtoArray};
use crate::proto_array::{calculate_proposer_boost, ProposerBoost, ProtoArray};
use crate::ssz_container::SszContainer;
use serde_derive::{Deserialize, Serialize};
use ssz::{Decode, Encode};
Expand Down Expand Up @@ -92,11 +92,26 @@ where
&mut self.0[i]
}

pub fn iter(&self) -> impl Iterator<Item = &T> {
self.0.iter()
}

pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
self.0.iter_mut()
}
}

/// Information about the proposer head used for opportunistic re-orgs.
#[derive(Default, Clone)]
pub struct ProposerHead {
/// If set, the head block that the proposer should build upon.
pub re_org_head: Option<Hash256>,
/// The weight difference between the canonical head and its parent.
pub canonical_head_weight: Option<u64>,
/// The computed fraction of the active committee balance below which we can re-org.
pub re_org_weight_threshold: Option<u64>,
}

#[derive(PartialEq)]
pub struct ProtoArrayForkChoice {
pub(crate) proto_array: ProtoArray,
Expand Down Expand Up @@ -214,6 +229,73 @@ impl ProtoArrayForkChoice {
.map_err(|e| format!("find_head failed: {:?}", e))
}

pub fn get_proposer_head<E: EthSpec>(
&self,
justified_state_balances: &[u64],
canonical_head: Hash256,
re_org_vote_fraction: u64,
) -> Result<ProposerHead, String> {
let nodes = self
.proto_array
.iter_nodes(&canonical_head)
.take(2)
.collect::<Vec<_>>();
if nodes.len() != 2 {
return Ok(ProposerHead::default());
}
let head_node = nodes[0];
let parent_node = nodes[1];

// Re-org conditions.
let is_single_slot_re_org = parent_node.slot + 1 == head_node.slot;
let re_org_weight_threshold =
calculate_proposer_boost::<E>(justified_state_balances, re_org_vote_fraction)
.ok_or_else(|| {
"overflow calculating committee weight for proposer boost".to_string()
})?;
let canonical_head_weight = self
.get_block_unique_weight(canonical_head, justified_state_balances)
.map_err(|e| format!("overflow calculating head weight: {:?}", e))?;
let is_weak_head = canonical_head_weight < re_org_weight_threshold;

let re_org_head = (is_single_slot_re_org && is_weak_head).then(|| parent_node.root);

Ok(ProposerHead {
re_org_head,
canonical_head_weight: Some(canonical_head_weight),
re_org_weight_threshold: Some(re_org_weight_threshold),
})
}

/// Compute the sum of attester balances of attestations to a specific block root.
///
/// This weight is the weight unique to the block, *not* including the weight of its ancestors.
///
/// Any `proposer_boost` in effect is ignored: only attestations are counted.
fn get_block_unique_weight(
&self,
block_root: Hash256,
justified_balances: &[u64],
) -> Result<u64, Error> {
let mut unique_weight = 0u64;
for (validator_index, vote) in self.votes.iter().enumerate() {
// Check the `next_root` as we care about the most recent attestations, including ones
// from the previous slot that have just been dequeued but haven't run fully through
// fork choice yet.
if vote.next_root == block_root {
let validator_balance = justified_balances
.get(validator_index)
.copied()
.unwrap_or(0);

unique_weight = unique_weight
.checked_add(validator_balance)
.ok_or(Error::UniqueWeightOverflow(block_root))?;
}
}
Ok(unique_weight)
}

pub fn maybe_prune(&mut self, finalized_root: Hash256) -> Result<(), String> {
self.proto_array
.maybe_prune(finalized_root)
Expand Down
4 changes: 3 additions & 1 deletion scripts/local_testnet/beacon_node.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ exec lighthouse \
--port $network_port \
--http-port $http_port \
--disable-packet-filter \
--target-peers $((BN_COUNT - 1))
--target-peers $((BN_COUNT - 1)) \
--enable-proposer-re-orgs true \
--proposer-re-org-fraction 60

0 comments on commit 647622c

Please sign in to comment.