diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 789aa7f624d..4f2e609773b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -39,7 +39,7 @@ use super::network_context::SyncNetworkContext; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; -use crate::service::NetworkMessage; +use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use futures::StreamExt; @@ -53,8 +53,10 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot}; use types::signed_block_and_blobs::BlockMaybeBlobs; +use types::{ + BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot, +}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -67,6 +69,16 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32; pub type Id = u32; +pub struct SeansBlock {} + +pub struct SeansBlob {} + +/// This is the one that has them both and goes to range. +pub struct SeansBlockBlob { + block: SeansBlock, + blob: SeansBlob, +} + /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum RequestId { @@ -78,6 +90,8 @@ pub enum RequestId { BackFillSync { id: Id }, /// The request was from a chain in the range sync algorithm. RangeSync { id: Id }, + /// The request was from a chain in range, asking for ranges of blocks and blobs. + RangeBlockBlob { id: Id }, } #[derive(Debug)] @@ -292,6 +306,19 @@ impl SyncManager { self.update_sync_state() } } + RequestId::RangeBlockBlob { id } => { + if let Some((chain_id, batch_id)) = self.network.fail_block_bob_request(request_id) + { + self.range_sync.inject_error( + &mut self.network, + peer_id, + batch_id, + chain_id, + id, + ); + self.update_sync_state() + } + } } } @@ -702,8 +729,26 @@ impl SyncManager { self.update_sync_state(); } } + RequestId::RangeBlockBlob { id } => { + // do stuff + self.network.block_blob_block_response(id, block); + } } } + + fn rpc_blob_received( + &mut self, + request_id: RequestId, + peer_id: PeerId, + beacon_block: Option, + seen_timestamp: Duration, + ) { + let RequestId::RangeBlockBlob { id } = request_id else { + return error!("bad stuff"); + }; + // get the paired block blob from the network context and send it to range + self.network.block_blob_blob_response(request_id, blob) + } } impl From>> for BlockProcessResult { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1bb378431ce..b8d4b81c9ce 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,21 +1,36 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -use super::manager::{Id, RequestId as SyncRequestId}; +use super::manager::{Id, RequestId as SyncRequestId, SeansBlob, SeansBlock, SeansBlockBlob}; use super::range_sync::{BatchId, ChainId}; use crate::beacon_processor::WorkEvent; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChainTypes, EngineState}; use fnv::FnvHashMap; +use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; +use std::collections::VecDeque; use std::sync::Arc; use tokio::sync::mpsc; -/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. +#[derive(Debug, Default)] +struct BlockBlobRequestInfo { + /// Blocks we have received awaiting for their corresponding blob + accumulated_blocks: VecDeque, + /// Blobs we have received awaiting for their corresponding block + accumulated_blobs: VecDeque, + /// Whether the individual RPC request for blocks is finished or not. + // Not sure if this is needed + is_blocks_rpc_finished: bool, + /// Whether the individual RPC request for blobs is finished or not + // Not sure if this is needed + is_blobs_rpc_finished: bool, +} +/// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. network_send: mpsc::UnboundedSender>, @@ -32,6 +47,8 @@ pub struct SyncNetworkContext { /// BlocksByRange requests made by backfill syncing. backfill_requests: FnvHashMap, + block_blob_requests: FnvHashMap, + /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. execution_engine_state: EngineState, @@ -58,6 +75,7 @@ impl SyncNetworkContext { range_requests: FnvHashMap::default(), backfill_requests: FnvHashMap::default(), beacon_processor_send, + block_blob_requests: Default::default(), log, } } @@ -127,6 +145,52 @@ impl SyncNetworkContext { Ok(id) } + /// A blocks-blob by range request for the range sync algorithm. + pub fn blocks_blobs_by_range_request( + &mut self, + peer_id: PeerId, + request: BlocksByRangeRequest, // for now this is enough to get both requests. + chain_id: ChainId, + batch_id: BatchId, + ) -> Result { + debug!( + self.log, + "Sending BlockBlock by range request"; + "method" => "BlocksByRangeAndBlobsOrSomething", + "count" => request.count, + "peer" => %peer_id, + ); + + // create the shared request id. This is fine since the rpc handles substream ids. + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::RangeBlockBlob { id }); + + // Create the blob request based on the blob request. + let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { + start_slot: request.start_slot, + count: request.count, + }); + let blocks_request = Request::BlocksByRange(request); + + // Send both requests. Make sure both can be sent. + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: blocks_request, + request_id, + }) + .and_then(|| { + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: blocks_request, + request_id, + }) + })?; + let block_blob_info = BlockBlobRequestInfo::default(); + self.block_blob_requests + .insert(id, (chain_id, batch_id, block_blob_info)); + Ok(id) + } + /// A blocks by range request sent by the backfill sync algorithm pub fn backfill_blocks_by_range_request( &mut self, @@ -166,6 +230,74 @@ impl SyncNetworkContext { } } + /// Fails a blob bob request. + // We need to recover the chain and batch id to be able to tell range abound the failure. + pub fn fail_block_bob_request(&mut self, request_id: Id) -> Option<(ChainId, BatchId)> { + self.block_blob_requests + .remove(&request_id) + .map(|(chain_id, batch_id, _info)| (chain_id, batch_id)) + } + + /// We received a block for a block blob request. This returns: + /// None: if there is no pairing for this block yet + /// Some(chain_id, Some(paired block blob)) if the block was Some and there was a blob waiting + /// None if the block was none + pub fn block_blob_block_response( + &mut self, + request_id: Id, + block: Option, + ) -> Option<(ChainId, BatchId, Option)> { + let (chain_id, batch_id, info) = self.block_blob_requests.get_mut(&request_id)?; + let response = match block { + Some(block) => match info.accumulated_blobs.pop_front() { + Some(blob) => Some(SeansBlockBlob { block, blob }), + None => { + // accumulate the block + info.accumulated_blocks.push_back(block); + None + } + }, + None => { + info.is_blocks_rpc_finished = true; + + if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished { + // this is the coupled stream termination + Some((chain_id, batch_id, None)) + } else { + None + } + } + }; + } + + pub fn block_blob_blob_response( + &mut self, + request_id: Id, + blob: Option, + ) -> Option<(ChainId, Option)> { + let (chain_id, info) = self.block_blob_requests.get_mut(&request_id)?; + let response = match blob { + Some(blob) => match info.accumulated_blocks.pop_front() { + Some(block) => Some(SeansBlockBlob { block, blob }), + None => { + // accumulate the blob + info.accumulated_blobs.push_back(blob); + None + } + }, + None => { + info.is_blobs_rpc_finished = true; + + if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished { + // this is the coupled stream termination + Some((chain_id, batch_id, None)) + } else { + None + } + } + }; + } + /// Received a blocks by range response. pub fn backfill_sync_response(&mut self, request_id: Id, remove: bool) -> Option { if remove { @@ -181,7 +313,6 @@ impl SyncNetworkContext { peer_id: PeerId, request: BlocksByRootRequest, ) -> Result { - //FIXME(sean) add prune depth logic here? trace!(