From 61d5b592cb1020b23c3ecc45df12d79963ff5100 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Mon, 17 Aug 2020 08:05:13 +0000
Subject: [PATCH 1/6] Memory usage reduction (#1522)

## Issue Addressed

NA

## Proposed Changes

- Adds a new function to allow getting a state with a bad state root history for attestation
  verification. This reduces unnecessary tree hashing during attestation processing, which
  accounted for 23% of memory allocations (by bytes) in a recent `heaptrack` observation.
- Don't clone caches on intermediate epoch-boundary states during block processing.
- Reject blocks that are known to fork choice earlier during gossip processing, instead of
  waiting until after the state has been loaded (this only happens in an edge case).
- Avoid multiple re-allocations by creating a "forced" exact-size iterator.

## Additional Info

NA
---
 .../src/attestation_verification.rs           |  7 +-
 .../beacon_chain/src/block_verification.rs    | 19 ++++-
 beacon_node/store/src/hot_cold_store.rs       | 76 ++++++++++++++++---
 .../types/src/beacon_state/tree_hash_cache.rs | 40 ++++++++--
 4 files changed, 122 insertions(+), 20 deletions(-)

diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs
index aa24d08b2d6..9c8ec92e596 100644
--- a/beacon_node/beacon_chain/src/attestation_verification.rs
+++ b/beacon_node/beacon_chain/src/attestation_verification.rs
@@ -775,7 +775,12 @@ where
             metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);
 
         let mut state = chain
-            .get_state(&target_block.state_root, Some(target_block.slot))?
+            .store
+            .get_inconsistent_state_for_attestation_verification_only(
+                &target_block.state_root,
+                Some(target_block.slot),
+            )
+            .map_err(BeaconChainError::from)?
             .ok_or_else(|| BeaconChainError::MissingBeaconState(target_block.state_root))?;
 
         metrics::stop_timer(state_read_timer);
diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs
index 38acf1b1799..92460dba125 100644
--- a/beacon_node/beacon_chain/src/block_verification.rs
+++ b/beacon_node/beacon_chain/src/block_verification.rs
@@ -390,9 +390,22 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
             });
         }
 
+        let block_root = get_block_root(&block);
+
         // Do not gossip a block from a finalized slot.
         check_block_against_finalized_slot(&block.message, chain)?;
 
+        // Check if the block is already known. We know it is post-finalization, so it is
+        // sufficient to check the fork choice.
+        //
+        // In normal operation this isn't necessary, however it is useful immediately after a
+        // reboot if the `observed_block_producers` cache is empty. In that case, without this
+        // check, we will load the parent and state from disk only to find out later that we
+        // already know this block.
+        if chain.fork_choice.read().contains_block(&block_root) {
+            return Err(BlockError::BlockIsAlreadyKnown);
+        }
+
         // Check that we have not already received a block with a valid signature for this slot.
         if chain
             .observed_block_producers
@@ -415,7 +428,6 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
         )?;
 
         let (mut parent, block) = load_parent(block, chain)?;
-        let block_root = get_block_root(&block);
 
         let state = cheap_state_advance_to_obtain_committees(
             &mut parent.beacon_state,
@@ -672,7 +684,10 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
         let state_root = state.update_tree_hash_cache()?;
 
         let op = if state.slot % T::EthSpec::slots_per_epoch() == 0 {
-            StoreOp::PutState(state_root.into(), Cow::Owned(state.clone()))
+            StoreOp::PutState(
+                state_root.into(),
+                Cow::Owned(state.clone_with(CloneConfig::committee_caches_only())),
+            )
         } else {
             StoreOp::PutStateSummary(
                 state_root.into(),
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 7bad9ef1567..692e747d79f 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -30,6 +30,16 @@ use types::*;
 /// 32-byte key for accessing the `split` of the freezer DB.
 pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE";
 
+/// Defines how blocks should be replayed on states.
+#[derive(PartialEq)]
+pub enum BlockReplay {
+    /// Perform all transitions faithfully to the specification.
+    Accurate,
+    /// Don't compute state roots; this eventually produces an invalid beacon state that can
+    /// only be used for obtaining shuffling.
+    InconsistentStateRoots,
+}
+
 /// On-disk database that stores finalized states efficiently.
 ///
 /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
@@ -230,16 +240,40 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 // chain. This way we avoid returning a state that doesn't match `state_root`.
                 self.load_cold_state(state_root)
             } else {
-                self.load_hot_state(state_root)
+                self.load_hot_state(state_root, BlockReplay::Accurate)
             }
         } else {
-            match self.load_hot_state(state_root)? {
+            match self.load_hot_state(state_root, BlockReplay::Accurate)? {
                 Some(state) => Ok(Some(state)),
                 None => self.load_cold_state(state_root),
             }
         }
    }

+    /// Fetch a state from the store, but don't compute all of the values when replaying blocks
+    /// upon that state (e.g., state roots). Additionally, only states from the hot store are
+    /// returned.
+    ///
+    /// See `Self::get_state` for information about `slot`.
+    ///
+    /// ## Warning
+    ///
+    /// The returned state **is not a valid beacon state**; it can only be used for obtaining
+    /// shuffling to process attestations.
+    pub fn get_inconsistent_state_for_attestation_verification_only(
+        &self,
+        state_root: &Hash256,
+        slot: Option<Slot>,
+    ) -> Result<Option<BeaconState<E>>, Error> {
+        metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
+
+        if slot.map_or(false, |slot| slot < self.get_split_slot()) {
+            Ok(None)
+        } else {
+            self.load_hot_state(state_root, BlockReplay::InconsistentStateRoots)
+        }
+    }
+
     /// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.
     ///
     /// It is assumed that all states being deleted reside in the hot DB, even if their slot is less
@@ -283,8 +317,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }) = self.load_hot_state_summary(state_root)?
         {
             // NOTE: minor inefficiency here because we load an unnecessary hot state summary
+            //
+            // `BlockReplay` should be irrelevant here since we never replay blocks for an epoch
+            // boundary state in the hot DB.
             let state = self
-                .load_hot_state(&epoch_boundary_state_root)?
+                .load_hot_state(&epoch_boundary_state_root, BlockReplay::Accurate)?
.ok_or_else(|| { HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root) })?; @@ -415,7 +452,11 @@ impl, Cold: ItemStore> HotColdDB /// Load a post-finalization state from the hot database. /// /// Will replay blocks from the nearest epoch boundary. - pub fn load_hot_state(&self, state_root: &Hash256) -> Result>, Error> { + pub fn load_hot_state( + &self, + state_root: &Hash256, + block_replay: BlockReplay, + ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); if let Some(HotStateSummary { @@ -436,7 +477,7 @@ impl, Cold: ItemStore> HotColdDB } else { let blocks = self.load_blocks_to_replay(boundary_state.slot, slot, latest_block_root)?; - self.replay_blocks(boundary_state, blocks, slot)? + self.replay_blocks(boundary_state, blocks, slot, block_replay)? }; Ok(Some(state)) @@ -567,7 +608,7 @@ impl, Cold: ItemStore> HotColdDB )?; // 3. Replay the blocks on top of the low restore point. - self.replay_blocks(low_restore_point, blocks, slot) + self.replay_blocks(low_restore_point, blocks, slot, BlockReplay::Accurate) } /// Get a suitable block root for backtracking from `high_restore_point` to the state at `slot`. @@ -624,9 +665,19 @@ impl, Cold: ItemStore> HotColdDB fn replay_blocks( &self, mut state: BeaconState, - blocks: Vec>, + mut blocks: Vec>, target_slot: Slot, + block_replay: BlockReplay, ) -> Result, Error> { + if block_replay == BlockReplay::InconsistentStateRoots { + for i in 0..blocks.len() { + blocks[i].message.state_root = Hash256::zero(); + if i > 0 { + blocks[i].message.parent_root = blocks[i - 1].canonical_root() + } + } + } + let state_root_from_prev_block = |i: usize, state: &BeaconState| { if i > 0 { let prev_block = &blocks[i - 1].message; @@ -646,10 +697,14 @@ impl, Cold: ItemStore> HotColdDB } while state.slot < block.message.slot { - let state_root = state_root_from_prev_block(i, &state); + let state_root = match block_replay { + BlockReplay::Accurate => state_root_from_prev_block(i, &state), + BlockReplay::InconsistentStateRoots => Some(Hash256::zero()), + }; per_slot_processing(&mut state, state_root, &self.spec) .map_err(HotColdDBError::BlockReplaySlotError)?; } + per_block_processing( &mut state, &block, @@ -661,7 +716,10 @@ impl, Cold: ItemStore> HotColdDB } while state.slot < target_slot { - let state_root = state_root_from_prev_block(blocks.len(), &state); + let state_root = match block_replay { + BlockReplay::Accurate => state_root_from_prev_block(blocks.len(), &state), + BlockReplay::InconsistentStateRoots => Some(Hash256::zero()), + }; per_slot_processing(&mut state, state_root, &self.spec) .map_err(HotColdDBError::BlockReplaySlotError)?; } diff --git a/consensus/types/src/beacon_state/tree_hash_cache.rs b/consensus/types/src/beacon_state/tree_hash_cache.rs index 0c8899c0255..6abc795a137 100644 --- a/consensus/types/src/beacon_state/tree_hash_cache.rs +++ b/consensus/types/src/beacon_state/tree_hash_cache.rs @@ -7,6 +7,7 @@ use rayon::prelude::*; use ssz_derive::{Decode, Encode}; use ssz_types::VariableList; use std::cmp::Ordering; +use std::iter::ExactSizeIterator; use tree_hash::{mix_in_length, MerkleHasher, TreeHash}; /// The number of fields on a beacon state. @@ -288,17 +289,17 @@ impl ValidatorsListTreeHashCache { fn recalculate_tree_hash_root(&mut self, validators: &[Validator]) -> Result { let mut list_arena = std::mem::take(&mut self.list_arena); - let leaves = self - .values - .leaves(validators)? 
-            .into_iter()
-            .flatten()
-            .map(|h| h.to_fixed_bytes())
-            .collect::<Vec<_>>();
+        let leaves = self.values.leaves(validators)?;
+        let num_leaves = leaves.iter().map(|arena| arena.len()).sum();
+
+        let leaves_iter = ForcedExactSizeIterator {
+            iter: leaves.into_iter().flatten().map(|h| h.to_fixed_bytes()),
+            len: num_leaves,
+        };
 
         let list_root = self
             .list_cache
-            .recalculate_merkle_root(&mut list_arena, leaves.into_iter())?;
+            .recalculate_merkle_root(&mut list_arena, leaves_iter)?;
 
         self.list_arena = list_arena;
 
@@ -306,6 +307,29 @@ impl ValidatorsListTreeHashCache {
     }
 }
 
+/// Provides a wrapper around some `iter` if the number of items in the iterator is known to the
+/// programmer but not the compiler. This allows use of `ExactSizeIterator` in some cases.
+///
+/// Care should be taken to ensure `len` is accurate.
+struct ForcedExactSizeIterator<I> {
+    iter: I,
+    len: usize,
+}
+
+impl<V, I: Iterator<Item = V>> Iterator for ForcedExactSizeIterator<I> {
+    type Item = V;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.iter.next()
+    }
+}
+
+impl<V, I: Iterator<Item = V>> ExactSizeIterator for ForcedExactSizeIterator<I> {
+    fn len(&self) -> usize {
+        self.len
+    }
+}
+
 /// Provides a cache for each of the `Validator` objects in `state.validators` and computes the
 /// roots of these using Rayon parallelization.
 #[derive(Debug, PartialEq, Clone, Default, Encode, Decode)]

From f85485884f0453dd323f2eb9d0416bc05352a9ba Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Mon, 17 Aug 2020 09:20:27 +0000
Subject: [PATCH 2/6] Process gossip blocks on the GossipProcessor (#1523)

## Issue Addressed

NA

## Proposed Changes

Moves beacon block processing over to the newly-added `GossipProcessor`. This moves the task off
the core executor onto the blocking one.

## Additional Info

- With this PR, gossip blocks are being ignored during sync.
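The dispatch pattern at the heart of this change, reduced to a minimal, self-contained sketch
(all names here are illustrative, assuming a current `tokio`; the real processor adds per-kind
queues, a worker limit, idle tracking and metrics):

```rust
use tokio::sync::mpsc;

// Stand-ins for the real work variants (`GossipBlock`, `ChainSegment`, ...).
#[derive(Debug)]
enum Work {
    GossipBlock(u64),
    ChainSegment(Vec<u64>),
}

#[tokio::main]
async fn main() {
    // A bounded channel: when it is full, senders fail fast instead of
    // buffering consensus messages without limit.
    let (tx, mut rx) = mpsc::channel::<Work>(16);

    // The "manager" runs on the async executor and only routes work.
    let manager = tokio::spawn(async move {
        while let Some(work) = rx.recv().await {
            // Long-running verification is pushed onto the blocking pool so
            // it cannot stall the core executor.
            tokio::task::spawn_blocking(move || {
                println!("processing {:?}", work);
            })
            .await
            .expect("worker panicked");
        }
    });

    tx.send(Work::GossipBlock(1)).await.unwrap();
    tx.send(Work::ChainSegment(vec![2, 3])).await.unwrap();
    drop(tx); // close the channel so the manager exits
    manager.await.unwrap();
}
```

One design choice visible in the diff below: blocks and chain segments go into bounded FIFO
queues (import order matters, and dropping one is an error worth logging), while attestations
keep bounded LIFO queues that silently drop the oldest item first.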
---
 .../chain_segment.rs}                         | 200 +++---
 .../mod.rs}                                   | 610 ++++++++++++++----
 beacon_node/network/src/lib.rs                |   1 +
 beacon_node/network/src/metrics.rs            | 115 ++--
 beacon_node/network/src/router/mod.rs         |  21 +-
 beacon_node/network/src/router/processor.rs   | 121 +---
 beacon_node/network/src/sync/manager.rs       | 106 ++-
 beacon_node/network/src/sync/mod.rs           |   4 +-
 .../network/src/sync/range_sync/chain.rs      |  41 +-
 .../src/sync/range_sync/chain_collection.rs   |  10 +-
 .../network/src/sync/range_sync/range.rs      |  17 +-
 common/lighthouse_metrics/src/lib.rs          |  57 +-
 12 files changed, 847 insertions(+), 456 deletions(-)
 rename beacon_node/network/src/{sync/block_processor.rs => beacon_processor/chain_segment.rs} (52%)
 rename beacon_node/network/src/{router/gossip_processor.rs => beacon_processor/mod.rs} (59%)

diff --git a/beacon_node/network/src/sync/block_processor.rs b/beacon_node/network/src/beacon_processor/chain_segment.rs
similarity index 52%
rename from beacon_node/network/src/sync/block_processor.rs
rename to beacon_node/network/src/beacon_processor/chain_segment.rs
index a4fe5b418ff..5b034e4aa44 100644
--- a/beacon_node/network/src/sync/block_processor.rs
+++ b/beacon_node/network/src/beacon_processor/chain_segment.rs
@@ -1,10 +1,11 @@
+use crate::metrics;
 use crate::router::processor::FUTURE_SLOT_TOLERANCE;
 use crate::sync::manager::SyncMessage;
-use crate::sync::range_sync::{BatchId, ChainId};
+use crate::sync::{BatchId, BatchProcessResult, ChainId};
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, ChainSegmentResult};
 use eth2_libp2p::PeerId;
 use slog::{debug, error, trace, warn};
-use std::sync::{Arc, Weak};
+use std::sync::Arc;
 use tokio::sync::mpsc;
 use types::{EthSpec, SignedBeaconBlock};
 
@@ -17,85 +18,71 @@ pub enum ProcessId {
     ParentLookup(PeerId),
 }
 
-/// The result of a block processing request.
-// TODO: When correct batch error handling occurs, we will include an error type.
-#[derive(Debug)]
-pub enum BatchProcessResult {
-    /// The batch was completed successfully.
-    Success,
-    /// The batch processing failed.
-    Failed,
-    /// The batch processing failed but managed to import at least one block.
-    Partial,
-}
-
-/// Spawns a thread handling the block processing of a request: range syncing or parent lookup.
-pub fn spawn_block_processor<T: BeaconChainTypes>(
-    chain: Weak<BeaconChain<T>>,
+pub fn handle_chain_segment<T: BeaconChainTypes>(
+    chain: Arc<BeaconChain<T>>,
     process_id: ProcessId,
     downloaded_blocks: Vec<SignedBeaconBlock<T::EthSpec>>,
     sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
     log: slog::Logger,
 ) {
-    std::thread::spawn(move || {
-        match process_id {
-            // this is a request from the range sync
-            ProcessId::RangeBatchId(chain_id, batch_id) => {
-                let len = downloaded_blocks.len();
-                let start_slot = if len > 0 {
-                    downloaded_blocks[0].message.slot.as_u64()
-                } else {
-                    0
-                };
-                let end_slot = if len > 0 {
-                    downloaded_blocks[len - 1].message.slot.as_u64()
-                } else {
-                    0
-                };
-
-                debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len(), "start_slot" => start_slot, "end_slot" => end_slot);
-                let result = match process_blocks(chain, downloaded_blocks.iter(), &log) {
-                    (_, Ok(_)) => {
-                        debug!(log, "Batch processed"; "id" => *batch_id , "start_slot" => start_slot, "end_slot" => end_slot);
-                        BatchProcessResult::Success
-                    }
-                    (imported_blocks, Err(e)) if imported_blocks > 0 => {
-                        debug!(log, "Batch processing failed but imported some blocks";
+    match process_id {
+        // this is a request from the range sync
+        ProcessId::RangeBatchId(chain_id, batch_id) => {
+            let len = downloaded_blocks.len();
+            let start_slot = if len > 0 {
+                downloaded_blocks[0].message.slot.as_u64()
+            } else {
+                0
+            };
+            let end_slot = if len > 0 {
+                downloaded_blocks[len - 1].message.slot.as_u64()
+            } else {
+                0
+            };
+
+            debug!(log, "Processing batch"; "id" => *batch_id, "blocks" => downloaded_blocks.len(), "start_slot" => start_slot, "end_slot" => end_slot);
+            let result = match process_blocks(chain, downloaded_blocks.iter(), &log) {
+                (_, Ok(_)) => {
+                    debug!(log, "Batch processed"; "id" => *batch_id , "start_slot" => start_slot, "end_slot" => end_slot);
+                    BatchProcessResult::Success
+                }
+                (imported_blocks, Err(e)) if imported_blocks > 0 => {
+                    debug!(log, "Batch processing failed but imported some blocks";
                         "id" => *batch_id, "error" => e, "imported_blocks"=> imported_blocks);
-                        BatchProcessResult::Partial
-                    }
-                    (_, Err(e)) => {
-                        debug!(log, "Batch processing failed"; "id" => *batch_id, "error" => e);
-                        BatchProcessResult::Failed
-                    }
-                };
-
-                let msg = SyncMessage::BatchProcessed {
-                    chain_id,
-                    batch_id,
-                    downloaded_blocks,
-                    result,
-                };
-                sync_send.send(msg).unwrap_or_else(|_| {
-                    debug!(
-                        log,
-                        "Block processor could not inform range sync result. Likely shutting down."
-                    );
-                });
-            }
-            // this is a parent lookup request from the sync manager
-            ProcessId::ParentLookup(peer_id) => {
+                    BatchProcessResult::Partial
+                }
+                (_, Err(e)) => {
+                    debug!(log, "Batch processing failed"; "id" => *batch_id, "error" => e);
+                    BatchProcessResult::Failed
+                }
+            };
+
+            let msg = SyncMessage::BatchProcessed {
+                chain_id,
+                batch_id,
+                downloaded_blocks,
+                result,
+            };
+            sync_send.send(msg).unwrap_or_else(|_| {
                 debug!(
-                    log, "Processing parent lookup";
-                    "last_peer_id" => format!("{}", peer_id),
-                    "blocks" => downloaded_blocks.len()
+                    log,
+                    "Block processor could not inform range sync result. Likely shutting down."
                 );
-            // parent blocks are ordered from highest slot to lowest, so we need to process in
-            // reverse
-            match process_blocks(chain, downloaded_blocks.iter().rev(), &log) {
-                (_, Err(e)) => {
-                    warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e);
-                    sync_send
+            });
+        }
+        // this is a parent lookup request from the sync manager
+        ProcessId::ParentLookup(peer_id) => {
+            debug!(
+                log, "Processing parent lookup";
+                "last_peer_id" => format!("{}", peer_id),
+                "blocks" => downloaded_blocks.len()
+            );
+            // parent blocks are ordered from highest slot to lowest, so we need to process in
+            // reverse
+            match process_blocks(chain, downloaded_blocks.iter().rev(), &log) {
+                (_, Err(e)) => {
+                    warn!(log, "Parent lookup failed"; "last_peer_id" => format!("{}", peer_id), "error" => e);
+                    sync_send
                         .send(SyncMessage::ParentLookupFailed(peer_id))
                         .unwrap_or_else(|_| {
                             // on failure, inform to downvote the peer
                             debug!(
                                 log,
                                 "Block processor could not inform parent lookup result. Likely shutting down."
                             );
                         });
-                }
-                (_, Ok(_)) => {
-                    debug!(log, "Parent lookup processed successfully");
-                }
+                }
+                (_, Ok(_)) => {
+                    debug!(log, "Parent lookup processed successfully");
                 }
             }
         }
-    });
+    }
 }
 
 /// Helper function to process block batches which only consumes the chain and blocks to process.
@@ -120,43 +106,39 @@ fn process_blocks<
     'a,
     T: BeaconChainTypes,
     I: Iterator<Item = &'a SignedBeaconBlock<T::EthSpec>>,
 >(
-    chain: Weak<BeaconChain<T>>,
+    chain: Arc<BeaconChain<T>>,
     downloaded_blocks: I,
     log: &slog::Logger,
 ) -> (usize, Result<(), String>) {
-    if let Some(chain) = chain.upgrade() {
-        let blocks = downloaded_blocks.cloned().collect::<Vec<_>>();
-        let (imported_blocks, r) = match chain.process_chain_segment(blocks) {
-            ChainSegmentResult::Successful { imported_blocks } => {
-                if imported_blocks == 0 {
-                    debug!(log, "All blocks already known");
-                } else {
-                    debug!(
-                        log, "Imported blocks from network";
-                        "count" => imported_blocks,
-                    );
-                    // Batch completed successfully with at least one block, run fork choice.
-                    run_fork_choice(chain, log);
-                }
-
-                (imported_blocks, Ok(()))
-            }
-            ChainSegmentResult::Failed {
-                imported_blocks,
-                error,
-            } => {
-                let r = handle_failed_chain_segment(error, log);
-                if imported_blocks > 0 {
-                    run_fork_choice(chain, log);
-                }
-                (imported_blocks, r)
+    let blocks = downloaded_blocks.cloned().collect::<Vec<_>>();
+    match chain.process_chain_segment(blocks) {
+        ChainSegmentResult::Successful { imported_blocks } => {
+            metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL);
+            if imported_blocks == 0 {
+                debug!(log, "All blocks already known");
+            } else {
+                debug!(
+                    log, "Imported blocks from network";
+                    "count" => imported_blocks,
+                );
+                // Batch completed successfully with at least one block, run fork choice.
+                run_fork_choice(chain, log);
             }
-        };
-        return (imported_blocks, r);
+            (imported_blocks, Ok(()))
+        }
+        ChainSegmentResult::Failed {
+            imported_blocks,
+            error,
+        } => {
+            metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL);
+            let r = handle_failed_chain_segment(error, log);
+            if imported_blocks > 0 {
+                run_fork_choice(chain, log);
+            }
+            (imported_blocks, r)
+        }
     }
-
-    (0, Ok(()))
 }
 
 /// Runs fork-choice on a given chain. This is used during block processing after one successful
diff --git a/beacon_node/network/src/router/gossip_processor.rs b/beacon_node/network/src/beacon_processor/mod.rs
similarity index 59%
rename from beacon_node/network/src/router/gossip_processor.rs
rename to beacon_node/network/src/beacon_processor/mod.rs
index f9f229ba77d..33022c0ad7d 100644
--- a/beacon_node/network/src/router/gossip_processor.rs
+++ b/beacon_node/network/src/beacon_processor/mod.rs
@@ -1,4 +1,4 @@
-//! Provides the `GossipProcessor`, a multi-threaded processor for messages received on the network
+//! Provides the `BeaconProcessor`, a multi-threaded processor for messages received on the network
 //! that need to be processed by the `BeaconChain`.
 //!
 //! Uses `tokio` tasks (instead of raw threads) to provide the following tasks:
@@ -8,7 +8,7 @@
 //!
 //! ## Purpose
 //!
-//! The purpose of the `GossipProcessor` is to provide two things:
+//! The purpose of the `BeaconProcessor` is to provide two things:
 //!
 //! 1. Moving long-running, blocking tasks off the main `tokio` executor.
 //! 2. A fixed-length buffer for consensus messages.
@@ -38,23 +38,29 @@ use crate::{metrics, service::NetworkMessage, sync::SyncMessage};
 use beacon_chain::{
     attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
-    ForkChoiceError,
+    BlockError, ForkChoiceError,
 };
+use chain_segment::handle_chain_segment;
 use environment::TaskExecutor;
 use eth2_libp2p::{MessageId, NetworkGlobals, PeerId};
-use slog::{crit, debug, error, trace, warn, Logger};
+use slog::{crit, debug, error, info, trace, warn, Logger};
+use ssz::Encode;
 use std::collections::VecDeque;
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 use std::time::{Duration, Instant};
-use tokio::sync::mpsc;
-use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SubnetId};
+use tokio::sync::{mpsc, oneshot};
+use types::{Attestation, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, SubnetId};
 
-/// The maximum size of the channel for work events to the `GossipProcessor`.
+mod chain_segment;
+
+pub use chain_segment::ProcessId;
+
+/// The maximum size of the channel for work events to the `BeaconProcessor`.
 ///
 /// Setting this too low will cause consensus messages to be dropped.
-const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
+pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
 
-/// The maximum size of the channel for idle events to the `GossipProcessor`.
+/// The maximum size of the channel for idle events to the `BeaconProcessor`.
 ///
 /// Setting this too low will prevent new workers from being spawned. It *should* only need to be
 /// set to the CPU count, but we set it high to be safe.
 const MAX_IDLE_QUEUE_LEN: usize = 16_384;
@@ -68,6 +74,18 @@ const MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN: usize = 16_384;
 /// start dropping them.
 const MAX_AGGREGATED_ATTESTATION_QUEUE_LEN: usize = 1_024;
 
+/// The maximum number of queued `SignedBeaconBlock` objects received on gossip that will be stored
+/// before we start dropping them.
+const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024;
+
+/// The maximum number of queued `SignedBeaconBlock` objects received from the network RPC that
+/// will be stored before we start dropping them.
+const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
+
+/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
+/// be stored before we start dropping them.
+const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
+
 /// The name of the manager tokio task.
 const MANAGER_TASK_NAME: &str = "beacon_gossip_processor_manager";
 
 /// The name of the worker tokio tasks.
@@ -76,16 +94,56 @@ const WORKER_TASK_NAME: &str = "beacon_gossip_processor_worker";
 /// The minimum interval between log messages indicating that a queue is full.
 const LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
 
-/// A queued item from gossip, awaiting processing.
-struct QueueItem<T> {
-    message_id: MessageId,
-    peer_id: PeerId,
-    item: T,
+/// Used to send/receive results from an RPC block import in a blocking task.
+pub type BlockResultSender<E> = oneshot::Sender<Result<Hash256, BlockError<E>>>;
+pub type BlockResultReceiver<E> = oneshot::Receiver<Result<Hash256, BlockError<E>>>;
+
+/// A simple first-in-first-out queue with a maximum length.
+struct FifoQueue<T> {
+    queue: VecDeque<T>,
+    max_length: usize,
+}
+
+impl<T> FifoQueue<T> {
+    /// Create a new, empty queue with the given length.
+    pub fn new(max_length: usize) -> Self {
+        Self {
+            queue: VecDeque::default(),
+            max_length,
+        }
+    }
+
+    /// Add a new item to the queue.
+    ///
+    /// Drops `item` if the queue is full.
+    pub fn push(&mut self, item: T, item_desc: &str, log: &Logger) {
+        if self.queue.len() == self.max_length {
+            error!(
+                log,
+                "Block queue full";
+                "msg" => "the system has insufficient resources for load",
+                "queue_len" => self.max_length,
+                "queue" => item_desc,
+            )
+        } else {
+            self.queue.push_back(item);
+        }
+    }
+
+    /// Remove the next item from the queue.
+    pub fn pop(&mut self) -> Option<T> {
+        self.queue.pop_front()
+    }
+
+    /// Returns the current length of the queue.
+    pub fn len(&self) -> usize {
+        self.queue.len()
+    }
 }
 
 /// A simple last-in-first-out queue with a maximum length.
 struct LifoQueue<T> {
-    queue: VecDeque<QueueItem<T>>,
+    queue: VecDeque<T>,
     max_length: usize,
 }
 
@@ -98,8 +156,10 @@ impl<T> LifoQueue<T> {
         }
     }
 
-    /// Add a new item to the queue.
-    pub fn push(&mut self, item: QueueItem<T>) {
+    /// Add a new item to the front of the queue.
+    ///
+    /// If the queue is full, the item at the back of the queue is dropped.
+    pub fn push(&mut self, item: T) {
         if self.queue.len() == self.max_length {
             self.queue.pop_back();
         }
@@ -107,7 +167,7 @@ impl<T> LifoQueue<T> {
     }
 
     /// Remove the next item from the queue.
-    pub fn pop(&mut self) -> Option<QueueItem<T>> {
+    pub fn pop(&mut self) -> Option<T> {
         self.queue.pop_front()
     }
 
@@ -123,10 +183,9 @@ impl<T> LifoQueue<T> {
 }
 
 /// An event to be processed by the manager task.
-#[derive(Debug, PartialEq)]
+#[derive(Debug)]
 pub struct WorkEvent<E: EthSpec> {
-    message_id: MessageId,
-    peer_id: PeerId,
+    drop_during_sync: bool,
     work: Work<E>,
 }
 
@@ -140,9 +199,14 @@ impl<E: EthSpec> WorkEvent<E> {
         should_import: bool,
     ) -> Self {
         Self {
-            message_id,
-            peer_id,
-            work: Work::Attestation(Box::new((attestation, subnet_id, should_import))),
+            drop_during_sync: true,
+            work: Work::GossipAttestation {
+                message_id,
+                peer_id,
+                attestation: Box::new(attestation),
+                subnet_id,
+                should_import,
+            },
         }
     }
 
@@ -153,18 +217,92 @@ impl<E: EthSpec> WorkEvent<E> {
         aggregate: SignedAggregateAndProof<E>,
     ) -> Self {
         Self {
-            message_id,
-            peer_id,
-            work: Work::Aggregate(Box::new(aggregate)),
+            drop_during_sync: true,
+            work: Work::GossipAggregate {
+                message_id,
+                peer_id,
+                aggregate: Box::new(aggregate),
+            },
+        }
+    }
+
+    /// Create a new `Work` event for some block.
+    pub fn gossip_beacon_block(
+        message_id: MessageId,
+        peer_id: PeerId,
+        block: Box<SignedBeaconBlock<E>>,
+    ) -> Self {
+        Self {
+            drop_during_sync: false,
+            work: Work::GossipBlock {
+                message_id,
+                peer_id,
+                block,
+            },
+        }
+    }
+
+    /// Create a new `Work` event for some block, where the result from computation (if any) is
+    /// sent to the other side of `result_tx`.
+ pub fn rpc_beacon_block(block: Box>) -> (Self, BlockResultReceiver) { + let (result_tx, result_rx) = oneshot::channel(); + let event = Self { + drop_during_sync: false, + work: Work::RpcBlock { block, result_tx }, + }; + (event, result_rx) + } + + /// Create a new work event to import `blocks` as a beacon chain segment. + pub fn chain_segment(process_id: ProcessId, blocks: Vec>) -> Self { + Self { + drop_during_sync: false, + work: Work::ChainSegment { process_id, blocks }, } } } -/// A consensus message from gossip which requires processing. -#[derive(Debug, PartialEq)] +/// A consensus message (or multiple) from the network that requires processing. +#[derive(Debug)] pub enum Work { - Attestation(Box<(Attestation, SubnetId, bool)>), - Aggregate(Box>), + GossipAttestation { + message_id: MessageId, + peer_id: PeerId, + attestation: Box>, + subnet_id: SubnetId, + should_import: bool, + }, + GossipAggregate { + message_id: MessageId, + peer_id: PeerId, + aggregate: Box>, + }, + GossipBlock { + message_id: MessageId, + peer_id: PeerId, + block: Box>, + }, + RpcBlock { + block: Box>, + result_tx: BlockResultSender, + }, + ChainSegment { + process_id: ProcessId, + blocks: Vec>, + }, +} + +impl Work { + /// Provides a `&str` that uniquely identifies each enum variant. + fn str_id(&self) -> &'static str { + match self { + Work::GossipAttestation { .. } => "gossip_attestation", + Work::GossipAggregate { .. } => "gossip_aggregate", + Work::GossipBlock { .. } => "gossip_block", + Work::RpcBlock { .. } => "rpc_block", + Work::ChainSegment { .. } => "chain_segment", + } + } } /// Provides de-bounce functionality for logging. @@ -190,8 +328,8 @@ impl TimeLatch { /// that need to be processed by the `BeaconChain` /// /// See module level documentation for more information. -pub struct GossipProcessor { - pub beacon_chain: Arc>, +pub struct BeaconProcessor { + pub beacon_chain: Weak>, pub network_tx: mpsc::UnboundedSender>, pub sync_tx: mpsc::UnboundedSender>, pub network_globals: Arc>, @@ -201,7 +339,7 @@ pub struct GossipProcessor { pub log: Logger, } -impl GossipProcessor { +impl BeaconProcessor { /// Spawns the "manager" task which checks the receiver end of the returned `Sender` for /// messages which contain some new work which will be: /// @@ -210,9 +348,7 @@ impl GossipProcessor { /// /// Only `self.max_workers` will ever be spawned at one time. Each worker is a `tokio` task /// started with `spawn_blocking`. - pub fn spawn_manager(mut self) -> mpsc::Sender> { - let (event_tx, mut event_rx) = - mpsc::channel::>(MAX_WORK_EVENT_QUEUE_LEN); + pub fn spawn_manager(mut self, mut event_rx: mpsc::Receiver>) { let (idle_tx, mut idle_rx) = mpsc::channel::<()>(MAX_IDLE_QUEUE_LEN); let mut aggregate_queue = LifoQueue::new(MAX_AGGREGATED_ATTESTATION_QUEUE_LEN); @@ -221,6 +357,12 @@ impl GossipProcessor { let mut attestation_queue = LifoQueue::new(MAX_UNAGGREGATED_ATTESTATION_QUEUE_LEN); let mut attestation_debounce = TimeLatch::default(); + let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); + + let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); + + let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); + let executor = self.executor.clone(); // The manager future will run on the non-blocking executor and delegate tasks to worker @@ -236,7 +378,6 @@ impl GossipProcessor { // A worker has finished some work. 
new_idle_opt = idle_rx.recv() => { if new_idle_opt.is_some() { - metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_IDLE_EVENTS_TOTAL); self.current_workers = self.current_workers.saturating_sub(1); None } else { @@ -254,7 +395,6 @@ impl GossipProcessor { // There is a new piece of work to be handled. new_work_event_opt = event_rx.recv() => { if let Some(new_work_event) = new_work_event_opt { - metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_WORK_EVENTS_TOTAL); Some(new_work_event) } else { // Exit if all event senders have been dropped. @@ -271,30 +411,47 @@ impl GossipProcessor { }; let _event_timer = - metrics::start_timer(&metrics::GOSSIP_PROCESSOR_EVENT_HANDLING_SECONDS); + metrics::start_timer(&metrics::BEACON_PROCESSOR_EVENT_HANDLING_SECONDS); + if let Some(event) = &work_event { + metrics::inc_counter_vec( + &metrics::BEACON_PROCESSOR_WORK_EVENTS_RX_COUNT, + &[event.work.str_id()], + ); + } else { + metrics::inc_counter(&metrics::BEACON_PROCESSOR_IDLE_EVENTS_TOTAL); + } let can_spawn = self.current_workers < self.max_workers; + let drop_during_sync = work_event + .as_ref() + .map_or(false, |event| event.drop_during_sync); match work_event { // There is no new work event, but we are able to spawn a new worker. + // + // We don't check the `work.drop_during_sync` here. We assume that if it made + // it into the queue at any point then we should process it. None if can_spawn => { - // Check the aggregates, *then* the unaggregates since we assume that - // aggregates are more valuable to local validators and effectively - // give us more information with less signature verification time. - if let Some(item) = aggregate_queue.pop() { - self.spawn_worker( - idle_tx.clone(), - item.message_id, - item.peer_id, - Work::Aggregate(item.item), - ); + // Check for chain segments first, they're the most efficient way to get + // blocks into the system. + if let Some(item) = chain_segment_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); + // Check sync blocks before gossip blocks, since we've already explicitly + // requested these blocks. + } else if let Some(item) = rpc_block_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); + // Check gossip blocks before gossip attestations, since a block might be + // required to verify some attestations. + } else if let Some(item) = gossip_block_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); + // Check the aggregates, *then* the unaggregates + // since we assume that aggregates are more valuable to local validators + // and effectively give us more information with less signature + // verification time. + } else if let Some(item) = aggregate_queue.pop() { + self.spawn_worker(idle_tx.clone(), item); } else if let Some(item) = attestation_queue.pop() { - self.spawn_worker( - idle_tx.clone(), - item.message_id, - item.peer_id, - Work::Attestation(item.item), - ); + self.spawn_worker(idle_tx.clone(), item); } } // There is no new work event and we are unable to spawn a new worker. @@ -307,54 +464,65 @@ impl GossipProcessor { "msg" => "no new work and cannot spawn worker" ); } - // There is a new work event, but the chain is syncing. Ignore it. - Some(WorkEvent { .. }) - if self.network_globals.sync_state.read().is_syncing() => + // The chain is syncing and this event should be dropped during sync. 
+                    Some(work_event)
+                        if self.network_globals.sync_state.read().is_syncing()
+                            && drop_during_sync =>
                     {
-                        metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_WORK_EVENTS_IGNORED_TOTAL);
+                        let work_id = work_event.work.str_id();
+                        metrics::inc_counter_vec(
+                            &metrics::BEACON_PROCESSOR_WORK_EVENTS_IGNORED_COUNT,
+                            &[work_id],
+                        );
                         trace!(
                             self.log,
                             "Gossip processor skipping work";
-                            "msg" => "chain is syncing"
+                            "msg" => "chain is syncing",
+                            "work_id" => work_id
                         );
                     }
                     // There is a new work event and the chain is not syncing. Process it.
-                    Some(WorkEvent {
-                        message_id,
-                        peer_id,
-                        work,
-                    }) => match work {
-                        Work::Attestation(_) if can_spawn => {
-                            self.spawn_worker(idle_tx.clone(), message_id, peer_id, work)
-                        }
-                        Work::Attestation(attestation) => attestation_queue.push(QueueItem {
-                            message_id,
-                            peer_id,
-                            item: attestation,
-                        }),
-                        Work::Aggregate(_) if can_spawn => {
-                            self.spawn_worker(idle_tx.clone(), message_id, peer_id, work)
+                    Some(WorkEvent { work, .. }) => {
+                        let work_id = work.str_id();
+                        match work {
+                            _ if can_spawn => self.spawn_worker(idle_tx.clone(), work),
+                            Work::GossipAttestation { .. } => attestation_queue.push(work),
+                            Work::GossipAggregate { .. } => aggregate_queue.push(work),
+                            Work::GossipBlock { .. } => {
+                                gossip_block_queue.push(work, work_id, &self.log)
+                            }
+                            Work::RpcBlock { .. } => rpc_block_queue.push(work, work_id, &self.log),
+                            Work::ChainSegment { .. } => {
+                                chain_segment_queue.push(work, work_id, &self.log)
+                            }
                         }
-                        Work::Aggregate(aggregate) => aggregate_queue.push(QueueItem {
-                            message_id,
-                            peer_id,
-                            item: aggregate,
-                        }),
-                    },
+                    }
                 }
 
                 metrics::set_gauge(
-                    &metrics::GOSSIP_PROCESSOR_WORKERS_ACTIVE_TOTAL,
+                    &metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_TOTAL,
                     self.current_workers as i64,
                 );
                 metrics::set_gauge(
-                    &metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL,
+                    &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL,
                     attestation_queue.len() as i64,
                 );
                 metrics::set_gauge(
-                    &metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL,
+                    &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL,
                     aggregate_queue.len() as i64,
                 );
+                metrics::set_gauge(
+                    &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL,
+                    gossip_block_queue.len() as i64,
+                );
+                metrics::set_gauge(
+                    &metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL,
+                    rpc_block_queue.len() as i64,
+                );
+                metrics::set_gauge(
+                    &metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
+                    chain_segment_queue.len() as i64,
+                );
 
                 if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
                     error!(
@@ -378,62 +546,77 @@ impl<T: BeaconChainTypes> GossipProcessor<T> {
 
         // Spawn on the non-blocking executor.
         executor.spawn(manager_future, MANAGER_TASK_NAME);
-
-        event_tx
     }
 
     /// Spawns a blocking worker thread to process some `Work`.
     ///
     /// Sends a message on `idle_tx` when the work is complete and the task is stopping.
- fn spawn_worker( - &mut self, - mut idle_tx: mpsc::Sender<()>, - message_id: MessageId, - peer_id: PeerId, - work: Work, - ) { - let worker_timer = metrics::start_timer(&metrics::GOSSIP_PROCESSOR_WORKER_TIME); - metrics::inc_counter(&metrics::GOSSIP_PROCESSOR_WORKERS_SPAWNED_TOTAL); + fn spawn_worker(&mut self, mut idle_tx: mpsc::Sender<()>, work: Work) { + let work_id = work.str_id(); + let worker_timer = + metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]); + metrics::inc_counter(&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL); + metrics::inc_counter_vec( + &metrics::BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT, + &[work.str_id()], + ); + let worker_id = self.current_workers; self.current_workers = self.current_workers.saturating_add(1); - let chain = self.beacon_chain.clone(); + + let chain = if let Some(chain) = self.beacon_chain.upgrade() { + chain + } else { + debug!( + self.log, + "Beacon chain dropped, shutting down"; + ); + return; + }; + let network_tx = self.network_tx.clone(); let sync_tx = self.sync_tx.clone(); let log = self.log.clone(); let executor = self.executor.clone(); + trace!( + self.log, + "Spawning beacon processor worker"; + "work" => work_id, + "worker" => worker_id, + ); + executor.spawn_blocking( move || { let _worker_timer = worker_timer; + let inner_log = log.clone(); // We use this closure pattern to avoid using a `return` that prevents the // `idle_tx` message from sending. let handler = || { + let log = inner_log.clone(); match work { /* * Unaggregated attestation verification. */ - Work::Attestation(boxed_tuple) => { - let (attestation, subnet_id, should_import) = *boxed_tuple; - - let _attestation_timer = metrics::start_timer( - &metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_WORKER_TIME, - ); - metrics::inc_counter( - &metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL, - ); - + Work::GossipAttestation { + message_id, + peer_id, + attestation, + subnet_id, + should_import, + } => { let beacon_block_root = attestation.data.beacon_block_root; let attestation = match chain - .verify_unaggregated_attestation_for_gossip(attestation, subnet_id) + .verify_unaggregated_attestation_for_gossip(*attestation, subnet_id) { Ok(attestation) => attestation, Err(e) => { handle_attestation_verification_failure( &log, sync_tx, - peer_id.clone(), + peer_id, beacon_block_root, "unaggregated", e, @@ -451,7 +634,7 @@ impl GossipProcessor { } metrics::inc_counter( - &metrics::GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL, ); if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) { @@ -484,44 +667,44 @@ impl GossipProcessor { "beacon_block_root" => format!("{:?}", beacon_block_root) ) } + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL, + ); } /* * Aggregated attestation verification. 
*/ - Work::Aggregate(boxed_aggregate) => { - let _attestation_timer = metrics::start_timer( - &metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_WORKER_TIME, - ); - metrics::inc_counter( - &metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL, - ); - + Work::GossipAggregate { + message_id, + peer_id, + aggregate, + } => { let beacon_block_root = - boxed_aggregate.message.aggregate.data.beacon_block_root; + aggregate.message.aggregate.data.beacon_block_root; - let aggregate = match chain - .verify_aggregated_attestation_for_gossip(*boxed_aggregate) - { - Ok(aggregate) => aggregate, - Err(e) => { - handle_attestation_verification_failure( - &log, - sync_tx, - peer_id.clone(), - beacon_block_root, - "aggregated", - e, - ); - return; - } - }; + let aggregate = + match chain.verify_aggregated_attestation_for_gossip(*aggregate) { + Ok(aggregate) => aggregate, + Err(e) => { + handle_attestation_verification_failure( + &log, + sync_tx, + peer_id, + beacon_block_root, + "aggregated", + e, + ); + return; + } + }; // Indicate to the `Network` service that this message is valid and can be // propagated on the gossip network. propagate_gossip_message(network_tx, message_id, peer_id.clone(), &log); metrics::inc_counter( - &metrics::GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL, + &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL, ); if let Err(e) = chain.apply_attestation_to_fork_choice(&aggregate) { @@ -554,11 +737,151 @@ impl GossipProcessor { "beacon_block_root" => format!("{:?}", beacon_block_root) ) } + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL, + ); + } + /* + * Verification for beacon blocks received on gossip. + */ + Work::GossipBlock { + message_id, + peer_id, + block, + } => { + let verified_block = match chain.verify_block_for_gossip(*block) { + Ok(verified_block) => { + info!( + log, + "New block received"; + "slot" => verified_block.block.slot(), + "hash" => verified_block.block_root.to_string() + ); + propagate_gossip_message( + network_tx, + message_id, + peer_id.clone(), + &log, + ); + verified_block + } + Err(BlockError::ParentUnknown(block)) => { + send_sync_message( + sync_tx, + SyncMessage::UnknownBlock(peer_id, block), + &log, + ); + return; + } + Err(e) => { + warn!( + log, + "Could not verify block for gossip"; + "error" => format!("{:?}", e) + ); + return; + } + }; + + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL, + ); + + let block = Box::new(verified_block.block.clone()); + match chain.process_block(verified_block) { + Ok(_block_root) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL, + ); + + trace!( + log, + "Gossipsub block processed"; + "peer_id" => peer_id.to_string() + ); + + // TODO: It would be better if we can run this _after_ we publish the block to + // reduce block propagation latency. + // + // The `MessageHandler` would be the place to put this, however it doesn't seem + // to have a reference to the `BeaconChain`. I will leave this for future + // works. + match chain.fork_choice() { + Ok(()) => trace!( + log, + "Fork choice success"; + "location" => "block gossip" + ), + Err(e) => error!( + log, + "Fork choice failed"; + "error" => format!("{:?}", e), + "location" => "block gossip" + ), + } + } + Err(BlockError::ParentUnknown { .. }) => { + // Inform the sync manager to find parents for this block + // This should not occur. 
It should be checked by `should_forward_block`
+                            error!(
+                                log,
+                                "Block with unknown parent attempted to be processed";
+                                "peer_id" => peer_id.to_string()
+                            );
+                            send_sync_message(
+                                sync_tx,
+                                SyncMessage::UnknownBlock(peer_id, block),
+                                &log,
+                            );
+                        }
+                        other => {
+                            debug!(
+                                log,
+                                "Invalid gossip beacon block";
+                                "outcome" => format!("{:?}", other),
+                                "block root" => format!("{}", block.canonical_root()),
+                                "block slot" => block.slot()
+                            );
+                            trace!(
+                                log,
+                                "Invalid gossip beacon block ssz";
+                                "ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
+                            );
+                        }
+                    };
+                }
+                /*
+                 * Verification for beacon blocks received during syncing via RPC.
+                 */
+                Work::RpcBlock { block, result_tx } => {
+                    let block_result = chain.process_block(*block);
+
+                    metrics::inc_counter(
+                        &metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL,
+                    );
+
+                    if result_tx.send(block_result).is_err() {
+                        crit!(log, "Failed to return sync block result");
+                    }
+                }
+                /*
+                 * Verification for a chain segment (multiple blocks).
+                 */
+                Work::ChainSegment { process_id, blocks } => {
+                    handle_chain_segment(chain, process_id, blocks, sync_tx, log)
+                }
                 };
             };
             handler();
 
+            trace!(
+                log,
+                "Beacon processor worker done";
+                "work" => work_id,
+                "worker" => worker_id,
+            );
+
             idle_tx.try_send(()).unwrap_or_else(|e| {
                 crit!(
                     log,
@@ -596,6 +919,19 @@ fn propagate_gossip_message(
     });
 }
 
+/// Send a message to `sync_tx`.
+///
+/// Creates a log if there is an internal error.
+fn send_sync_message<T: EthSpec>(
+    sync_tx: mpsc::UnboundedSender<SyncMessage<T>>,
+    message: SyncMessage<T>,
+    log: &Logger,
+) {
+    sync_tx
+        .send(message)
+        .unwrap_or_else(|_| error!(log, "Could not send message to the sync service"));
+}
+
 /// Handle an error whilst verifying an `Attestation` or `SignedAggregateAndProof` from the
 /// network.
 pub fn handle_attestation_verification_failure(
diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs
index f7cc8051c8a..30795a63ef3 100644
--- a/beacon_node/network/src/lib.rs
+++ b/beacon_node/network/src/lib.rs
@@ -6,6 +6,7 @@ pub mod error;
 pub mod service;
 
 mod attestation_service;
+mod beacon_processor;
 mod metrics;
 mod persisted_dht;
 mod router;
diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs
index b0039763c3d..fa17d5f91fd 100644
--- a/beacon_node/network/src/metrics.rs
+++ b/beacon_node/network/src/metrics.rs
@@ -49,64 +49,101 @@ lazy_static!
{ /* * Gossip processor */ - pub static ref GOSSIP_PROCESSOR_WORKERS_SPAWNED_TOTAL: Result = try_create_int_counter( - "gossip_processor_workers_spawned_total", + pub static ref BEACON_PROCESSOR_WORK_EVENTS_RX_COUNT: Result = try_create_int_counter_vec( + "beacon_processor_work_events_rx_count", + "Count of work events received (but not necessarily processed)", + &["type"] + ); + pub static ref BEACON_PROCESSOR_WORK_EVENTS_IGNORED_COUNT: Result = try_create_int_counter_vec( + "beacon_processor_work_events_ignored_count", + "Count of work events purposefully ignored", + &["type"] + ); + pub static ref BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT: Result = try_create_int_counter_vec( + "beacon_processor_work_events_started_count", + "Count of work events which have been started by a worker", + &["type"] + ); + pub static ref BEACON_PROCESSOR_WORKER_TIME: Result = try_create_histogram_vec( + "beacon_processor_worker_time", + "Time taken for a worker to fully process some parcel of work.", + &["type"] + ); + pub static ref BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL: Result = try_create_int_counter( + "beacon_processor_workers_spawned_total", "The number of workers ever spawned by the gossip processing pool." ); - pub static ref GOSSIP_PROCESSOR_WORKERS_ACTIVE_TOTAL: Result = try_create_int_gauge( - "gossip_processor_workers_active_total", + pub static ref BEACON_PROCESSOR_WORKERS_ACTIVE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_workers_active_total", "Count of active workers in the gossip processing pool." ); - pub static ref GOSSIP_PROCESSOR_WORK_EVENTS_TOTAL: Result = try_create_int_counter( - "gossip_processor_work_events_total", - "Count of work events processed by the gossip processor manager." - ); - pub static ref GOSSIP_PROCESSOR_WORK_EVENTS_IGNORED_TOTAL: Result = try_create_int_counter( - "gossip_processor_work_events_ignored_total", - "Count of work events processed by the gossip processor manager." - ); - pub static ref GOSSIP_PROCESSOR_IDLE_EVENTS_TOTAL: Result = try_create_int_counter( - "gossip_processor_idle_events_total", + pub static ref BEACON_PROCESSOR_IDLE_EVENTS_TOTAL: Result = try_create_int_counter( + "beacon_processor_idle_events_total", "Count of idle events processed by the gossip processor manager." ); - pub static ref GOSSIP_PROCESSOR_EVENT_HANDLING_SECONDS: Result = try_create_histogram( - "gossip_processor_event_handling_seconds", + pub static ref BEACON_PROCESSOR_EVENT_HANDLING_SECONDS: Result = try_create_histogram( + "beacon_processor_event_handling_seconds", "Time spent handling a new message and allocating it to a queue or worker." ); - pub static ref GOSSIP_PROCESSOR_WORKER_TIME: Result = try_create_histogram( - "gossip_processor_worker_time", - "Time taken for a worker to fully process some parcel of work." + // Gossip blocks. + pub static ref BEACON_PROCESSOR_GOSSIP_BLOCK_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_gossip_block_queue_total", + "Count of blocks from gossip waiting to be verified." ); - pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL: Result = try_create_int_gauge( - "gossip_processor_unaggregated_attestation_queue_total", - "Count of unagg. attestations waiting to be processed." + pub static ref BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL: Result = try_create_int_counter( + "beacon_processor_gossip_block_verified_total", + "Total number of gossip blocks verified for propagation." 
+    );
+    pub static ref BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_gossip_block_imported_total",
+        "Total number of gossip blocks imported to fork choice, etc."
+    );
+    // Rpc blocks.
+    pub static ref BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
+        "beacon_processor_rpc_block_queue_total",
+        "Count of blocks from the rpc waiting to be verified."
+    );
+    pub static ref BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_rpc_block_imported_total",
+        "Total number of RPC blocks imported to fork choice, etc."
     );
-    pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_WORKER_TIME: Result<Histogram> = try_create_histogram(
-        "gossip_processor_unaggregated_attestation_worker_time",
-        "Time taken for a worker to fully process an unaggregated attestation."
+    // Chain segments.
+    pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
+        "beacon_processor_chain_segment_queue_total",
+        "Count of chain segments from the rpc waiting to be verified."
     );
-    pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
-        "gossip_processor_unaggregated_attestation_verified_total",
+    pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_chain_segment_success_total",
+        "Total number of chain segments successfully processed."
+    );
+    pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_chain_segment_failed_total",
+        "Total number of chain segments that failed processing."
+    );
+    // Unaggregated attestations.
+    pub static ref BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
+        "beacon_processor_unaggregated_attestation_queue_total",
+        "Count of unagg. attestations waiting to be processed."
+    );
+    pub static ref BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_unaggregated_attestation_verified_total",
         "Total number of unaggregated attestations verified for gossip."
     );
-    pub static ref GOSSIP_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
-        "gossip_processor_unaggregated_attestation_imported_total",
+    pub static ref BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result<IntCounter> = try_create_int_counter(
+        "beacon_processor_unaggregated_attestation_imported_total",
         "Total number of unaggregated attestations imported to fork choice, etc."
     );
-    pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
-        "gossip_processor_aggregated_attestation_queue_total",
+    // Aggregated attestations.
+    pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
+        "beacon_processor_aggregated_attestation_queue_total",
         "Count of agg. attestations waiting to be processed."
     );
-    pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_WORKER_TIME: Result<Histogram> = try_create_histogram(
-        "gossip_processor_aggregated_attestation_worker_time",
-        "Time taken for a worker to fully process an aggregated attestation."
- ); - pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result = try_create_int_counter( - "gossip_processor_aggregated_attestation_verified_total", + pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL: Result = try_create_int_counter( + "beacon_processor_aggregated_attestation_verified_total", "Total number of aggregated attestations verified for gossip." ); - pub static ref GOSSIP_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result = try_create_int_counter( - "gossip_processor_aggregated_attestation_imported_total", + pub static ref BEACON_PROCESSOR_AGGREGATED_ATTESTATION_IMPORTED_TOTAL: Result = try_create_int_counter( + "beacon_processor_aggregated_attestation_imported_total", "Total number of aggregated attestations imported to fork choice, etc." ); diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index bcb02fbbee2..c3a729ce712 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -5,19 +5,18 @@ //! syncing-related responses to the Sync manager. #![allow(clippy::unit_arg)] -pub mod gossip_processor; pub mod processor; use crate::error; use crate::service::NetworkMessage; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ rpc::{RPCError, RequestId}, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, }; use futures::prelude::*; use processor::Processor; -use slog::{debug, info, o, trace, warn}; +use slog::{debug, o, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; use types::EthSpec; @@ -229,21 +228,7 @@ impl Router { ); } PubsubMessage::BeaconBlock(block) => { - match self.processor.should_forward_block(block) { - Ok(verified_block) => { - info!(self.log, "New block received"; "slot" => verified_block.block.slot(), "hash" => verified_block.block_root.to_string()); - self.propagate_message(id, peer_id.clone()); - self.processor.on_block_gossip(peer_id, verified_block); - } - Err(BlockError::ParentUnknown(block)) => { - self.processor.on_unknown_parent(peer_id, block); - } - Err(e) => { - // performing a parent lookup - warn!(self.log, "Could not verify block for gossip"; - "error" => format!("{:?}", e)); - } - } + self.processor.on_block_gossip(id, peer_id, block); } PubsubMessage::VoluntaryExit(exit) => { debug!(self.log, "Received a voluntary exit"; "peer_id" => format!("{}", peer_id)); diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 52f97fff81e..3a663badd3a 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -1,17 +1,15 @@ -use super::gossip_processor::{GossipProcessor, WorkEvent as GossipWorkEvent}; +use crate::beacon_processor::{ + BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, +}; use crate::service::NetworkMessage; use crate::sync::{PeerSyncInfo, SyncMessage}; -use beacon_chain::{ - observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes, BlockError, - GossipVerifiedBlock, -}; +use beacon_chain::{observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::*; use eth2_libp2p::{ MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, }; use itertools::process_results; use slog::{debug, error, o, trace, warn}; -use ssz::Encode; use state_processing::SigVerifiedOp; use std::cmp; use std::sync::Arc; @@ -36,8 +34,8 @@ 
pub struct Processor<T: BeaconChainTypes> {
     sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
     /// A network context to return and handle RPC requests.
     network: HandlerNetworkContext<T::EthSpec>,
-    /// A multi-threaded, non-blocking processor for consensus gossip messages.
-    gossip_processor_send: mpsc::Sender<GossipWorkEvent<T::EthSpec>>,
+    /// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
+    beacon_processor_send: mpsc::Sender<BeaconWorkEvent<T::EthSpec>>,
     /// The `RPCHandler` logger.
     log: slog::Logger,
 }
@@ -52,6 +50,8 @@ impl<T: BeaconChainTypes> Processor<T> {
         log: &slog::Logger,
     ) -> Self {
         let sync_logger = log.new(o!("service"=> "sync"));
+        let (beacon_processor_send, beacon_processor_receive) =
+            mpsc::channel(MAX_WORK_EVENT_QUEUE_LEN);
 
         // spawn the sync thread
         let sync_send = crate::sync::manager::spawn(
             executor.clone(),
             beacon_chain.clone(),
             network_globals.clone(),
             network_send.clone(),
+            beacon_processor_send.clone(),
             sync_logger,
         );
 
-        let gossip_processor_send = GossipProcessor {
-            beacon_chain: beacon_chain.clone(),
+        BeaconProcessor {
+            beacon_chain: Arc::downgrade(&beacon_chain),
             network_tx: network_send.clone(),
             sync_tx: sync_send.clone(),
             network_globals,
             current_workers: 0,
             log: log.clone(),
         }
-        .spawn_manager();
+        .spawn_manager(beacon_processor_receive);
 
         Processor {
             chain: beacon_chain,
             sync_send,
             network: HandlerNetworkContext::new(network_send, log.clone()),
-            gossip_processor_send,
+            beacon_processor_send,
             log: log.clone(),
         }
     }
@@ -513,23 +514,6 @@ impl<T: BeaconChainTypes> Processor<T> {
         }
     }
 
-    /// Template function to be called on a block to determine if the block should be propagated
-    /// across the network.
-    pub fn should_forward_block(
-        &mut self,
-        block: Box<SignedBeaconBlock<T::EthSpec>>,
-    ) -> Result<GossipVerifiedBlock<T>, BlockError<T::EthSpec>> {
-        self.chain.verify_block_for_gossip(*block)
-    }
-
-    pub fn on_unknown_parent(
-        &mut self,
-        peer_id: PeerId,
-        block: Box<SignedBeaconBlock<T::EthSpec>>,
-    ) {
-        self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block));
-    }
-
     /// Process a gossip message declaring a new block.
     ///
     /// Attempts to apply the block to the beacon chain. May queue the block for later processing.
     ///
     /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers.
     pub fn on_block_gossip(
         &mut self,
+        message_id: MessageId,
         peer_id: PeerId,
-        verified_block: GossipVerifiedBlock<T>,
-    ) -> bool {
-        let block = Box::new(verified_block.block.clone());
-        match self.chain.process_block(verified_block) {
-            Ok(_block_root) => {
-                trace!(
-                    self.log,
-                    "Gossipsub block processed";
-                    "peer_id" => peer_id.to_string()
-                );
-
-                // TODO: It would be better if we can run this _after_ we publish the block to
-                // reduce block propagation latency.
-                //
-                // The `MessageHandler` would be the place to put this, however it doesn't seem
-                // to have a reference to the `BeaconChain`. I will leave this for future
-                // works.
-                match self.chain.fork_choice() {
-                    Ok(()) => trace!(
-                        self.log,
-                        "Fork choice success";
-                        "location" => "block gossip"
-                    ),
-                    Err(e) => error!(
-                        self.log,
-                        "Fork choice failed";
-                        "error" => format!("{:?}", e),
-                        "location" => "block gossip"
-                    ),
-                }
-            }
-            Err(BlockError::ParentUnknown { .. }) => {
-                // Inform the sync manager to find parents for this block
-                // This should not occur. It should be checked by `should_forward_block`
It should be checked by `should_forward_block`
+        block: Box>,
+    ) {
+        self.beacon_processor_send
+            .try_send(BeaconWorkEvent::gossip_beacon_block(
+                message_id, peer_id, block,
+            ))
+            .unwrap_or_else(|e| {
                 error!(
-                    self.log,
-                    "Block with unknown parent attempted to be processed";
-                    "peer_id" => peer_id.to_string()
-                );
-                self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block));
-            }
-            other => {
-                warn!(
-                    self.log,
-                    "Invalid gossip beacon block";
-                    "outcome" => format!("{:?}", other),
-                    "block root" => format!("{}", block.canonical_root()),
-                    "block slot" => block.slot()
-                );
-                trace!(
-                    self.log,
-                    "Invalid gossip beacon block ssz";
-                    "ssz" => format!("0x{}", hex::encode(block.as_ssz_bytes())),
-                );
-            }
-        }
-        // TODO: Update with correct block gossip checking
-        true
+                &self.log,
+                "Unable to send to gossip processor";
+                "type" => "block gossip",
+                "error" => e.to_string(),
+            )
+        })
     }
 
     pub fn on_unaggregated_attestation_gossip(
@@ -606,8 +547,8 @@ impl Processor {
         subnet_id: SubnetId,
         should_process: bool,
     ) {
-        self.gossip_processor_send
-            .try_send(GossipWorkEvent::unaggregated_attestation(
+        self.beacon_processor_send
+            .try_send(BeaconWorkEvent::unaggregated_attestation(
                 message_id,
                 peer_id,
                 unaggregated_attestation,
@@ -630,8 +571,8 @@ impl Processor {
         peer_id: PeerId,
         aggregate: SignedAggregateAndProof,
     ) {
-        self.gossip_processor_send
-            .try_send(GossipWorkEvent::aggregated_attestation(
+        self.beacon_processor_send
+            .try_send(BeaconWorkEvent::aggregated_attestation(
                 message_id, peer_id, aggregate,
             ))
             .unwrap_or_else(|e| {
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index 2f388e46111..2c9cc5add64 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -33,11 +33,11 @@
 //! if an attestation references an unknown block) this manager can search for the block and
 //! subsequently search for parents if needed.
 
-use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId};
 use super::network_context::SyncNetworkContext;
 use super::peer_sync_info::{PeerSyncInfo, PeerSyncType};
 use super::range_sync::{BatchId, ChainId, RangeSync, EPOCHS_PER_BATCH};
 use super::RequestId;
+use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent};
 use crate::service::NetworkMessage;
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
 use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason};
@@ -109,6 +109,18 @@ pub enum SyncMessage {
     ParentLookupFailed(PeerId),
 }
 
+/// The result of processing multiple blocks (a chain segment).
+// TODO: When correct batch error handling occurs, we will include an error type.
+#[derive(Debug)]
+pub enum BatchProcessResult {
+    /// The batch was completed successfully.
+    Success,
+    /// The batch processing failed.
+    Failed,
+    /// The batch processing failed but managed to import at least one block.
+    Partial,
+}
+
 /// Maintains a sequential list of parents to lookup and the lookup's current state.
 struct ParentRequests {
     /// The blocks that have currently been downloaded.
@@ -158,8 +170,8 @@ pub struct SyncManager {
     /// The logger for the import manager.
     log: Logger,
 
-    /// The sending part of input_channel
-    sync_send: mpsc::UnboundedSender>,
+    /// A multi-threaded, non-blocking processor for applying messages to the beacon chain.
+    beacon_processor_send: mpsc::Sender>,
 }
 
 /// Object representing a single block lookup request.
@@ -187,6 +199,7 @@ pub fn spawn( beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, + beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> mpsc::UnboundedSender> { assert!( @@ -201,7 +214,7 @@ pub fn spawn( range_sync: RangeSync::new( beacon_chain.clone(), network_globals.clone(), - sync_send.clone(), + beacon_processor_send.clone(), log.clone(), ), network: SyncNetworkContext::new(network_send, network_globals.clone(), log.clone()), @@ -211,7 +224,7 @@ pub fn spawn( parent_queue: SmallVec::new(), single_block_lookups: FnvHashMap::default(), log: log.clone(), - sync_send: sync_send.clone(), + beacon_processor_send, }; // spawn the sync manager thread @@ -300,7 +313,7 @@ impl SyncManager { /// There are two reasons we could have received a BlocksByRoot response /// - We requested a single hash and have received a response for the single_block_lookup /// - We are looking up parent blocks in parent lookup search - fn blocks_by_root_response( + async fn blocks_by_root_response( &mut self, peer_id: PeerId, request_id: RequestId, @@ -318,7 +331,8 @@ impl SyncManager { single_block_hash = Some(block_request.hash); } if let Some(block_hash) = single_block_hash { - self.single_block_lookup_response(peer_id, block, block_hash); + self.single_block_lookup_response(peer_id, block, block_hash) + .await; return; } @@ -340,7 +354,7 @@ impl SyncManager { // add the block to response parent_request.downloaded_blocks.push(block); // queue for processing - self.process_parent_request(parent_request); + self.process_parent_request(parent_request).await; } None => { // this is a stream termination @@ -381,10 +395,40 @@ impl SyncManager { } } + async fn process_block_async( + &mut self, + block: SignedBeaconBlock, + ) -> Option>> { + let (event, rx) = BeaconWorkEvent::rpc_beacon_block(Box::new(block)); + match self.beacon_processor_send.try_send(event) { + Ok(_) => {} + Err(e) => { + error!( + self.log, + "Failed to send sync block to processor"; + "error" => format!("{:?}", e) + ); + return None; + } + } + + match rx.await { + Ok(block_result) => Some(block_result), + Err(_) => { + warn!( + self.log, + "Sync block not processed"; + "msg" => "likely due to system resource exhaustion" + ); + None + } + } + } + /// Processes the response obtained from a single block lookup search. If the block is /// processed or errors, the search ends. If the blocks parent is unknown, a block parent /// lookup search is started. - fn single_block_lookup_response( + async fn single_block_lookup_response( &mut self, peer_id: PeerId, block: SignedBeaconBlock, @@ -399,8 +443,13 @@ impl SyncManager { return; } + let block_result = match self.process_block_async(block.clone()).await { + Some(block_result) => block_result, + None => return, + }; + // we have the correct block, try and process it - match self.chain.process_block(block.clone()) { + match block_result { Ok(block_root) => { info!(self.log, "Processed block"; "block" => format!("{}", block_root)); @@ -599,7 +648,7 @@ impl SyncManager { // manager /// A new block has been received for a parent lookup query, process it. 
- fn process_parent_request(&mut self, mut parent_request: ParentRequests) { + async fn process_parent_request(&mut self, mut parent_request: ParentRequests) { // verify the last added block is the parent of the last requested block if parent_request.downloaded_blocks.len() < 2 { @@ -652,7 +701,13 @@ impl SyncManager { .downloaded_blocks .pop() .expect("There is always at least one block in the queue"); - match self.chain.process_block(newest_block.clone()) { + + let block_result = match self.process_block_async(newest_block.clone()).await { + Some(block_result) => block_result, + None => return, + }; + + match block_result { Err(BlockError::ParentUnknown { .. }) => { // need to keep looking for parents // add the block back to the queue and continue the search @@ -660,13 +715,23 @@ impl SyncManager { self.request_parent(parent_request); } Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => { - spawn_block_processor( - Arc::downgrade(&self.chain), - ProcessId::ParentLookup(parent_request.last_submitted_peer.clone()), - parent_request.downloaded_blocks, - self.sync_send.clone(), - self.log.clone(), - ); + let process_id = + ProcessId::ParentLookup(parent_request.last_submitted_peer.clone()); + let blocks = parent_request.downloaded_blocks; + + match self + .beacon_processor_send + .try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) + { + Ok(_) => {} + Err(e) => { + error!( + self.log, + "Failed to send chain segment to processor"; + "error" => format!("{:?}", e) + ); + } + } } Err(outcome) => { // all else we consider the chain a failure and downvote the peer that sent @@ -760,7 +825,8 @@ impl SyncManager { request_id, beacon_block, } => { - self.blocks_by_root_response(peer_id, request_id, beacon_block.map(|b| *b)); + self.blocks_by_root_response(peer_id, request_id, beacon_block.map(|b| *b)) + .await; } SyncMessage::UnknownBlock(peer_id, block) => { self.add_unknown_block(peer_id, *block); diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 2c0fcabb287..0c0bdce3192 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -1,14 +1,14 @@ //! Syncing for lighthouse. //! //! Stores the various syncing methods for the beacon chain. 
-mod block_processor; pub mod manager; mod network_context; mod peer_sync_info; mod range_sync; -pub use manager::SyncMessage; +pub use manager::{BatchProcessResult, SyncMessage}; pub use peer_sync_info::PeerSyncInfo; +pub use range_sync::{BatchId, ChainId}; /// Type of id of rpc requests sent by sync pub type RequestId = usize; diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index b816b965670..b86ef5e158f 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,11 +1,12 @@ use super::batch::{Batch, BatchId, PendingBatches}; -use crate::sync::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId}; -use crate::sync::network_context::SyncNetworkContext; -use crate::sync::{RequestId, SyncMessage}; +use crate::beacon_processor::ProcessId; +use crate::beacon_processor::WorkEvent as BeaconWorkEvent; +use crate::sync::RequestId; +use crate::sync::{network_context::SyncNetworkContext, BatchProcessResult}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{PeerAction, PeerId}; use rand::prelude::*; -use slog::{crit, debug, warn}; +use slog::{crit, debug, error, warn}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::mpsc; @@ -84,9 +85,8 @@ pub struct SyncingChain { /// The current processing batch, if any. current_processing_batch: Option>, - /// A send channel to the sync manager. This is given to the batch processor thread to report - /// back once batch processing has completed. - sync_send: mpsc::UnboundedSender>, + /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. + beacon_processor_send: mpsc::Sender>, /// A reference to the underlying beacon chain. chain: Arc>, @@ -111,7 +111,7 @@ impl SyncingChain { target_head_slot: Slot, target_head_root: Hash256, peer_id: PeerId, - sync_send: mpsc::UnboundedSender>, + beacon_processor_send: mpsc::Sender>, chain: Arc>, log: slog::Logger, ) -> Self { @@ -131,7 +131,7 @@ impl SyncingChain { to_be_processed_id: BatchId(1), state: ChainSyncingState::Stopped, current_processing_batch: None, - sync_send, + beacon_processor_send, chain, log, } @@ -255,18 +255,23 @@ impl SyncingChain { } } - /// Sends a batch to the batch processor. + /// Sends a batch to the beacon processor for async processing in a queue. fn process_batch(&mut self, mut batch: Batch) { - let downloaded_blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new()); + let blocks = std::mem::replace(&mut batch.downloaded_blocks, Vec::new()); let process_id = ProcessId::RangeBatchId(self.id, batch.id); self.current_processing_batch = Some(batch); - spawn_block_processor( - Arc::downgrade(&self.chain.clone()), - process_id, - downloaded_blocks, - self.sync_send.clone(), - self.log.clone(), - ); + + if let Err(e) = self + .beacon_processor_send + .try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) + { + error!( + self.log, + "Failed to send chain segment to processor"; + "msg" => "process_batch", + "error" => format!("{:?}", e) + ); + } } /// The block processor has completed processing a batch. This function handles the result diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 5d8083a4207..2eda6fc81ef 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -4,7 +4,7 @@ //! 
with this struct to to simplify the logic of the other layers of sync. use super::chain::{ChainSyncingState, SyncingChain}; -use crate::sync::manager::SyncMessage; +use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::network_context::SyncNetworkContext; use crate::sync::PeerSyncInfo; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -302,7 +302,7 @@ impl ChainCollection { target_head: Hash256, target_slot: Slot, peer_id: PeerId, - sync_send: mpsc::UnboundedSender>, + beacon_processor_send: mpsc::Sender>, ) { let chain_id = rand::random(); self.finalized_chains.push(SyncingChain::new( @@ -311,7 +311,7 @@ impl ChainCollection { target_slot, target_head, peer_id, - sync_send, + beacon_processor_send, self.beacon_chain.clone(), self.log.clone(), )); @@ -326,7 +326,7 @@ impl ChainCollection { target_head: Hash256, target_slot: Slot, peer_id: PeerId, - sync_send: mpsc::UnboundedSender>, + beacon_processor_send: mpsc::Sender>, ) { // remove the peer from any other head chains @@ -342,7 +342,7 @@ impl ChainCollection { target_slot, target_head, peer_id, - sync_send, + beacon_processor_send, self.beacon_chain.clone(), self.log.clone(), ); diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index f6a1d80e475..d0a76cf6f99 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -43,9 +43,9 @@ use super::chain::{ChainId, ProcessingResult}; use super::chain_collection::{ChainCollection, RangeSyncState}; use super::sync_type::RangeSyncType; use super::BatchId; -use crate::sync::block_processor::BatchProcessResult; -use crate::sync::manager::SyncMessage; +use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::network_context::SyncNetworkContext; +use crate::sync::BatchProcessResult; use crate::sync::PeerSyncInfo; use crate::sync::RequestId; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -69,9 +69,8 @@ pub struct RangeSync { /// finalized chain(s) complete, these peer's get STATUS'ed to update their head slot before /// the head chains are formed and downloaded. awaiting_head_peers: HashSet, - /// The sync manager channel, allowing the batch processor thread to callback the sync task - /// once complete. - sync_send: mpsc::UnboundedSender>, + /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. + beacon_processor_send: mpsc::Sender>, /// The syncing logger. 
log: slog::Logger, } @@ -80,14 +79,14 @@ impl RangeSync { pub fn new( beacon_chain: Arc>, network_globals: Arc>, - sync_send: mpsc::UnboundedSender>, + beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> Self { RangeSync { beacon_chain: beacon_chain.clone(), chains: ChainCollection::new(beacon_chain, network_globals, log.clone()), awaiting_head_peers: HashSet::new(), - sync_send, + beacon_processor_send, log, } } @@ -181,7 +180,7 @@ impl RangeSync { remote_info.finalized_root, remote_finalized_slot, peer_id, - self.sync_send.clone(), + self.beacon_processor_send.clone(), ); self.chains.update_finalized(network); // update the global sync state @@ -228,7 +227,7 @@ impl RangeSync { remote_info.head_root, remote_info.head_slot, peer_id, - self.sync_send.clone(), + self.beacon_processor_send.clone(), ); } self.chains.update_finalized(network); diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index a0f59c54b54..d785e0b5669 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -57,8 +57,8 @@ use prometheus::{HistogramOpts, HistogramTimer, Opts}; pub use prometheus::{ - Encoder, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntGauge, IntGaugeVec, Result, - TextEncoder, + Encoder, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, + IntGaugeVec, Result, TextEncoder, }; /// Collect all the metrics for reporting. @@ -66,7 +66,7 @@ pub fn gather() -> Vec { prometheus::gather() } -/// Attempts to crate an `IntCounter`, returning `Err` if the registry does not accept the counter +/// Attempts to create an `IntCounter`, returning `Err` if the registry does not accept the counter /// (potentially due to naming conflict). pub fn try_create_int_counter(name: &str, help: &str) -> Result { let opts = Opts::new(name, help); @@ -75,7 +75,7 @@ pub fn try_create_int_counter(name: &str, help: &str) -> Result { Ok(counter) } -/// Attempts to crate an `IntGauge`, returning `Err` if the registry does not accept the counter +/// Attempts to create an `IntGauge`, returning `Err` if the registry does not accept the counter /// (potentially due to naming conflict). pub fn try_create_int_gauge(name: &str, help: &str) -> Result { let opts = Opts::new(name, help); @@ -84,7 +84,7 @@ pub fn try_create_int_gauge(name: &str, help: &str) -> Result { Ok(gauge) } -/// Attempts to crate a `Gauge`, returning `Err` if the registry does not accept the counter +/// Attempts to create a `Gauge`, returning `Err` if the registry does not accept the counter /// (potentially due to naming conflict). pub fn try_create_float_gauge(name: &str, help: &str) -> Result { let opts = Opts::new(name, help); @@ -93,7 +93,7 @@ pub fn try_create_float_gauge(name: &str, help: &str) -> Result { Ok(gauge) } -/// Attempts to crate a `Histogram`, returning `Err` if the registry does not accept the counter +/// Attempts to create a `Histogram`, returning `Err` if the registry does not accept the counter /// (potentially due to naming conflict). pub fn try_create_histogram(name: &str, help: &str) -> Result { let opts = HistogramOpts::new(name, help); @@ -102,7 +102,7 @@ pub fn try_create_histogram(name: &str, help: &str) -> Result { Ok(histogram) } -/// Attempts to crate a `HistogramVec`, returning `Err` if the registry does not accept the counter +/// Attempts to create a `HistogramVec`, returning `Err` if the registry does not accept the counter /// (potentially due to naming conflict). 
pub fn try_create_histogram_vec(
     name: &str,
@@ -115,7 +115,7 @@ pub fn try_create_histogram_vec(
     Ok(histogram_vec)
 }
 
-/// Attempts to crate a `IntGaugeVec`, returning `Err` if the registry does not accept the gauge
+/// Attempts to create an `IntGaugeVec`, returning `Err` if the registry does not accept the gauge
 /// (potentially due to naming conflict).
 pub fn try_create_int_gauge_vec(
     name: &str,
@@ -128,7 +128,7 @@ pub fn try_create_int_gauge_vec(
     Ok(counter_vec)
 }
 
-/// Attempts to crate a `GaugeVec`, returning `Err` if the registry does not accept the gauge
+/// Attempts to create a `GaugeVec`, returning `Err` if the registry does not accept the gauge
 /// (potentially due to naming conflict).
 pub fn try_create_float_gauge_vec(
     name: &str,
@@ -141,6 +141,20 @@ pub fn try_create_float_gauge_vec(
     Ok(counter_vec)
 }
 
+/// Attempts to create an `IntCounterVec`, returning `Err` if the registry does not accept the counter
+/// (potentially due to naming conflict).
+pub fn try_create_int_counter_vec(
+    name: &str,
+    help: &str,
+    label_names: &[&str],
+) -> Result {
+    let opts = Opts::new(name, help);
+    let counter_vec = IntCounterVec::new(opts, label_names)?;
+    prometheus::register(Box::new(counter_vec.clone()))?;
+    Ok(counter_vec)
+}
+
+/// If `int_gauge_vec.is_ok()`, returns a gauge with the given `name`.
 pub fn get_int_gauge(int_gauge_vec: &Result, name: &[&str]) -> Option {
     if let Ok(int_gauge_vec) = int_gauge_vec {
         Some(int_gauge_vec.get_metric_with_label_values(name).ok()?)
@@ -149,6 +163,26 @@ pub fn get_int_gauge(int_gauge_vec: &Result, name: &[&str]) -> Opti
     }
 }
 
+/// If `int_counter_vec.is_ok()`, returns a counter with the given `name`.
+pub fn get_int_counter(
+    int_counter_vec: &Result,
+    name: &[&str],
+) -> Option {
+    if let Ok(int_counter_vec) = int_counter_vec {
+        Some(int_counter_vec.get_metric_with_label_values(name).ok()?)
+    } else {
+        None
+    }
+}
+
+/// Increments the `int_counter_vec` with the given `name`.
+pub fn inc_counter_vec(int_counter_vec: &Result, name: &[&str]) {
+    if let Some(counter) = get_int_counter(int_counter_vec, name) {
+        counter.inc()
+    }
+}
+
+/// If `histogram_vec.is_ok()`, returns a histogram with the given `name`.
 pub fn get_histogram(histogram_vec: &Result, name: &[&str]) -> Option {
     if let Ok(histogram_vec) = histogram_vec {
         Some(histogram_vec.get_metric_with_label_values(name).ok()?)
@@ -157,6 +191,11 @@ pub fn get_histogram(histogram_vec: &Result, name: &[&str]) -> Opt
     }
 }
 
+/// Starts a timer on `vec` with the given `name`.
+pub fn start_timer_vec(vec: &Result, name: &[&str]) -> Option {
+    get_histogram(vec, name).map(|h| h.start_timer())
+}
+
 /// Starts a timer for the given `Histogram`, stopping when it gets dropped or given to `stop_timer(..)`.
 pub fn start_timer(histogram: &Result) -> Option {
     if let Ok(histogram) = histogram {

From 73cbfbdfd0f7e836d471da28cd0d3d1eb0344475 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Mon, 17 Aug 2020 10:06:06 +0000
Subject: [PATCH 3/6] Ensure RUSTFLAGS is passed through on cross compile (#1529)

## Issue Addressed

NA

## Proposed Changes

Tells `cross` (used for cross-compiling) to read the `RUSTFLAGS` env and pass it through during the build. This allows us to use `-g` and get debug info.
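As an illustration, here is a minimal sketch of how the passthrough gets exercised (the `-g` flag is the one mentioned above; the target triple is purely an example and not something this PR prescribes):

```bash
# `cross` runs the build inside a container and forwards only a fixed set of
# environment variables. With the `passthrough` entry in `Cross.toml` below,
# RUSTFLAGS set on the host reaches rustc inside the container, so the
# cross-compiled binary keeps its debug info.
RUSTFLAGS="-g" cross build --release --target aarch64-unknown-linux-gnu
```

Without the passthrough, `cross` drops `RUSTFLAGS` at the container boundary and `-g` silently has no effect.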
## Additional Info NA --- Cross.toml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Cross.toml diff --git a/Cross.toml b/Cross.toml new file mode 100644 index 00000000000..050f2bdbd75 --- /dev/null +++ b/Cross.toml @@ -0,0 +1,4 @@ +[build.env] +passthrough = [ + "RUSTFLAGS", +] From a58aa6ee55481053aae114d41ef264106de4a210 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 17 Aug 2020 10:06:08 +0000 Subject: [PATCH 4/6] Revert back to discv5 alpha 8 to maintain ARM support (#1531) ## Issue Addressed NA ## Proposed Changes See title. ## Additional Info NA --- Cargo.lock | 236 ++++++++++++++--------------- beacon_node/eth2_libp2p/Cargo.toml | 2 +- lighthouse/environment/Cargo.toml | 2 +- 3 files changed, 119 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9c9cc629973..596b5121207 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,7 +77,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fc95d1bdb8e6666b2b217308eeeb09f2d6728d104be3e31916cc74d15420331" dependencies = [ - "generic-array 0.14.4", + "generic-array 0.14.3", ] [[package]] @@ -361,7 +361,7 @@ dependencies = [ "slog-term", "sloggers", "slot_clock", - "smallvec 1.4.2", + "smallvec 1.4.1", "state_processing", "store", "tempfile", @@ -478,7 +478,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" dependencies = [ - "generic-array 0.14.4", + "generic-array 0.14.3", ] [[package]] @@ -487,7 +487,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa136449e765dc7faa244561ccae839c394048667929af599b5d931ebe7b7f10" dependencies = [ - "generic-array 0.14.4", + "generic-array 0.14.3", ] [[package]] @@ -635,7 +635,7 @@ dependencies = [ "ethereum-types", "quickcheck", "quickcheck_macros", - "smallvec 1.4.2", + "smallvec 1.4.1", "tree_hash", ] @@ -685,9 +685,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.15" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942f72db697d8767c22d46a598e01f2d3b475501ea43d0db4f16d90259182d0b" +checksum = "c74d84029116787153e02106bf53e66828452a4b325cc8652b788b5967c0a0b6" dependencies = [ "num-integer", "num-traits", @@ -696,9 +696,9 @@ dependencies = [ [[package]] name = "clap" -version = "2.33.3" +version = "2.33.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +checksum = "10040cdf04294b565d9e0319955430099ec3813a64c952b86a41200ad714ae48" dependencies = [ "ansi_term", "atty", @@ -1011,7 +1011,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab" dependencies = [ - "generic-array 0.14.4", + "generic-array 0.14.3", "subtle 2.2.3", ] @@ -1091,9 +1091,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.3.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4d0e2d24e5ee3b23a01de38eefdcd978907890701f08ffffd4cb457ca4ee8d6" +checksum = "72aa14c04dfae8dd7d8a2b1cb7ca2152618cd01336dbfe704b8dcbf8d41dbd69" [[package]] name = "db-key" @@ -1163,7 +1163,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" dependencies = [ - "generic-array 
0.14.4", + "generic-array 0.14.3", ] [[package]] @@ -1195,9 +1195,9 @@ checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" [[package]] name = "discv5" -version = "0.1.0-alpha.9" +version = "0.1.0-alpha.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98bc715508160877f74d828b94238c156c50f0ca80f51271bca9a855be94c488" +checksum = "90782d49541b01f9b7e34e6af5d80d01396bf7b1a81505a0035da224134b8d73" dependencies = [ "arrayvec", "digest 0.8.1", @@ -1213,12 +1213,12 @@ dependencies = [ "lru_time_cache", "multihash", "net2", + "openssl", "parking_lot 0.11.0", "rand 0.7.3", "rlp", - "rust-crypto", "sha2 0.8.2", - "smallvec 1.4.2", + "smallvec 1.4.1", "tokio 0.2.22", "uint", "zeroize", @@ -1284,9 +1284,9 @@ dependencies = [ [[package]] name = "either" -version = "1.6.0" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd56b59865bce947ac5958779cfa508f6c3b9497cc762b7e24a12d11ccde2c4f" +checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" [[package]] name = "encoding_rs" @@ -1299,9 +1299,9 @@ dependencies = [ [[package]] name = "enr" -version = "0.1.3" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3137b4854534673ea350751670c6fe53920394a328ba9ce4d9acabd4f60a586" +checksum = "1c78d64a14865c080072c05ffb2c11aab15963d5e763ca4dbc02865dc1b615ee" dependencies = [ "base64 0.12.3", "bs58", @@ -1504,7 +1504,7 @@ dependencies = [ "slog-async", "slog-stdlog", "slog-term", - "smallvec 1.4.2", + "smallvec 1.4.1", "snap", "tempdir", "tiny-keccak 2.0.2", @@ -1522,7 +1522,7 @@ version = "0.1.2" dependencies = [ "eth2_ssz_derive", "ethereum-types", - "smallvec 1.4.2", + "smallvec 1.4.1", ] [[package]] @@ -1916,9 +1916,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "0.14.4" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817" +checksum = "60fb4bb6bba52f78a471264d9a3b7d026cc0af47b22cd2cffbc0b787ca003e63" dependencies = [ "typenum", "version_check 0.9.2", @@ -2069,14 +2069,14 @@ checksum = "d36fab90f82edc3c747f9d438e06cf0a491055896f2a279638bb5beed6c40177" [[package]] name = "handlebars" -version = "3.4.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5deefd4816fb852b1ff3cb48f6c41da67be2d0e1d20b26a7a3b076da11f064b1" +checksum = "86dbc8a0746b08f363d2e00da48e6c9ceb75c198ac692d2715fcbb5bee74c87d" dependencies = [ "log 0.4.11", "pest", "pest_derive", - "quick-error 2.0.0", + "quick-error", "serde", "serde_json", ] @@ -2093,9 +2093,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.8.2" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91b62f79061a0bc2e046024cb7ba44b08419ed238ecbd9adbd787434b9e8c25" +checksum = "34f595585f103464d8d2f6e9864682d74c1601fed5e07d62b1c9058dba8246fb" dependencies = [ "autocfg 1.0.0", ] @@ -2241,7 +2241,7 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" dependencies = [ - "quick-error 1.2.3", + "quick-error", ] [[package]] @@ -2399,7 +2399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b45e59b16c76b11bf9738fd5d38879d3bd28ad292d7b313608becb17ae2df9" dependencies = [ "autocfg 1.0.0", - "hashbrown 0.8.2", + "hashbrown 0.8.1", ] 
[[package]] @@ -2517,9 +2517,9 @@ dependencies = [ [[package]] name = "lazycell" -version = "1.3.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +checksum = "b294d6fa9ee409a054354afc4352b0b9ef7ca222c69b8812cbea9e7d2bf3783f" [[package]] name = "lcli" @@ -2607,7 +2607,7 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" [[package]] name = "libp2p" version = "0.23.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "atomic", "bytes 0.5.6", @@ -2623,11 +2623,12 @@ dependencies = [ "libp2p-swarm", "libp2p-tcp", "libp2p-websocket", + "libp2p-yamux", "multihash", - "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)", + "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695)", "parking_lot 0.10.2", "pin-project", - "smallvec 1.4.2", + "smallvec 1.4.1", "wasm-timer", ] @@ -2658,7 +2659,7 @@ dependencies = [ "ring", "rw-stream-sink", "sha2 0.8.2", - "smallvec 1.4.2", + "smallvec 1.4.1", "thiserror", "unsigned-varint 0.4.0", "void", @@ -2668,7 +2669,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "asn1_der", "bs58", @@ -2681,8 +2682,8 @@ dependencies = [ "libsecp256k1", "log 0.4.11", "multihash", - "multistream-select 0.8.2 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)", - "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)", + "multistream-select 0.8.2 (git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695)", + "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695)", "parking_lot 0.10.2", "pin-project", "prost", @@ -2691,7 +2692,7 @@ dependencies = [ "ring", "rw-stream-sink", "sha2 0.8.2", - "smallvec 1.4.2", + "smallvec 1.4.1", "thiserror", "unsigned-varint 0.4.0", "void", @@ -2701,7 +2702,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.20.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "quote", "syn", @@ -2710,7 +2711,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "futures 0.3.5", "libp2p-core 0.21.0", @@ -2720,7 +2721,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.21.0" -source = 
"git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "base64 0.11.0", "byteorder", @@ -2736,7 +2737,7 @@ dependencies = [ "prost-build", "rand 0.7.3", "sha2 0.8.2", - "smallvec 1.4.2", + "smallvec 1.4.1", "unsigned-varint 0.4.0", "wasm-timer", ] @@ -2744,7 +2745,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "futures 0.3.5", "libp2p-core 0.21.0", @@ -2752,14 +2753,14 @@ dependencies = [ "log 0.4.11", "prost", "prost-build", - "smallvec 1.4.2", + "smallvec 1.4.1", "wasm-timer", ] [[package]] name = "libp2p-mplex" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "bytes 0.5.6", "fnv", @@ -2774,7 +2775,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.23.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "bytes 0.5.6", "curve25519-dalek", @@ -2795,13 +2796,13 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "futures 0.3.5", "libp2p-core 0.21.0", "log 0.4.11", "rand 0.7.3", - "smallvec 1.4.2", + "smallvec 1.4.1", "void", "wasm-timer", ] @@ -2809,7 +2810,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "futures 0.3.5", "futures-timer", @@ -2824,7 +2825,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "async-tls", "either", @@ -2840,6 +2841,18 @@ dependencies = [ "webpki-roots 0.18.0", ] +[[package]] +name = "libp2p-yamux" +version = "0.21.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +dependencies = [ + "futures 0.3.5", + "libp2p-core 0.21.0", + "parking_lot 0.10.2", + "thiserror", + "yamux", +] + 
[[package]] name = "libsecp256k1" version = "0.3.5" @@ -2869,9 +2882,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.0.27" +version = "1.0.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ca8894883d250240341478bf987467332fbdd5da5c42426c69a8f93dbc302f2" +checksum = "2eb5e43362e38e2bca2fd5f5134c4d4564a23a5c28e9b95411652021a8675ebe" dependencies = [ "cc", "libc", @@ -3199,13 +3212,13 @@ checksum = "d8883adfde9756c1d30b0f519c9b8c502a94b41ac62f696453c37c7fc0a958ce" [[package]] name = "multistream-select" version = "0.8.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "bytes 0.5.6", "futures 0.3.5", "log 0.4.11", "pin-project", - "smallvec 1.4.2", + "smallvec 1.4.1", "unsigned-varint 0.4.0", ] @@ -3219,7 +3232,7 @@ dependencies = [ "futures 0.3.5", "log 0.4.11", "pin-project", - "smallvec 1.4.2", + "smallvec 1.4.1", "unsigned-varint 0.4.0", ] @@ -3280,7 +3293,7 @@ dependencies = [ "slog", "sloggers", "slot_clock", - "smallvec 1.4.2", + "smallvec 1.4.1", "state_processing", "store", "tempfile", @@ -3321,6 +3334,12 @@ dependencies = [ "validator_dir", ] +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + [[package]] name = "nom" version = "2.2.1" @@ -3353,7 +3372,7 @@ dependencies = [ "num-traits", "rand 0.7.3", "serde", - "smallvec 1.4.2", + "smallvec 1.4.1", "zeroize", ] @@ -3492,7 +3511,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" +source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" dependencies = [ "arrayref", "bs58", @@ -3593,7 +3612,7 @@ dependencies = [ "cloudabi 0.0.3", "libc", "redox_syscall", - "smallvec 1.4.2", + "smallvec 1.4.1", "winapi 0.3.9", ] @@ -3608,7 +3627,7 @@ dependencies = [ "instant", "libc", "redox_syscall", - "smallvec 1.4.2", + "smallvec 1.4.1", "winapi 0.3.9", ] @@ -3902,9 +3921,9 @@ dependencies = [ [[package]] name = "protobuf" -version = "2.17.0" +version = "2.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb14183cc7f213ee2410067e1ceeadba2a7478a59432ff0747a335202798b1e2" +checksum = "d883f78645c21b7281d21305181aa1f4dd9e9363e7cf2566c93121552cff003e" [[package]] name = "psutil" @@ -3931,12 +3950,6 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" -[[package]] -name = "quick-error" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ac73b1112776fc109b2e61909bc46c7e1bf0d7f690ffb1676553acce16d5cda" - [[package]] name = "quickcheck" version = "0.9.2" @@ -4007,16 +4020,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "def50a86306165861203e7f84ecffbbdfdea79f0e51039b33de1e952358c47ac" -[[package]] -name = "rand" -version = "0.3.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" -dependencies = [ - "libc", - "rand 0.4.6", -] - [[package]] name = "rand" version = "0.4.6" @@ -4362,7 +4365,7 @@ dependencies = [ "libsqlite3-sys", "lru-cache", "memchr", - "smallvec 1.4.2", + "smallvec 1.4.1", "time 0.1.43", ] @@ -4378,19 +4381,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "rust-crypto" -version = "0.2.36" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a" -dependencies = [ - "gcc", - "libc", - "rand 0.3.23", - "rustc-serialize", - "time 0.1.43", -] - [[package]] name = "rustc-demangle" version = "0.1.16" @@ -4409,12 +4399,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6" -[[package]] -name = "rustc-serialize" -version = "0.3.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda" - [[package]] name = "rustc_version" version = "0.2.3" @@ -4426,9 +4410,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.18.1" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d1126dcf58e93cee7d098dbda643b5f92ed724f1f6a63007c1116eed6700c81" +checksum = "cac94b333ee2aac3284c5b8a1b7fb4dd11cba88c244e3fe33cdbd047af0eb693" dependencies = [ "base64 0.12.3", "log 0.4.11", @@ -4595,9 +4579,9 @@ checksum = "a0eddf2e8f50ced781f288c19f18621fa72a3779e3cb58dbf23b07469b0abeb4" [[package]] name = "serde" -version = "1.0.115" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e54c9a88f2da7238af84b5101443f0c0d0a3bbdc455e34a5c9497b1903ed55d5" +checksum = "5317f7588f0a5078ee60ef675ef96735a1442132dc645eb1d12c018620ed8cd3" dependencies = [ "serde_derive", ] @@ -4614,9 +4598,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.115" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "609feed1d0a73cc36a0182a840a9b37b4a82f0b1150369f0536a9e3f2a31dc48" +checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e" dependencies = [ "proc-macro2", "quote", @@ -4920,9 +4904,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.4.2" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252" +checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f" [[package]] name = "snafu" @@ -5135,7 +5119,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09f8ed9974042b8c3672ff3030a69fcc03b74c47c3d1ecb7755e8a3626011e88" dependencies = [ "block-cipher", - "generic-array 0.14.4", + "generic-array 0.14.3", ] [[package]] @@ -5756,9 +5740,9 @@ checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" [[package]] name = "tracing" -version = "0.1.19" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d79ca061b032d6ce30c660fded31189ca0b9922bf483cd70759f13a2d86786c" +checksum = "f0aae59226cf195d8e74d4b34beae1859257efb4e5fed3f147d2dc2c7d372178" dependencies = [ "cfg-if", "log 0.4.11", @@ -5767,9 +5751,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.14" +version = "0.1.13" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "db63662723c316b43ca36d833707cc93dff82a02ba3d7e354f342682cc8b3545" +checksum = "d593f98af59ebc017c0648f0117525db358745a8894a8d684e185ba3f45954f9" dependencies = [ "lazy_static", ] @@ -5808,7 +5792,7 @@ dependencies = [ "ethereum-types", "lazy_static", "rand 0.7.3", - "smallvec 1.4.2", + "smallvec 1.4.1", "tree_hash_derive", "types", ] @@ -5892,9 +5876,9 @@ checksum = "c6ff93345ba2206230b1bb1aa3ece1a63dd9443b7531024575d16a0680a59444" [[package]] name = "uint" -version = "0.8.5" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9db035e67dfaf7edd9aebfe8676afcd63eed53c8a4044fed514c8cccf1835177" +checksum = "429ffcad8c8c15f874578c7337d156a3727eb4a1c2374c0ae937ad9a9b748c80" dependencies = [ "arbitrary", "byteorder", @@ -5969,7 +5953,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8326b2c654932e3e4f9196e69d08fdf7cfd718e1dc6f66b347e6024a0c961402" dependencies = [ - "generic-array 0.14.4", + "generic-array 0.14.3", "subtle 2.2.3", ] @@ -6486,6 +6470,20 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "yamux" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd37e58a1256a0b328ce9c67d8b62ecdd02f4803ba443df478835cb1a41a637c" +dependencies = [ + "futures 0.3.5", + "log 0.4.11", + "nohash-hasher", + "parking_lot 0.10.2", + "rand 0.7.3", + "static_assertions", +] + [[package]] name = "zeroize" version = "1.1.0" diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index d598d5151f4..ba690e494b0 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -32,7 +32,7 @@ snap = "1.0.0" void = "1.0.2" tokio-io-timeout = "0.4.0" tokio-util = { version = "0.3.1", features = ["codec", "compat"] } -discv5 = { version = "0.1.0-alpha.9", features = ["libp2p"] } +discv5 = { version = "0.1.0-alpha.8", features = ["libp2p", "openssl-vendored"] } tiny-keccak = "2.0.2" environment = { path = "../../lighthouse/environment" } # TODO: Remove rand crate for mainnet diff --git a/lighthouse/environment/Cargo.toml b/lighthouse/environment/Cargo.toml index f953f46d84e..16338a0ffc2 100644 --- a/lighthouse/environment/Cargo.toml +++ b/lighthouse/environment/Cargo.toml @@ -21,4 +21,4 @@ slog-json = "2.3.0" exit-future = "0.2.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } -discv5 = "0.1.0-alpha.9" +discv5 = { version = "0.1.0-alpha.8", features = ["libp2p", "openssl-vendored"] } From 719a69aee042446c6e4c5b707088688e38daae49 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 17 Aug 2020 10:54:58 +0000 Subject: [PATCH 5/6] Ignore blocks that skip a large distance from their parent (#1530) ## Proposed Changes To mitigate the impact of minority forks on RAM and disk usage, this change rejects blocks whose parent lies more than 320 slots (10 epochs, ~1 hour) in the past. The behaviour is configurable via `lighthouse bn --max-skip-slots N`, and can be turned off entirely using `--max-skip-slots none`. 
Co-authored-by: Paul Hauner --- .../src/attestation_verification.rs | 27 ++++++++++++++++--- beacon_node/beacon_chain/src/beacon_chain.rs | 3 +++ .../beacon_chain/src/block_verification.rs | 12 +++++++++ beacon_node/beacon_chain/src/builder.rs | 19 +++++++++++++ beacon_node/beacon_chain/src/chain_config.rs | 20 ++++++++++++++ beacon_node/beacon_chain/src/lib.rs | 2 ++ beacon_node/beacon_chain/src/test_utils.rs | 2 ++ beacon_node/client/src/builder.rs | 2 ++ beacon_node/client/src/config.rs | 2 ++ .../network/src/beacon_processor/mod.rs | 16 +++++++++++ beacon_node/network/src/metrics.rs | 7 +++++ beacon_node/src/cli.rs | 13 +++++++++ beacon_node/src/config.rs | 10 +++++++ 13 files changed, 131 insertions(+), 4 deletions(-) create mode 100644 beacon_node/beacon_chain/src/chain_config.rs diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 9c8ec92e596..6502e106b38 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -220,6 +220,12 @@ pub enum Error { /// /// The peer has sent an invalid message. Invalid(AttestationValidationError), + /// The attestation head block is too far behind the attestation slot, causing many skip slots. + /// This is deemed a DoS risk. + TooManySkippedSlots { + head_block_slot: Slot, + attestation_slot: Slot, + }, /// There was an error whilst processing the attestation. It is not known if it is valid or invalid. /// /// ## Peer scoring @@ -319,6 +325,7 @@ impl VerifiedAggregatedAttestation { }?; // Ensure the block being voted for (attestation.data.beacon_block_root) passes validation. + // Don't enforce the skip slot restriction for aggregates. // // This indirectly checks to see if the `attestation.data.beacon_block_root` is in our fork // choice. Any known, non-finalized, processed block should be in fork choice, so this @@ -327,7 +334,7 @@ impl VerifiedAggregatedAttestation { // // Attestations must be for a known block. If the block is unknown, we simply drop the // attestation and do not delay consideration for later. - verify_head_block_is_known(chain, &attestation)?; + verify_head_block_is_known(chain, &attestation, None)?; // Ensure that the attestation has participants. if attestation.aggregation_bits.is_zero() { @@ -433,7 +440,9 @@ impl VerifiedUnaggregatedAttestation { // Attestations must be for a known block. If the block is unknown, we simply drop the // attestation and do not delay consideration for later. - verify_head_block_is_known(chain, &attestation)?; + // + // Enforce a maximum skip distance for unaggregated attestations. + verify_head_block_is_known(chain, &attestation, chain.config.import_max_skip_slots)?; let (indexed_attestation, committees_per_slot) = obtain_indexed_attestation_and_committees_per_slot(chain, &attestation)?; @@ -531,12 +540,22 @@ impl VerifiedUnaggregatedAttestation { fn verify_head_block_is_known( chain: &BeaconChain, attestation: &Attestation, + max_skip_slots: Option, ) -> Result<(), Error> { - if chain + if let Some(block) = chain .fork_choice .read() - .contains_block(&attestation.data.beacon_block_root) + .get_block(&attestation.data.beacon_block_root) { + // Reject any block that exceeds our limit on skipped slots. 
+ if let Some(max_skip_slots) = max_skip_slots { + if block.slot > attestation.data.slot + max_skip_slots { + return Err(Error::TooManySkippedSlots { + head_block_slot: block.slot, + attestation_slot: attestation.data.slot, + }); + } + } Ok(()) } else { Err(Error::UnknownHeadBlock { diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6fa06d3a82e..ef94274fabb 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7,6 +7,7 @@ use crate::block_verification::{ signature_verify_chain_segment, BlockError, FullyVerifiedBlock, GossipVerifiedBlock, IntoFullyVerifiedBlock, }; +use crate::chain_config::ChainConfig; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::{EventHandler, EventKind}; @@ -161,6 +162,8 @@ pub trait BeaconChainTypes: Send + Sync + 'static { /// operations and chooses a canonical head. pub struct BeaconChain { pub spec: ChainSpec, + /// Configuration for `BeaconChain` runtime behaviour. + pub config: ChainConfig, /// Persistent storage for blocks, states, etc. Typically an on-disk store, such as LevelDB. pub store: Arc>, /// Database migrator for running background maintenance on the store. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 92460dba125..520ebcea5ac 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -92,6 +92,8 @@ pub enum BlockError { /// It's unclear if this block is valid, but it cannot be processed without already knowing /// its parent. ParentUnknown(Box>), + /// The block skips too many slots and is a DoS risk. + TooManySkippedSlots { parent_slot: Slot, block_slot: Slot }, /// The block slot is greater than the present slot. /// /// ## Peer scoring @@ -645,6 +647,16 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { return Err(BlockError::ParentUnknown(Box::new(block))); } + // Reject any block that exceeds our limit on skipped slots. + if let Some(max_skip_slots) = chain.config.import_max_skip_slots { + if block.slot() > parent.beacon_block.slot() + max_skip_slots { + return Err(BlockError::TooManySkippedSlots { + parent_slot: parent.beacon_block.slot(), + block_slot: block.slot(), + }); + } + } + /* * Perform cursory checks to see if the block is even worth processing. 
*/
diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs
index b0c467920d6..6ff94cfaad9 100644
--- a/beacon_node/beacon_chain/src/builder.rs
+++ b/beacon_node/beacon_chain/src/builder.rs
@@ -11,6 +11,7 @@ use crate::shuffling_cache::ShufflingCache;
 use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE};
 use crate::timeout_rw_lock::TimeoutRwLock;
 use crate::validator_pubkey_cache::ValidatorPubkeyCache;
+use crate::ChainConfig;
 use crate::{
     BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, Eth1Chain,
     Eth1ChainBackend, EventHandler,
@@ -110,6 +111,7 @@ pub struct BeaconChainBuilder {
     pubkey_cache_path: Option,
     validator_pubkey_cache: Option,
     spec: ChainSpec,
+    chain_config: ChainConfig,
     disabled_forks: Vec,
     log: Option,
     graffiti: Graffiti,
@@ -157,6 +159,7 @@ where
             disabled_forks: Vec::new(),
             validator_pubkey_cache: None,
             spec: TEthSpec::default_spec(),
+            chain_config: ChainConfig::default(),
             log: None,
             graffiti: Graffiti::default(),
         }
@@ -171,6 +174,15 @@ where
         self
     }
 
+    /// Sets the maximum number of slots that will be skipped when processing
+    /// some consensus messages.
+    ///
+    /// Set to `None` for no limit.
+    pub fn import_max_skip_slots(mut self, n: Option) -> Self {
+        self.chain_config.import_max_skip_slots = n;
+        self
+    }
+
     /// Sets the store (database).
     ///
     /// Should generally be called early in the build chain.
@@ -406,6 +418,12 @@ where
         self
     }
 
+    /// Sets the `ChainConfig` that determines `BeaconChain` runtime behaviour.
+    pub fn chain_config(mut self, config: ChainConfig) -> Self {
+        self.chain_config = config;
+        self
+    }
+
     /// Consumes `self`, returning a `BeaconChain` if all required parameters have been supplied.
     ///
     /// An error will be returned at runtime if all required parameters have not been configured.
@@ -489,6 +507,7 @@ where
         let beacon_chain = BeaconChain {
             spec: self.spec,
+            config: self.chain_config,
             store,
             store_migrator: self
                 .store_migrator
diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs
new file mode 100644
index 00000000000..ea8c5dd52be
--- /dev/null
+++ b/beacon_node/beacon_chain/src/chain_config.rs
@@ -0,0 +1,20 @@
+use serde_derive::{Deserialize, Serialize};
+
+pub const DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS: u64 = 10 * 32;
+
+#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
+pub struct ChainConfig {
+    /// Maximum number of slots to skip when importing a consensus message (e.g., block,
+    /// attestation, etc).
+    ///
+    /// If `None`, there is no limit.
+ pub import_max_skip_slots: Option, +} + +impl Default for ChainConfig { + fn default() -> Self { + Self { + import_max_skip_slots: Some(DEFAULT_IMPORT_BLOCK_MAX_SKIP_SLOTS), + } + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 3c597bc72eb..941627c1f36 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -8,6 +8,7 @@ mod beacon_fork_choice_store; mod beacon_snapshot; mod block_verification; pub mod builder; +pub mod chain_config; mod errors; pub mod eth1_chain; pub mod events; @@ -32,6 +33,7 @@ pub use self::beacon_chain::{ ForkChoiceError, StateSkipConfig, }; pub use self::beacon_snapshot::BeaconSnapshot; +pub use self::chain_config::ChainConfig; pub use self::errors::{BeaconChainError, BlockProductionError}; pub use attestation_verification::Error as AttestationError; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 5dccde9ad51..a8e86db8ba1 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -152,6 +152,7 @@ impl BeaconChainHarness> { let chain = BeaconChainBuilder::new(eth_spec_instance) .logger(log.clone()) .custom_spec(spec.clone()) + .import_max_skip_slots(None) .store(store.clone()) .store_migrator(BlockingMigrator::new(store, log.clone())) .data_dir(data_dir.path().to_path_buf()) @@ -190,6 +191,7 @@ impl BeaconChainHarness> { let chain = BeaconChainBuilder::new(eth_spec_instance) .logger(log.clone()) .custom_spec(spec) + .import_max_skip_slots(None) .store(store.clone()) .store_migrator( as Migrate>::new( store, diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 94d67b7c3cb..d9edbe1d2a6 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -135,6 +135,7 @@ where let eth_spec_instance = self.eth_spec_instance.clone(); let data_dir = config.data_dir.clone(); let disabled_forks = config.disabled_forks.clone(); + let chain_config = config.chain.clone(); let graffiti = config.graffiti; let store = @@ -153,6 +154,7 @@ where .store_migrator(store_migrator) .data_dir(data_dir) .custom_spec(spec.clone()) + .chain_config(chain_config) .disabled_forks(disabled_forks) .graffiti(graffiti); diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index aaf0df46c0a..19088e785b5 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -64,6 +64,7 @@ pub struct Config { pub store: store::StoreConfig, pub network: network::NetworkConfig, pub rest_api: rest_api::Config, + pub chain: beacon_chain::ChainConfig, pub websocket_server: websocket_server::Config, pub eth1: eth1::Config, } @@ -78,6 +79,7 @@ impl Default for Config { genesis: <_>::default(), store: <_>::default(), network: NetworkConfig::default(), + chain: <_>::default(), rest_api: <_>::default(), websocket_server: <_>::default(), spec_constants: TESTNET_SPEC_CONSTANTS.into(), diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 33022c0ad7d..6cce8a28ae0 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1150,6 +1150,22 @@ pub fn handle_attestation_verification_failure( * The peer has published an invalid consensus message. 
             */
        }
+        AttnError::TooManySkippedSlots {
+            head_block_slot,
+            attestation_slot,
+        } => {
+            /*
+             * The attestation references a head block that is too far behind the
+             * attestation slot.
+             *
+             * The message is not necessarily invalid, but we choose to ignore it.
+             */
+            debug!(
+                log,
+                "Rejected long skip slot attestation";
+                "head_block_slot" => head_block_slot,
+                "attestation_slot" => attestation_slot,
+            )
+        }
        AttnError::BeaconChainError(e) => {
            /*
             * Lighthouse hit an unexpected error whilst processing the attestation. It
diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs
index fa17d5f91fd..18064d79878 100644
--- a/beacon_node/network/src/metrics.rs
+++ b/beacon_node/network/src/metrics.rs
@@ -234,6 +234,10 @@ lazy_static! {
         "gossip_attestation_error_invalid_state_processing",
         "Count of a specific error type (see metric name)"
     );
+    pub static ref GOSSIP_ATTESTATION_ERROR_INVALID_TOO_MANY_SKIPPED_SLOTS: Result<IntCounter> = try_create_int_counter(
+        "gossip_attestation_error_invalid_too_many_skipped_slots",
+        "Count of a specific error type (see metric name)"
+    );
     pub static ref GOSSIP_ATTESTATION_ERROR_BEACON_CHAIN_ERROR: Result<IntCounter> = try_create_int_counter(
         "gossip_attestation_error_beacon_chain_error",
         "Count of a specific error type (see metric name)"
@@ -291,6 +295,9 @@ pub fn register_attestation_error(error: &AttnError) {
         inc_counter(&GOSSIP_ATTESTATION_ERROR_INVALID_SUBNET_ID)
     }
     AttnError::Invalid(_) => inc_counter(&GOSSIP_ATTESTATION_ERROR_INVALID_STATE_PROCESSING),
+    AttnError::TooManySkippedSlots { .. } => {
+        inc_counter(&GOSSIP_ATTESTATION_ERROR_INVALID_TOO_MANY_SKIPPED_SLOTS)
+    }
     AttnError::BeaconChainError(_) => inc_counter(&GOSSIP_ATTESTATION_ERROR_BEACON_CHAIN_ERROR),
 }
 }
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index 152f84c44f9..5bb79c7df8a 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -249,4 +249,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .value_name("GRAFFITI")
                 .takes_value(true)
         )
+        .arg(
+            Arg::with_name("max-skip-slots")
+                .long("max-skip-slots")
+                .help(
+                    "Refuse to skip more than this many slots when processing a block or attestation. \
+                    This prevents nodes on minority forks from wasting our time and RAM, \
+                    but might need to be raised or set to 'none' in times of extreme network \
+                    outage."
+ ) + .value_name("NUM_SLOTS") + .takes_value(true) + .default_value("320") + ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index bef56e36e95..b2da523ec09 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -386,6 +386,16 @@ pub fn get_config( client_config.graffiti[..trimmed_graffiti_len] .copy_from_slice(&raw_graffiti[..trimmed_graffiti_len]); + if let Some(max_skip_slots) = cli_args.value_of("max-skip-slots") { + client_config.chain.import_max_skip_slots = match max_skip_slots { + "none" => None, + n => Some( + n.parse() + .map_err(|_| "Invalid max-skip-slots".to_string())?, + ), + }; + } + Ok(client_config) } From 9a97a0b14fdcd265769981a02e9cb37dac3b553a Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 17 Aug 2020 12:13:42 +0000 Subject: [PATCH 6/6] Prepare for v0.2.4 (#1533) ## Issue Addressed NA ## Proposed Changes NA ## Additional Info NA --- Cargo.lock | 65 +++++++--------------------- account_manager/Cargo.toml | 2 +- beacon_node/Cargo.toml | 2 +- boot_node/Cargo.toml | 2 +- common/lighthouse_version/src/lib.rs | 2 +- lighthouse/Cargo.toml | 2 +- validator_client/Cargo.toml | 2 +- 7 files changed, 22 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 596b5121207..02a470aaa29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2607,7 +2607,7 @@ checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" [[package]] name = "libp2p" version = "0.23.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "atomic", "bytes 0.5.6", @@ -2623,9 +2623,8 @@ dependencies = [ "libp2p-swarm", "libp2p-tcp", "libp2p-websocket", - "libp2p-yamux", "multihash", - "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695)", + "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)", "parking_lot 0.10.2", "pin-project", "smallvec 1.4.1", @@ -2669,7 +2668,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "asn1_der", "bs58", @@ -2682,8 +2681,8 @@ dependencies = [ "libsecp256k1", "log 0.4.11", "multihash", - "multistream-select 0.8.2 (git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695)", - "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695)", + "multistream-select 0.8.2 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)", + "parity-multiaddr 0.9.1 (git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803)", "parking_lot 0.10.2", "pin-project", "prost", @@ -2702,7 +2701,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.20.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = 
"git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "quote", "syn", @@ -2711,7 +2710,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "futures 0.3.5", "libp2p-core 0.21.0", @@ -2721,7 +2720,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "base64 0.11.0", "byteorder", @@ -2745,7 +2744,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "futures 0.3.5", "libp2p-core 0.21.0", @@ -2760,7 +2759,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "bytes 0.5.6", "fnv", @@ -2775,7 +2774,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.23.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "bytes 0.5.6", "curve25519-dalek", @@ -2796,7 +2795,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "futures 0.3.5", "libp2p-core 0.21.0", @@ -2810,7 +2809,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "futures 0.3.5", "futures-timer", @@ -2825,7 +2824,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.22.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "async-tls", "either", @@ -2841,18 +2840,6 @@ dependencies = [ 
"webpki-roots 0.18.0", ] -[[package]] -name = "libp2p-yamux" -version = "0.21.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" -dependencies = [ - "futures 0.3.5", - "libp2p-core 0.21.0", - "parking_lot 0.10.2", - "thiserror", - "yamux", -] - [[package]] name = "libsecp256k1" version = "0.3.5" @@ -3212,7 +3199,7 @@ checksum = "d8883adfde9756c1d30b0f519c9b8c502a94b41ac62f696453c37c7fc0a958ce" [[package]] name = "multistream-select" version = "0.8.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "bytes 0.5.6", "futures 0.3.5", @@ -3334,12 +3321,6 @@ dependencies = [ "validator_dir", ] -[[package]] -name = "nohash-hasher" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" - [[package]] name = "nom" version = "2.2.1" @@ -3511,7 +3492,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=3096cb6b89b2883a79ce5ffcb03d41778a09b695#3096cb6b89b2883a79ce5ffcb03d41778a09b695" +source = "git+https://github.com/sigp/rust-libp2p?rev=bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803#bbf0cfbaff2f733b3ae7bfed3caba8b7ee542803" dependencies = [ "arrayref", "bs58", @@ -6470,20 +6451,6 @@ dependencies = [ "linked-hash-map", ] -[[package]] -name = "yamux" -version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd37e58a1256a0b328ce9c67d8b62ecdd02f4803ba443df478835cb1a41a637c" -dependencies = [ - "futures 0.3.5", - "log 0.4.11", - "nohash-hasher", - "parking_lot 0.10.2", - "rand 0.7.3", - "static_assertions", -] - [[package]] name = "zeroize" version = "1.1.0" diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index 8ef9ba624fc..a1913c68ad0 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "account_manager" -version = "0.2.3" +version = "0.2.4" authors = ["Paul Hauner ", "Luke Anderson "] edition = "2018" diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index c52a5d5310a..74186089a0f 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "0.2.3" +version = "0.2.4" authors = ["Paul Hauner ", "Age Manning "] edition = "2018" diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs index 7e6fed5e3ed..1f2a589cfb8 100644 --- a/common/lighthouse_version/src/lib.rs +++ b/common/lighthouse_version/src/lib.rs @@ -10,7 +10,7 @@ use target_info::Target; /// `Lighthouse/v0.2.0-1419501f2+` pub const VERSION: &str = git_version!( args = ["--always", "--dirty=+"], - prefix = "Lighthouse/v0.2.3-", + prefix = "Lighthouse/v0.2.4-", fallback = "unknown" ); diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 44449ff8d50..189050b877f 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lighthouse" -version = "0.2.3" +version = "0.2.4" authors = ["Sigma Prime "] edition = "2018" diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 25f6ed53222..3187a0837cf 100644 --- a/validator_client/Cargo.toml +++ 
b/validator_client/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "validator_client"
-version = "0.2.3"
+version = "0.2.4"
 authors = ["Paul Hauner ", "Age Manning ", "Luke Anderson "]
 edition = "2018"
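
As a minimal, self-contained sketch of the "none"-or-number semantics that the `get_config` hunk above gives the `--max-skip-slots` flag: the standalone `parse_max_skip_slots` helper below is illustrative only and does not exist in the patch, but its logic mirrors the `match` in `get_config`.

```rust
/// Illustrative helper (not part of the patch): maps the `--max-skip-slots`
/// CLI value onto `ChainConfig::import_max_skip_slots`, mirroring `get_config`.
fn parse_max_skip_slots(raw: &str) -> Result<Option<u64>, String> {
    match raw {
        // `--max-skip-slots none` lifts the limit entirely.
        "none" => Ok(None),
        // Anything else must parse as a number of slots.
        n => n
            .parse::<u64>()
            .map(Some)
            .map_err(|_| "Invalid max-skip-slots".to_string()),
    }
}

fn main() {
    // The CLI default of "320" enables the limit at ten 32-slot epochs.
    assert_eq!(parse_max_skip_slots("320"), Ok(Some(320)));
    // Operators on a struggling network can disable the limit.
    assert_eq!(parse_max_skip_slots("none"), Ok(None));
    // Garbage is rejected with the same error string used in `get_config`.
    assert!(parse_max_skip_slots("ten").is_err());
}
```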
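Similarly, the verification-side check that produces `AttnError::TooManySkippedSlots` lives outside the hunks shown here, so the sketch below is an assumption about its semantics (a saturating slot-distance comparison against the configured limit), not a copy of the real code.

```rust
/// Assumed semantics of the skip-slot guard (hypothetical; the real check is
/// in attestation verification, outside the hunks above): ignore a message
/// whose head block trails its slot by more than the configured limit.
fn too_many_skipped_slots(
    head_block_slot: u64,
    attestation_slot: u64,
    import_max_skip_slots: Option<u64>,
) -> bool {
    match import_max_skip_slots {
        // `None` means no limit, so nothing is ever rejected on this basis.
        None => false,
        // Otherwise compare the skip distance against the limit.
        Some(max) => attestation_slot.saturating_sub(head_block_slot) > max,
    }
}

fn main() {
    // With the default limit of 320 slots, a 400-slot-old head is ignored...
    assert!(too_many_skipped_slots(1_000, 1_400, Some(320)));
    // ...while a recent head passes, and `None` disables the check entirely.
    assert!(!too_many_skipped_slots(1_000, 1_100, Some(320)));
    assert!(!too_many_skipped_slots(1_000, 1_400, None));
}
```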