From 57d357d4d337e3e1b20405912591ae76276e7a4a Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 20 May 2022 04:29:25 +0000 Subject: [PATCH] Run fork choice before block proposal (#3168) ## Issue Addressed Upcoming spec change https://github.com/ethereum/consensus-specs/pull/2878 ## Proposed Changes 1. Run fork choice at the start of every slot, and wait for this run to complete before proposing a block. 2. As an optimisation, also run fork choice 3/4 of the way through the slot (at 9s), _dequeueing attestations for the next slot_. 3. Remove the fork choice run from the state advance timer that occurred before advancing the state. ## Additional Info ### Block Proposal Accuracy This change makes us more likely to propose on top of the correct head in the presence of re-orgs with proposer boost in play. The main scenario that this change is designed to address is described in the linked spec issue. ### Attestation Accuracy This change _also_ makes us more likely to attest to the correct head. Currently in the case of a skipped slot at `slot` we only run fork choice 9s into `slot - 1`. This means the attestations from `slot - 1` aren't taken into consideration, and any boost applied to the block from `slot - 1` is not removed (it should be). In the language of the linked spec issue, this means we are liable to attest to C, even when the majority voting weight has already caused a re-org to B. ### Why remove the call before the state advance? If we've run fork choice at the start of the slot then it has already dequeued all the attestations from the previous slot, which are the only ones eligible to influence the head in the current slot. Running fork choice again is unnecessary (unless we run it for the next slot and try to pre-empt a re-org, but I don't currently think this is a great idea). ### Performance Based on Prater testing this adds about 5-25ms of runtime to block proposal times, which are 500-1000ms on average (and spike to 5s+ sometimes due to state handling issues :cry: ). I believe this is a small enough penalty to enable it by default, with the option to disable it via the new flag `--fork-choice-before-proposal-timeout 0`. Upcoming work on block packing and state representation will also reduce block production times in general, while removing the spikes. ### Implementation Fork choice gets invoked at the start of the slot via the `per_slot_task` function called from the slot timer. It then uses a condition variable to signal to block production that fork choice has been updated. This is a bit funky, but it seems to work. One downside of the timer-based approach is that it doesn't happen automatically in most of the tests. The test added by this PR has to trigger the run manually. --- Cargo.lock | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 110 ++++++++++++++++-- beacon_node/beacon_chain/src/builder.rs | 13 +++ beacon_node/beacon_chain/src/chain_config.rs | 7 ++ beacon_node/beacon_chain/src/errors.rs | 5 + .../beacon_chain/src/fork_choice_signal.rs | 97 +++++++++++++++ beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/beacon_chain/src/metrics.rs | 4 + .../beacon_chain/src/state_advance_timer.rs | 87 ++++++++++++-- beacon_node/beacon_chain/src/test_utils.rs | 47 ++++---- beacon_node/http_api/Cargo.toml | 1 + .../http_api/tests/interactive_tests.rs | 97 ++++++++++++++- beacon_node/src/cli.rs | 9 ++ beacon_node/src/config.rs | 6 + lighthouse/tests/beacon_node.rs | 20 ++++ 15 files changed, 458 insertions(+), 47 deletions(-) create mode 100644 beacon_node/beacon_chain/src/fork_choice_signal.rs diff --git a/Cargo.lock b/Cargo.lock index b5d77d235d4..a35902c192c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2524,6 +2524,7 @@ dependencies = [ "lighthouse_metrics", "lighthouse_network", "lighthouse_version", + "logging", "network", "parking_lot 0.12.0", "safe_arith", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 60670c8490a..bbb9f8cb82c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -18,6 +18,7 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::ServerSentEventHandler; use crate::execution_payload::get_execution_payload; +use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; use crate::migrate::BackgroundMigrator; @@ -339,6 +340,10 @@ pub struct BeaconChain { /// A state-machine that is updated with information from the network and chooses a canonical /// head block. pub fork_choice: RwLock>, + /// Transmitter used to indicate that slot-start fork choice has completed running. + pub fork_choice_signal_tx: Option, + /// Receiver used by block production to wait on slot-start fork choice. + pub fork_choice_signal_rx: Option, /// A handler for events generated by the beacon chain. This is only initialized when the /// HTTP server is enabled. pub event_handler: Option>, @@ -2952,12 +2957,64 @@ impl BeaconChain { Ok(block_root) } + /// If configured, wait for the fork choice run at the start of the slot to complete. + fn wait_for_fork_choice_before_block_production( + self: &Arc, + slot: Slot, + ) -> Result<(), BlockProductionError> { + if let Some(rx) = &self.fork_choice_signal_rx { + let current_slot = self + .slot() + .map_err(|_| BlockProductionError::UnableToReadSlot)?; + + let timeout = Duration::from_millis(self.config.fork_choice_before_proposal_timeout_ms); + + if slot == current_slot || slot == current_slot + 1 { + match rx.wait_for_fork_choice(slot, timeout) { + ForkChoiceWaitResult::Success(fc_slot) => { + debug!( + self.log, + "Fork choice successfully updated before block production"; + "slot" => slot, + "fork_choice_slot" => fc_slot, + ); + } + ForkChoiceWaitResult::Behind(fc_slot) => { + warn!( + self.log, + "Fork choice notifier out of sync with block production"; + "fork_choice_slot" => fc_slot, + "slot" => slot, + "message" => "this block may be orphaned", + ); + } + ForkChoiceWaitResult::TimeOut => { + warn!( + self.log, + "Timed out waiting for fork choice before proposal"; + "message" => "this block may be orphaned", + ); + } + } + } else { + error!( + self.log, + "Producing block at incorrect slot"; + "block_slot" => slot, + "current_slot" => current_slot, + "message" => "check clock sync, this block may be orphaned", + ); + } + } + Ok(()) + } + /// Produce a new block at the given `slot`. /// /// The produced block will not be inherently valid, it must be signed by a block producer. /// Block signing is out of the scope of this function and should be done by a separate program. pub fn produce_block>( - &self, + self: &Arc, randao_reveal: Signature, slot: Slot, validator_graffiti: Option, @@ -2972,7 +3029,7 @@ impl BeaconChain { /// Same as `produce_block` but allowing for configuration of RANDAO-verification. pub fn produce_block_with_verification>( - &self, + self: &Arc, randao_reveal: Signature, slot: Slot, validator_graffiti: Option, @@ -2981,6 +3038,10 @@ impl BeaconChain { metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS); let _complete_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES); + let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_FORK_CHOICE_TIMES); + self.wait_for_fork_choice_before_block_production(slot)?; + drop(fork_choice_timer); + // Producing a block requires the tree hash cache, so clone a full state corresponding to // the head from the snapshot cache. Unfortunately we can't move the snapshot out of the // cache (which would be fast), because we need to re-process the block after it has been @@ -3362,10 +3423,18 @@ impl BeaconChain { /// Execute the fork choice algorithm and enthrone the result as the canonical head. pub fn fork_choice(self: &Arc) -> Result<(), Error> { + self.fork_choice_at_slot(self.slot()?) + } + + /// Execute fork choice at `slot`, processing queued attestations from `slot - 1` and earlier. + /// + /// The `slot` is not verified in any way, callers should ensure it corresponds to at most + /// one slot ahead of the current wall-clock slot. + pub fn fork_choice_at_slot(self: &Arc, slot: Slot) -> Result<(), Error> { metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS); let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES); - let result = self.fork_choice_internal(); + let result = self.fork_choice_internal(slot); if result.is_err() { metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS); @@ -3374,13 +3443,13 @@ impl BeaconChain { result } - fn fork_choice_internal(self: &Arc) -> Result<(), Error> { + fn fork_choice_internal(self: &Arc, slot: Slot) -> Result<(), Error> { // Atomically obtain the head block root and the finalized block. let (beacon_block_root, finalized_block) = { let mut fork_choice = self.fork_choice.write(); // Determine the root of the block that is the head of the chain. - let beacon_block_root = fork_choice.get_head(self.slot()?, &self.spec)?; + let beacon_block_root = fork_choice.get_head(slot, &self.spec)?; (beacon_block_root, fork_choice.get_finalized_block()?) }; @@ -3752,6 +3821,8 @@ impl BeaconChain { } // Update the execution layer. + // Always use the wall-clock slot to update the execution engine rather than the `slot` + // passed in. if let Err(e) = self.update_execution_engine_forkchoice_blocking(self.slot()?) { crit!( self.log, @@ -4005,8 +4076,6 @@ impl BeaconChain { "prepare_slot" => prepare_slot ); - // Use the blocking method here so that we don't form a queue of these functions when - // routinely calling them. self.update_execution_engine_forkchoice_async(current_slot) .await?; } @@ -4336,11 +4405,32 @@ impl BeaconChain { } /// Called by the timer on every slot. - /// - /// Performs slot-based pruning. - pub fn per_slot_task(&self) { + pub fn per_slot_task(self: &Arc) { trace!(self.log, "Running beacon chain per slot tasks"); if let Some(slot) = self.slot_clock.now() { + // Run fork choice and signal to any waiting task that it has completed. + if let Err(e) = self.fork_choice() { + error!( + self.log, + "Fork choice error at slot start"; + "error" => ?e, + "slot" => slot, + ); + } + + // Send the notification regardless of fork choice success, this is a "best effort" + // notification and we don't want block production to hit the timeout in case of error. + if let Some(tx) = &self.fork_choice_signal_tx { + if let Err(e) = tx.notify_fork_choice_complete(slot) { + warn!( + self.log, + "Error signalling fork choice waiter"; + "error" => ?e, + "slot" => slot, + ); + } + } + self.naive_aggregation_pool.write().prune(slot); self.block_times_cache.write().prune(slot); } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2efc972ed56..361246b4d38 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,5 +1,6 @@ use crate::beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; +use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; @@ -694,6 +695,16 @@ where ); } + // If enabled, set up the fork choice signaller. + let (fork_choice_signal_tx, fork_choice_signal_rx) = + if self.chain_config.fork_choice_before_proposal_timeout_ms != 0 { + let tx = ForkChoiceSignalTx::new(); + let rx = tx.get_receiver(); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + // Store the `PersistedBeaconChain` in the database atomically with the metadata so that on // restart we can correctly detect the presence of an initialized database. // @@ -752,6 +763,8 @@ where genesis_block_root, genesis_state_root, fork_choice: RwLock::new(fork_choice), + fork_choice_signal_tx, + fork_choice_signal_rx, event_handler: self.event_handler, head_tracker, snapshot_cache: TimeoutRwLock::new(SnapshotCache::new( diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 4aee06d468c..36c2f41d9d6 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -1,6 +1,8 @@ use serde_derive::{Deserialize, Serialize}; use types::Checkpoint; +pub const DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT: u64 = 250; + #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] pub struct ChainConfig { /// Maximum number of slots to skip when importing a consensus message (e.g., block, @@ -18,6 +20,10 @@ pub struct ChainConfig { pub enable_lock_timeouts: bool, /// The max size of a message that can be sent over the network. pub max_network_size: usize, + /// Number of milliseconds to wait for fork choice before proposing a block. + /// + /// If set to 0 then block proposal will not wait for fork choice at all. + pub fork_choice_before_proposal_timeout_ms: u64, } impl Default for ChainConfig { @@ -28,6 +34,7 @@ impl Default for ChainConfig { reconstruct_historic_states: false, enable_lock_timeouts: true, max_network_size: 10 * 1_048_576, // 10M + fork_choice_before_proposal_timeout_ms: DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT, } } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 280ec3fac37..834823992ac 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -185,6 +185,10 @@ pub enum BeaconChainError { }, RuntimeShutdown, ProcessInvalidExecutionPayload(JoinError), + ForkChoiceSignalOutOfOrder { + current: Slot, + latest: Slot, + }, } easy_from_to!(SlotProcessingError, BeaconChainError); @@ -234,6 +238,7 @@ pub enum BlockProductionError { FailedToReadFinalizedBlock(store::Error), MissingFinalizedBlock(Hash256), BlockTooLarge(usize), + ForkChoiceError(BeaconChainError), } easy_from_to!(BlockProcessingError, BlockProductionError); diff --git a/beacon_node/beacon_chain/src/fork_choice_signal.rs b/beacon_node/beacon_chain/src/fork_choice_signal.rs new file mode 100644 index 00000000000..fd92de661da --- /dev/null +++ b/beacon_node/beacon_chain/src/fork_choice_signal.rs @@ -0,0 +1,97 @@ +//! Concurrency helpers for synchronising block proposal with fork choice. +//! +//! The transmitter provides a way for a thread runnning fork choice on a schedule to signal +//! to the receiver that fork choice has been updated for a given slot. +use crate::BeaconChainError; +use parking_lot::{Condvar, Mutex}; +use std::sync::Arc; +use std::time::Duration; +use types::Slot; + +/// Sender, for use by the per-slot task timer. +pub struct ForkChoiceSignalTx { + pair: Arc<(Mutex, Condvar)>, +} + +/// Receiver, for use by the beacon chain waiting on fork choice to complete. +pub struct ForkChoiceSignalRx { + pair: Arc<(Mutex, Condvar)>, +} + +pub enum ForkChoiceWaitResult { + /// Successfully reached a slot greater than or equal to the awaited slot. + Success(Slot), + /// Fork choice was updated to a lower slot, indicative of lag or processing delays. + Behind(Slot), + /// Timed out waiting for the fork choice update from the sender. + TimeOut, +} + +impl ForkChoiceSignalTx { + pub fn new() -> Self { + let pair = Arc::new((Mutex::new(Slot::new(0)), Condvar::new())); + Self { pair } + } + + pub fn get_receiver(&self) -> ForkChoiceSignalRx { + ForkChoiceSignalRx { + pair: self.pair.clone(), + } + } + + /// Signal to the receiver that fork choice has been updated to `slot`. + /// + /// Return an error if the provided `slot` is strictly less than any previously provided slot. + pub fn notify_fork_choice_complete(&self, slot: Slot) -> Result<(), BeaconChainError> { + let &(ref lock, ref condvar) = &*self.pair; + + let mut current_slot = lock.lock(); + + if slot < *current_slot { + return Err(BeaconChainError::ForkChoiceSignalOutOfOrder { + current: *current_slot, + latest: slot, + }); + } else { + *current_slot = slot; + } + + // We use `notify_all` because there may be multiple block proposals waiting simultaneously. + // Usually there'll be 0-1. + condvar.notify_all(); + + Ok(()) + } +} + +impl Default for ForkChoiceSignalTx { + fn default() -> Self { + Self::new() + } +} + +impl ForkChoiceSignalRx { + pub fn wait_for_fork_choice(&self, slot: Slot, timeout: Duration) -> ForkChoiceWaitResult { + let &(ref lock, ref condvar) = &*self.pair; + + let mut current_slot = lock.lock(); + + // Wait for `current_slot >= slot`. + // + // Do not loop and wait, if we receive an update for the wrong slot then something is + // quite out of whack and we shouldn't waste more time waiting. + if *current_slot < slot { + let timeout_result = condvar.wait_for(&mut current_slot, timeout); + + if timeout_result.timed_out() { + return ForkChoiceWaitResult::TimeOut; + } + } + + if *current_slot >= slot { + ForkChoiceWaitResult::Success(*current_slot) + } else { + ForkChoiceWaitResult::Behind(*current_slot) + } + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 65908547fff..579020b1d1e 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -15,6 +15,7 @@ mod errors; pub mod eth1_chain; pub mod events; mod execution_payload; +pub mod fork_choice_signal; pub mod fork_revert; mod head_tracker; pub mod historical_blocks; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 41b76045329..4d0f63674ae 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -90,6 +90,10 @@ lazy_static! { ); pub static ref BLOCK_PRODUCTION_TIMES: Result = try_create_histogram("beacon_block_production_seconds", "Full runtime of block production"); + pub static ref BLOCK_PRODUCTION_FORK_CHOICE_TIMES: Result = try_create_histogram( + "beacon_block_production_fork_choice_seconds", + "Time taken to run fork choice before block production" + ); pub static ref BLOCK_PRODUCTION_STATE_LOAD_TIMES: Result = try_create_histogram( "beacon_block_production_state_load_seconds", "Time taken to load the base state for block production" diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 71934d844d8..7216ac11189 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -27,7 +27,7 @@ use std::sync::{ Arc, }; use task_executor::TaskExecutor; -use tokio::time::sleep; +use tokio::time::{sleep, sleep_until, Instant}; use types::{AttestationShufflingId, EthSpec, Hash256, RelativeEpoch, Slot}; /// If the head slot is more than `MAX_ADVANCE_DISTANCE` from the current slot, then don't perform @@ -105,8 +105,8 @@ async fn state_advance_timer( let slot_duration = slot_clock.slot_duration(); loop { - match beacon_chain.slot_clock.duration_to_next_slot() { - Some(duration) => sleep(duration + (slot_duration / 4) * 3).await, + let duration_to_next_slot = match beacon_chain.slot_clock.duration_to_next_slot() { + Some(duration) => duration, None => { error!(log, "Failed to read slot clock"); // If we can't read the slot clock, just wait another slot. @@ -115,7 +115,45 @@ async fn state_advance_timer( } }; - // Only start spawn the state advance task if the lock was previously free. + // Run the state advance 3/4 of the way through the slot (9s on mainnet). + let state_advance_offset = slot_duration / 4; + let state_advance_instant = if duration_to_next_slot > state_advance_offset { + Instant::now() + duration_to_next_slot - state_advance_offset + } else { + // Skip the state advance for the current slot and wait until the next one. + Instant::now() + duration_to_next_slot + slot_duration - state_advance_offset + }; + + // Run fork choice 23/24s of the way through the slot (11.5s on mainnet). + // We need to run after the state advance, so use the same condition as above. + let fork_choice_offset = slot_duration / 24; + let fork_choice_instant = if duration_to_next_slot > state_advance_offset { + Instant::now() + duration_to_next_slot - fork_choice_offset + } else { + Instant::now() + duration_to_next_slot + slot_duration - fork_choice_offset + }; + + // Wait for the state advance. + sleep_until(state_advance_instant).await; + + // Compute the current slot here at approx 3/4 through the slot. Even though this slot is + // only used by fork choice we need to calculate it here rather than after the state + // advance, in case the state advance flows over into the next slot. + let current_slot = match beacon_chain.slot() { + Ok(slot) => slot, + Err(e) => { + warn!( + log, + "Unable to determine slot in state advance timer"; + "error" => ?e + ); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + continue; + } + }; + + // Only spawn the state advance task if the lock was previously free. if !is_running.lock() { let log = log.clone(); let beacon_chain = beacon_chain.clone(); @@ -163,6 +201,40 @@ async fn state_advance_timer( "msg" => "system resources may be overloaded" ) } + + // Run fork choice pre-emptively for the next slot. This processes most of the attestations + // from this slot off the hot path of block verification and production. + // Wait for the fork choice instant (which may already be past). + sleep_until(fork_choice_instant).await; + + let log = log.clone(); + let beacon_chain = beacon_chain.clone(); + let next_slot = current_slot + 1; + executor.spawn_blocking( + move || { + if let Err(e) = beacon_chain.fork_choice_at_slot(next_slot) { + warn!( + log, + "Error updating fork choice for next slot"; + "error" => ?e, + "slot" => next_slot, + ); + } + + // Signal block proposal for the next slot (if it happens to be waiting). + if let Some(tx) = &beacon_chain.fork_choice_signal_tx { + if let Err(e) = tx.notify_fork_choice_complete(next_slot) { + warn!( + log, + "Error signalling fork choice waiter"; + "error" => ?e, + "slot" => next_slot, + ); + } + } + }, + "fork_choice_advance", + ); } } @@ -193,13 +265,6 @@ fn advance_head( } } - // Run fork choice so we get the latest view of the head. - // - // This is useful since it's quite likely that the last time we ran fork choice was shortly - // after receiving the latest gossip block, but not necessarily after we've received the - // majority of attestations. - beacon_chain.fork_choice()?; - let head_root = beacon_chain.head_info()?.block_root; let (head_slot, head_state_root, mut state) = match beacon_chain diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 2dc1d0301db..69ed413fd40 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -61,7 +61,7 @@ pub type BaseHarnessType = pub type DiskHarnessType = BaseHarnessType, LevelDB>; pub type EphemeralHarnessType = BaseHarnessType, MemoryStore>; -type BoxedMutator = Box< +pub type BoxedMutator = Box< dyn FnOnce( BeaconChainBuilder>, ) -> BeaconChainBuilder>, @@ -586,18 +586,7 @@ where // different blocks each time. let graffiti = Graffiti::from(self.rng.lock().gen::<[u8; 32]>()); - let randao_reveal = { - let epoch = slot.epoch(E::slots_per_epoch()); - let domain = self.spec.get_domain( - epoch, - Domain::Randao, - &state.fork(), - state.genesis_validators_root(), - ); - let message = epoch.signing_root(domain); - let sk = &self.validator_keypairs[proposer_index].sk; - sk.sign(message) - }; + let randao_reveal = self.sign_randao_reveal(&state, proposer_index, slot); let (block, state) = self .chain @@ -645,18 +634,7 @@ where // different blocks each time. let graffiti = Graffiti::from(self.rng.lock().gen::<[u8; 32]>()); - let randao_reveal = { - let epoch = slot.epoch(E::slots_per_epoch()); - let domain = self.spec.get_domain( - epoch, - Domain::Randao, - &state.fork(), - state.genesis_validators_root(), - ); - let message = epoch.signing_root(domain); - let sk = &self.validator_keypairs[proposer_index].sk; - sk.sign(message) - }; + let randao_reveal = self.sign_randao_reveal(&state, proposer_index, slot); let pre_state = state.clone(); @@ -682,6 +660,25 @@ where (signed_block, pre_state) } + /// Create a randao reveal for a block at `slot`. + pub fn sign_randao_reveal( + &self, + state: &BeaconState, + proposer_index: usize, + slot: Slot, + ) -> Signature { + let epoch = slot.epoch(E::slots_per_epoch()); + let domain = self.spec.get_domain( + epoch, + Domain::Randao, + &state.fork(), + state.genesis_validators_root(), + ); + let message = epoch.signing_root(domain); + let sk = &self.validator_keypairs[proposer_index].sk; + sk.sign(message) + } + /// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to /// `beacon_block_root`. The provided `state` should match the `block.state_root` for the /// `block` identified by `beacon_block_root`. diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index f982f0d0229..a34618c2ef4 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -38,6 +38,7 @@ store = { path = "../store" } environment = { path = "../../lighthouse/environment" } tree_hash = "0.4.1" sensitive_url = { path = "../../common/sensitive_url" } +logging = { path = "../../common/logging" } [[test]] name = "bn_http_api_tests" diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 64ce3b6566c..8b12aa4a5b2 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -1,7 +1,9 @@ //! Generic tests that make use of the (newer) `InteractiveApiTester` use crate::common::*; +use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; use eth2::types::DepositContractData; -use types::{EthSpec, MainnetEthSpec}; +use tree_hash::TreeHash; +use types::{EthSpec, FullPayload, MainnetEthSpec, Slot}; type E = MainnetEthSpec; @@ -30,3 +32,96 @@ async fn deposit_contract_custom_network() { assert_eq!(result, expected); } + +// Test that running fork choice before proposing results in selection of the correct head. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn fork_choice_before_proposal() { + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 32; + let all_validators = (0..validator_count).collect::>(); + let num_initial: u64 = 31; + + let tester = InteractiveTester::::new(None, validator_count).await; + let harness = &tester.harness; + + // Create some chain depth. + harness.advance_slot(); + harness.extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + // We set up the following block graph, where B is a block that is temporarily orphaned by C, + // but is then reinstated and built upon by D. + // + // A | B | - | D | + // ^ | - | C | + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + let slot_c = slot_a + 2; + let slot_d = slot_a + 3; + + let state_a = harness.get_current_state(); + let (block_b, state_b) = harness.make_block(state_a.clone(), slot_b); + let block_root_b = harness.process_block(slot_b, block_b).unwrap(); + + // Create attestations to B but keep them in reserve until after C has been processed. + let attestations_b = harness.make_attestations( + &all_validators, + &state_b, + state_b.tree_hash_root(), + block_root_b, + slot_b, + ); + + let (block_c, state_c) = harness.make_block(state_a, slot_c); + let block_root_c = harness.process_block(slot_c, block_c.clone()).unwrap(); + + // Create attestations to C from a small number of validators and process them immediately. + let attestations_c = harness.make_attestations( + &all_validators[..validator_count / 2], + &state_c, + state_c.tree_hash_root(), + block_root_c, + slot_c, + ); + harness.process_attestations(attestations_c); + + // Apply the attestations to B, but don't re-run fork choice. + harness.process_attestations(attestations_b); + + // Due to proposer boost, the head should be C during slot C. + assert_eq!( + harness.chain.head_info().unwrap().block_root, + block_root_c.into() + ); + + // Ensure that building a block via the HTTP API re-runs fork choice and builds block D upon B. + // Manually prod the per-slot task, because the slot timer doesn't run in the background in + // these tests. + harness.advance_slot(); + harness.chain.per_slot_task(); + + let proposer_index = state_b + .get_beacon_proposer_index(slot_d, &harness.chain.spec) + .unwrap(); + let randao_reveal = harness + .sign_randao_reveal(&state_b, proposer_index, slot_d) + .into(); + let block_d = tester + .client + .get_validator_blocks::>(slot_d, &randao_reveal, None) + .await + .unwrap() + .data; + + // Head is now B. + assert_eq!( + harness.chain.head_info().unwrap().block_root, + block_root_b.into() + ); + // D's parent is B. + assert_eq!(block_d.parent_root(), block_root_b.into()); +} diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 6e4c2996a62..a1347c9b023 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -686,4 +686,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { experimental as it may obscure performance issues.") .takes_value(false) ) + .arg( + Arg::with_name("fork-choice-before-proposal-timeout") + .long("fork-choice-before-proposal-timeout") + .help("Set the maximum number of milliseconds to wait for fork choice before \ + proposing a block. You can prevent waiting at all by setting the timeout \ + to 0, however you risk proposing atop the wrong parent block.") + .default_value("250") + .takes_value(true) + ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 461f230d280..b1560c79556 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -581,6 +581,12 @@ pub fn get_config( client_config.chain.enable_lock_timeouts = false; } + if let Some(timeout) = + clap_utils::parse_optional(cli_args, "fork-choice-before-proposal-timeout")? + { + client_config.chain.fork_choice_before_proposal_timeout_ms = timeout; + } + Ok(client_config) } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 3088fa423df..5748bbd3416 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -108,6 +108,26 @@ fn disable_lock_timeouts_flag() { .with_config(|config| assert!(!config.chain.enable_lock_timeouts)); } +#[test] +fn fork_choice_before_proposal_timeout_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.chain.fork_choice_before_proposal_timeout_ms, + beacon_node::beacon_chain::chain_config::DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT + ) + }); +} + +#[test] +fn fork_choice_before_proposal_timeout_zero() { + CommandLineTest::new() + .flag("fork-choice-before-proposal-timeout", Some("0")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.chain.fork_choice_before_proposal_timeout_ms, 0)); +} + #[test] fn freezer_dir_flag() { let dir = TempDir::new().expect("Unable to create temporary directory");