From ee7b4be1d91f0dbe4fd9d3331ee8891aacf0de9b Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 26 Sep 2023 15:25:39 +0300 Subject: [PATCH 1/3] Move import queue from `ChainSync` to `SyncingEngine` --- substrate/client/network/common/src/sync.rs | 14 +- substrate/client/network/sync/src/engine.rs | 70 +++++- substrate/client/network/sync/src/lib.rs | 240 +++++++------------- substrate/client/network/sync/src/mock.rs | 6 +- 4 files changed, 160 insertions(+), 170 deletions(-) diff --git a/substrate/client/network/common/src/sync.rs b/substrate/client/network/common/src/sync.rs index 8ef0fafa1c79..128baecc3a51 100644 --- a/substrate/client/network/common/src/sync.rs +++ b/substrate/client/network/common/src/sync.rs @@ -116,11 +116,18 @@ impl fmt::Display for BadPeer { impl std::error::Error for BadPeer {} +/// Action that the parent of [`ChainSync`] should perform if we want to import blocks. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ImportBlocksAction { + pub origin: BlockOrigin, + pub blocks: Vec>, +} + /// Result of [`ChainSync::on_block_data`]. #[derive(Debug, Clone, PartialEq, Eq)] pub enum OnBlockData { /// The block should be imported. - Import(BlockOrigin, Vec>), + Import(ImportBlocksAction), /// A new block request needs to be made to the given peer. Request(PeerId, BlockRequest), /// Continue processing events. @@ -134,7 +141,7 @@ pub enum OnBlockJustification { Nothing, /// The justification should be imported. Import { - peer: PeerId, + peer_id: PeerId, hash: Block::Hash, number: NumberFor, justifications: Justifications, @@ -379,7 +386,8 @@ pub trait ChainSync: Send { /// Call when a peer has disconnected. /// Canceled obsolete block request may result in some blocks being ready for /// import, so this functions checks for such blocks and returns them. - fn peer_disconnected(&mut self, who: &PeerId); + #[must_use] + fn peer_disconnected(&mut self, who: &PeerId) -> Option>; /// Return some key metrics. fn metrics(&self) -> Metrics; diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 8b8874ad4ebd..6dc1dcf10f8c 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -28,7 +28,8 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, service::{self, chain_sync::ToServiceCommand}, warp::WarpSyncParams, - BlockRequestEvent, ChainSync, ClientError, SyncingService, + BlockRequestAction, ChainSync, ClientError, ImportBlocksAction, ImportJustificationsAction, + OnBlockResponse, SyncingService, }; use codec::{Decode, Encode}; @@ -41,7 +42,8 @@ use futures_timer::Delay; use libp2p::{request_response::OutboundFailure, PeerId}; use log::{debug, trace}; use prometheus_endpoint::{ - register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64, + register, Counter, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, + SourcedGauge, U64, }; use prost::Message; use schnellru::{ByLength, LruMap}; @@ -135,6 +137,8 @@ struct Metrics { queued_blocks: Gauge, fork_targets: Gauge, justifications: GaugeVec, + import_queue_blocks_submitted: Counter, + import_queue_justifications_submitted: Counter, } impl Metrics { @@ -164,6 +168,20 @@ impl Metrics { )?; register(g, r)? }, + import_queue_blocks_submitted: { + let c = Counter::new( + "substrate_sync_import_queue_blocks_submitted", + "Number of blocks submitted to the import queue.", + )?; + register(c, r)? + }, + import_queue_justifications_submitted: { + let c = Counter::new( + "substrate_sync_import_queue_justifications_submitted", + "Number of justifications submitted to the import queue.", + )?; + register(c, r)? + }, }) } } @@ -311,6 +329,9 @@ pub struct SyncingEngine { /// Protocol name used to send out warp sync requests warp_sync_protocol_name: Option, + + /// Handle to import queue. + import_queue: Box>, } impl SyncingEngine @@ -436,9 +457,7 @@ where max_parallel_downloads, max_blocks_per_request, warp_sync_config, - metrics_registry, network_service.clone(), - import_queue, )?; let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); @@ -501,6 +520,7 @@ where block_downloader, state_request_protocol_name, warp_sync_protocol_name, + import_queue, }, SyncingService::new(tx, num_connected, is_major_syncing), block_announce_config, @@ -728,13 +748,13 @@ where ToServiceCommand::BlocksProcessed(imported, count, results) => { for result in self.chain_sync.on_blocks_processed(imported, count, results) { match result { - Ok(event) => match event { - BlockRequestEvent::SendRequest { peer_id, request } => { + Ok(action) => match action { + BlockRequestAction::SendRequest { peer_id, request } => { // drop obsolete pending response first self.pending_responses.remove(&peer_id); self.send_block_request(peer_id, request); }, - BlockRequestEvent::RemoveStale { peer_id } => { + BlockRequestAction::RemoveStale { peer_id } => { self.pending_responses.remove(&peer_id); }, }, @@ -922,7 +942,10 @@ where } } - self.chain_sync.peer_disconnected(&peer_id); + if let Some(import_blocks_action) = self.chain_sync.peer_disconnected(&peer_id) { + self.import_blocks(import_blocks_action) + } + self.pending_responses.remove(&peer_id); self.event_streams.retain(|stream| { stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok() @@ -1181,10 +1204,13 @@ where PeerRequest::Block(req) => { match self.block_downloader.block_response_into_blocks(&req, resp) { Ok(blocks) => { - if let Some((peer_id, new_req)) = - self.chain_sync.on_block_response(peer_id, req, blocks) - { - self.send_block_request(peer_id, new_req); + match self.chain_sync.on_block_response(peer_id, req, blocks) { + OnBlockResponse::SendBlockRequest { peer_id, request } => + self.send_block_request(peer_id, request), + OnBlockResponse::ImportBlocks(action) => self.import_blocks(action), + OnBlockResponse::ImportJustifications(action) => + self.import_justifications(action), + OnBlockResponse::Nothing => {}, } }, Err(BlockResponseError::DecodeFailed(e)) => { @@ -1337,4 +1363,24 @@ where }, } } + + /// Import blocks. + fn import_blocks(&mut self, ImportBlocksAction { origin, blocks }: ImportBlocksAction) { + if let Some(metrics) = &self.metrics { + metrics.import_queue_blocks_submitted.inc(); + } + + self.import_queue.import_blocks(origin, blocks); + } + + /// Import justifications. + fn import_justifications(&mut self, action: ImportJustificationsAction) { + if let Some(metrics) = &self.metrics { + metrics.import_queue_justifications_submitted.inc(); + } + + let ImportJustificationsAction { peer_id, hash, number, justifications } = action; + + self.import_queue.import_justifications(peer_id, hash, number, justifications); + } } diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index ff00e8f90f82..d07060b21af0 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -40,11 +40,8 @@ use extra_requests::ExtraRequests; use libp2p::PeerId; use log::{debug, error, info, trace, warn}; -use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sc_client_api::{BlockBackend, ProofProvider}; -use sc_consensus::{ - import_queue::ImportQueueService, BlockImportError, BlockImportStatus, IncomingBlock, -}; +use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; use sc_network::types::ProtocolName; use sc_network_common::sync::{ message::{ @@ -52,8 +49,9 @@ use sc_network_common::sync::{ FromBlock, }, warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress}, - BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, OnStateData, - OpaqueStateRequest, OpaqueStateResponse, PeerInfo, SyncMode, SyncState, SyncStatus, + BadPeer, ChainSync as ChainSyncT, ImportBlocksAction, Metrics, OnBlockData, + OnBlockJustification, OnStateData, OpaqueStateRequest, OpaqueStateResponse, PeerInfo, SyncMode, + SyncState, SyncStatus, }; use sp_arithmetic::traits::Saturating; use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; @@ -200,45 +198,42 @@ impl Default for AllowedRequests { } } -struct SyncingMetrics { - pub import_queue_blocks_submitted: Counter, - pub import_queue_justifications_submitted: Counter, -} - -impl SyncingMetrics { - fn register(registry: &Registry) -> Result { - Ok(Self { - import_queue_blocks_submitted: register( - Counter::new( - "substrate_sync_import_queue_blocks_submitted", - "Number of blocks submitted to the import queue.", - )?, - registry, - )?, - import_queue_justifications_submitted: register( - Counter::new( - "substrate_sync_import_queue_justifications_submitted", - "Number of justifications submitted to the import queue.", - )?, - registry, - )?, - }) - } -} - struct GapSync { blocks: BlockCollection, best_queued_number: NumberFor, target: NumberFor, } -/// An event used to notify [`engine::SyncingEngine`] if we want to perform a block request -/// or drop an obsolete pending response. -enum BlockRequestEvent { +/// Action that [`engine::SyncingEngine`] should perform after reporting imported blocks with +/// [`ChainSync::on_blocks_processed`]. +enum BlockRequestAction { + /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendRequest { peer_id: PeerId, request: BlockRequest }, + /// Drop stale block request. RemoveStale { peer_id: PeerId }, } +/// Action that [`engine::SyncingEngine`] should perform if we want to import justifications. +struct ImportJustificationsAction { + peer_id: PeerId, + hash: B::Hash, + number: NumberFor, + justifications: Justifications, +} + +/// Action that [`engine::SyncingEngine`] should perform on behalf of [`ChainSync`] +/// after reporting block response with [`ChainSync::on_block_response`]. +enum OnBlockResponse { + /// Nothing to do + Nothing, + /// Perform block request or drop stale block request. + SendBlockRequest { peer_id: PeerId, request: BlockRequest }, + /// Import blocks. + ImportBlocks(ImportBlocksAction), + /// Import justifications. + ImportJustifications(ImportJustificationsAction), +} + /// The main data structure which contains all the state for a chains /// active syncing strategy. pub struct ChainSync { @@ -288,10 +283,6 @@ pub struct ChainSync { network_service: service::network::NetworkServiceHandle, /// Protocol name used for block announcements block_announce_protocol_name: ProtocolName, - /// Handle to import queue. - import_queue: Box>, - /// Metrics. - metrics: Option, } /// All the data we have about a Peer that we are trying to sync with @@ -903,7 +894,7 @@ where return Err(BadPeer(*who, rep::NOT_REQUESTED)) }; - Ok(self.validate_and_queue_blocks(new_blocks, gap)) + Ok(OnBlockData::Import(self.validate_and_queue_blocks(new_blocks, gap))) } fn on_block_justification( @@ -952,10 +943,10 @@ where None }; - if let Some((peer, hash, number, j)) = + if let Some((peer_id, hash, number, justifications)) = self.extra_justifications.on_response(who, justification) { - return Ok(OnBlockJustification::Import { peer, hash, number, justifications: j }) + return Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) } } @@ -1093,7 +1084,8 @@ where } } - fn peer_disconnected(&mut self, who: &PeerId) { + #[must_use] + fn peer_disconnected(&mut self, who: &PeerId) -> Option> { self.blocks.clear_peer_download(who); if let Some(gap_sync) = &mut self.gap_sync { gap_sync.blocks.clear_peer_download(who) @@ -1107,11 +1099,8 @@ where }); let blocks = self.ready_blocks(); - if let Some(OnBlockData::Import(origin, blocks)) = - (!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false)) - { - self.import_blocks(origin, blocks); - } + + (!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false)) } fn metrics(&self) -> Metrics { @@ -1143,9 +1132,7 @@ where max_parallel_downloads: u32, max_blocks_per_request: u32, warp_sync_config: Option>, - metrics_registry: Option<&Registry>, network_service: service::network::NetworkServiceHandle, - import_queue: Box>, ) -> Result { let mut sync = Self { client, @@ -1169,21 +1156,6 @@ where warp_sync_config, warp_sync_target_block_header: None, block_announce_protocol_name, - import_queue, - metrics: if let Some(r) = &metrics_registry { - match SyncingMetrics::register(r) { - Ok(metrics) => Some(metrics), - Err(err) => { - error!( - target: LOG_TARGET, - "Failed to register metrics for ChainSync: {err:?}", - ); - None - }, - } - } else { - None - }, }; sync.reset_sync_start_point()?; @@ -1229,7 +1201,7 @@ where &mut self, mut new_blocks: Vec>, gap: bool, - ) -> OnBlockData { + ) -> ImportBlocksAction { let orig_len = new_blocks.len(); new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash)); if new_blocks.len() != orig_len { @@ -1260,7 +1232,8 @@ where self.on_block_queued(h, n) } self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash)); - OnBlockData::Import(origin, new_blocks) + + ImportBlocksAction { origin, blocks: new_blocks } } fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor) { @@ -1311,7 +1284,7 @@ where /// Restart the sync process. This will reset all pending block requests and return an iterator /// of new block requests to make to peers. Peers that were downloading finality data (i.e. /// their state was `DownloadingJustification`) are unaffected and will stay in the same state. - fn restart(&mut self) -> impl Iterator, BadPeer>> + '_ { + fn restart(&mut self) -> impl Iterator, BadPeer>> + '_ { self.blocks.clear(); if let Err(e) = self.reset_sync_start_point() { warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}"); @@ -1338,9 +1311,9 @@ where // handle peers that were in other states. match self.new_peer(peer_id, p.best_hash, p.best_number) { // since the request is not a justification, remove it from pending responses - Ok(None) => Some(Ok(BlockRequestEvent::RemoveStale { peer_id })), + Ok(None) => Some(Ok(BlockRequestAction::RemoveStale { peer_id })), // update the request if the new one is available - Ok(Some(request)) => Some(Ok(BlockRequestEvent::SendRequest { peer_id, request })), + Ok(Some(request)) => Some(Ok(BlockRequestAction::SendRequest { peer_id, request })), // this implies that we need to drop pending response from the peer Err(e) => Some(Err(e)), } @@ -1491,12 +1464,13 @@ where None } + /// Process blocks received in a response. pub(crate) fn on_block_response( &mut self, peer_id: PeerId, request: BlockRequest, blocks: Vec>, - ) -> Option<(PeerId, BlockRequest)> { + ) -> OnBlockResponse { let block_response = BlockResponse:: { id: request.id, blocks }; let blocks_range = || match ( @@ -1521,44 +1495,51 @@ where if request.fields == BlockAttributes::JUSTIFICATION { match self.on_block_justification(peer_id, block_response) { - Ok(OnBlockJustification::Nothing) => None, - Ok(OnBlockJustification::Import { peer, hash, number, justifications }) => { - self.import_justifications(peer, hash, number, justifications); - None - }, + Ok(OnBlockJustification::Nothing) => OnBlockResponse::Nothing, + Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) => + OnBlockResponse::ImportJustifications(ImportJustificationsAction { + peer_id, + hash, + number, + justifications, + }), Err(BadPeer(id, repu)) => { self.network_service .disconnect_peer(id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(id, repu); - None + OnBlockResponse::Nothing }, } } else { match self.on_block_data(&peer_id, Some(request), block_response) { - Ok(OnBlockData::Import(origin, blocks)) => { - self.import_blocks(origin, blocks); - None - }, - Ok(OnBlockData::Request(peer, req)) => Some((peer, req)), - Ok(OnBlockData::Continue) => None, + Ok(OnBlockData::Import(action)) => OnBlockResponse::ImportBlocks(action), + Ok(OnBlockData::Request(peer_id, request)) => + OnBlockResponse::SendBlockRequest { peer_id, request }, + Ok(OnBlockData::Continue) => OnBlockResponse::Nothing, Err(BadPeer(id, repu)) => { self.network_service .disconnect_peer(id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(id, repu); - None + OnBlockResponse::Nothing }, } } } - pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { + pub fn on_state_response( + &mut self, + peer_id: PeerId, + response: OpaqueStateResponse, + ) -> Option> { match self.on_state_data(&peer_id, response) { - Ok(OnStateData::Import(origin, block)) => self.import_blocks(origin, vec![block]), - Ok(OnStateData::Continue) => {}, + Ok(OnStateData::Import(origin, block)) => + Some(ImportBlocksAction { origin, blocks: vec![block] }), + Ok(OnStateData::Continue) => None, Err(BadPeer(id, repu)) => { self.network_service .disconnect_peer(id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(id, repu); + None }, } } @@ -1897,28 +1878,6 @@ where } } - fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { - if let Some(metrics) = &self.metrics { - metrics.import_queue_blocks_submitted.inc(); - } - - self.import_queue.import_blocks(origin, blocks); - } - - fn import_justifications( - &mut self, - peer: PeerId, - hash: B::Hash, - number: NumberFor, - justifications: Justifications, - ) { - if let Some(metrics) = &self.metrics { - metrics.import_queue_justifications_submitted.inc(); - } - - self.import_queue.import_justifications(peer, hash, number, justifications); - } - /// A batch of blocks have been processed, with or without errors. /// /// Call this when a batch of blocks have been processed by the import @@ -1929,7 +1888,7 @@ where imported: usize, count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, - ) -> Box, BadPeer>>> { + ) -> Box, BadPeer>>> { trace!(target: LOG_TARGET, "Imported {imported} of {count}"); let mut output = Vec::new(); @@ -2454,7 +2413,6 @@ mod test { let client = Arc::new(TestClientBuilder::new().build()); let peer_id = PeerId::random(); - let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut sync = ChainSync::new( @@ -2464,9 +2422,7 @@ mod test { 1, 64, None, - None, chain_sync_network_handle, - import_queue, ) .unwrap(); @@ -2515,7 +2471,6 @@ mod test { #[test] fn restart_doesnt_affect_peers_downloading_finality_data() { let mut client = Arc::new(TestClientBuilder::new().build()); - let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); @@ -2526,9 +2481,7 @@ mod test { 1, 64, None, - None, chain_sync_network_handle, - import_queue, ) .unwrap(); @@ -2583,9 +2536,9 @@ mod test { // which should make us send out block requests to the first two peers assert!(block_requests.map(|r| r.unwrap()).all(|event| match event { - BlockRequestEvent::SendRequest { peer_id, .. } => + BlockRequestAction::SendRequest { peer_id, .. } => peer_id == peer_id1 || peer_id == peer_id2, - BlockRequestEvent::RemoveStale { .. } => false, + BlockRequestAction::RemoveStale { .. } => false, })); // peer 3 should be unaffected it was downloading finality data @@ -2686,7 +2639,6 @@ mod test { sp_tracing::try_init_simple(); let mut client = Arc::new(TestClientBuilder::new().build()); - let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); @@ -2697,9 +2649,7 @@ mod test { 5, 64, None, - None, chain_sync_network_handle, - import_queue, ) .unwrap(); @@ -2761,7 +2711,9 @@ mod test { // We should not yet import the blocks, because there is still an open request for fetching // block `2` which blocks the import. - assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty())); + assert!( + matches!(res, OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty()) + ); let request3 = get_block_request(&mut sync, FromBlock::Number(2), 1, &peer_id2); @@ -2769,14 +2721,16 @@ mod test { let res = sync.on_block_data(&peer_id1, Some(request2), response).unwrap(); assert!(matches!( res, - OnBlockData::Import(_, blocks) + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.iter().all(|b| [2, 3, 4].contains(b.header.as_ref().unwrap().number())) )); let response = create_block_response(vec![block2.clone()]); let res = sync.on_block_data(&peer_id2, Some(request3), response).unwrap(); // Nothing to import - assert!(matches!(res, OnBlockData::Import(_, blocks) if blocks.is_empty())); + assert!( + matches!(res, OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty()) + ); } fn unwrap_from_block_number(from: FromBlock) -> u64 { @@ -2806,7 +2760,6 @@ mod test { }; let mut client = Arc::new(TestClientBuilder::new().build()); - let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let info = client.info(); @@ -2818,9 +2771,7 @@ mod test { 5, 64, None, - None, chain_sync_network_handle, - import_queue, ) .unwrap(); @@ -2853,7 +2804,7 @@ mod test { let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); assert!(matches!( res, - OnBlockData::Import(_, blocks) if blocks.len() == max_blocks_to_request as usize + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == max_blocks_to_request as usize ),); best_block_num += max_blocks_to_request as u32; @@ -2909,7 +2860,7 @@ mod test { let res = sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap(); assert!(matches!( res, - OnBlockData::Import(_, blocks) if blocks.is_empty() + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty() ),); let peer1_from = unwrap_from_block_number(peer1_req.unwrap().from); @@ -2936,7 +2887,6 @@ mod test { fn can_sync_huge_fork() { sp_tracing::try_init_simple(); - let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); @@ -2970,9 +2920,7 @@ mod test { 5, 64, None, - None, chain_sync_network_handle, - import_queue, ) .unwrap(); @@ -3030,7 +2978,7 @@ mod test { let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); assert!(matches!( res, - OnBlockData::Import(_, blocks) if blocks.len() == sync.max_blocks_per_request as usize + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == sync.max_blocks_per_request as usize ),); best_block_num += sync.max_blocks_per_request as u32; @@ -3073,7 +3021,6 @@ mod test { fn syncs_fork_without_duplicate_requests() { sp_tracing::try_init_simple(); - let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); @@ -3107,9 +3054,7 @@ mod test { 5, 64, None, - None, chain_sync_network_handle, - import_queue, ) .unwrap(); @@ -3168,7 +3113,7 @@ mod test { let res = sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap(); assert!(matches!( res, - OnBlockData::Import(_, blocks) if blocks.len() == max_blocks_to_request as usize + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == max_blocks_to_request as usize ),); best_block_num += max_blocks_to_request as u32; @@ -3233,7 +3178,6 @@ mod test { #[test] fn removes_target_fork_on_disconnect() { sp_tracing::try_init_simple(); - let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client = Arc::new(TestClientBuilder::new().build()); @@ -3246,9 +3190,7 @@ mod test { 1, 64, None, - None, chain_sync_network_handle, - import_queue, ) .unwrap(); @@ -3264,14 +3206,13 @@ mod test { send_block_announce(header, peer_id1, &mut sync); assert!(sync.fork_targets.len() == 1); - sync.peer_disconnected(&peer_id1); + let _ = sync.peer_disconnected(&peer_id1); assert!(sync.fork_targets.len() == 0); } #[test] fn can_import_response_with_missing_blocks() { sp_tracing::try_init_simple(); - let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut client2 = Arc::new(TestClientBuilder::new().build()); @@ -3286,9 +3227,7 @@ mod test { 1, 64, None, - None, chain_sync_network_handle, - import_queue, ) .unwrap(); @@ -3323,7 +3262,6 @@ mod test { #[test] fn sync_restart_removes_block_but_not_justification_requests() { let mut client = Arc::new(TestClientBuilder::new().build()); - let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let mut sync = ChainSync::new( @@ -3333,9 +3271,7 @@ mod test { 1, 64, None, - None, chain_sync_network_handle, - import_queue, ) .unwrap(); @@ -3394,10 +3330,10 @@ mod test { let request_events = sync.restart().collect::>(); for event in request_events.iter() { match event.as_ref().unwrap() { - BlockRequestEvent::RemoveStale { peer_id } => { + BlockRequestAction::RemoveStale { peer_id } => { pending_responses.remove(&peer_id); }, - BlockRequestEvent::SendRequest { peer_id, .. } => { + BlockRequestAction::SendRequest { peer_id, .. } => { // we drop obsolete response, but don't register a new request, it's checked in // the `assert!` below pending_responses.remove(&peer_id); @@ -3406,8 +3342,8 @@ mod test { } assert!(request_events.iter().any(|event| { match event.as_ref().unwrap() { - BlockRequestEvent::RemoveStale { .. } => false, - BlockRequestEvent::SendRequest { peer_id, .. } => peer_id == &peers[0], + BlockRequestAction::RemoveStale { .. } => false, + BlockRequestAction::SendRequest { peer_id, .. } => peer_id == &peers[0], } })); @@ -3417,7 +3353,7 @@ mod test { sync.peers.get(&peers[1]).unwrap().state, PeerSyncState::DownloadingJustification(b1_hash), ); - sync.peer_disconnected(&peers[1]); + let _ = sync.peer_disconnected(&peers[1]); pending_responses.remove(&peers[1]); assert_eq!(pending_responses.len(), 0); } diff --git a/substrate/client/network/sync/src/mock.rs b/substrate/client/network/sync/src/mock.rs index b51eec8d1499..ed7c647c7977 100644 --- a/substrate/client/network/sync/src/mock.rs +++ b/substrate/client/network/sync/src/mock.rs @@ -25,8 +25,8 @@ use libp2p::PeerId; use sc_network::RequestFailure; use sc_network_common::sync::{ message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse}, - BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification, PeerInfo, - SyncStatus, + BadPeer, ChainSync as ChainSyncT, ImportBlocksAction, Metrics, OnBlockData, + OnBlockJustification, PeerInfo, SyncStatus, }; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -78,7 +78,7 @@ mockall::mock! { who: PeerId, announce: &BlockAnnounce, ); - fn peer_disconnected(&mut self, who: &PeerId); + fn peer_disconnected(&mut self, who: &PeerId) -> Option>; fn metrics(&self) -> Metrics; } } From 11057a5b9c613a368cc66055e891fb8e97d675a4 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 28 Sep 2023 12:57:13 +0300 Subject: [PATCH 2/3] Fix warp sync --- substrate/client/network/sync/src/engine.rs | 9 +++++++-- substrate/client/network/sync/src/lib.rs | 5 ++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 6dc1dcf10f8c..1a383cdde479 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -1207,7 +1207,8 @@ where match self.chain_sync.on_block_response(peer_id, req, blocks) { OnBlockResponse::SendBlockRequest { peer_id, request } => self.send_block_request(peer_id, request), - OnBlockResponse::ImportBlocks(action) => self.import_blocks(action), + OnBlockResponse::ImportBlocks(import_blocks_action) => + self.import_blocks(import_blocks_action), OnBlockResponse::ImportJustifications(action) => self.import_justifications(action), OnBlockResponse::Nothing => {}, @@ -1256,7 +1257,11 @@ where }, }; - self.chain_sync.on_state_response(peer_id, response); + if let Some(import_blocks_action) = + self.chain_sync.on_state_response(peer_id, response) + { + self.import_blocks(import_blocks_action); + } }, PeerRequest::WarpProof => { self.chain_sync.on_warp_sync_response(peer_id, EncodedProof(resp)); diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index d07060b21af0..fd6c858a5a92 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -226,7 +226,7 @@ struct ImportJustificationsAction { enum OnBlockResponse { /// Nothing to do Nothing, - /// Perform block request or drop stale block request. + /// Perform block request. SendBlockRequest { peer_id: PeerId, request: BlockRequest }, /// Import blocks. ImportBlocks(ImportBlocksAction), @@ -1465,6 +1465,7 @@ where } /// Process blocks received in a response. + #[must_use] pub(crate) fn on_block_response( &mut self, peer_id: PeerId, @@ -1526,6 +1527,8 @@ where } } + /// Process state received in a response. + #[must_use] pub fn on_state_response( &mut self, peer_id: PeerId, From 86b236ab26c892ce2936b51f3b68d7e76bfbb5c3 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 28 Sep 2023 13:41:30 +0300 Subject: [PATCH 3/3] Add `#[must_use]` where needed --- substrate/client/network/common/src/sync.rs | 3 +++ substrate/client/network/sync/src/lib.rs | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/substrate/client/network/common/src/sync.rs b/substrate/client/network/common/src/sync.rs index 128baecc3a51..4ca21221f87f 100644 --- a/substrate/client/network/common/src/sync.rs +++ b/substrate/client/network/common/src/sync.rs @@ -316,6 +316,7 @@ pub trait ChainSync: Send { /// Handle a new connected peer. /// /// Call this method whenever we connect to a new peer. + #[must_use] fn new_peer( &mut self, who: PeerId, @@ -347,6 +348,7 @@ pub trait ChainSync: Send { /// /// If this corresponds to a valid block, this outputs the block that /// must be imported in the import queue. + #[must_use] fn on_block_data( &mut self, who: &PeerId, @@ -357,6 +359,7 @@ pub trait ChainSync: Send { /// Handle a response from the remote to a justification request that we made. /// /// `request` must be the original request that triggered `response`. + #[must_use] fn on_block_justification( &mut self, who: PeerId, diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index fd6c858a5a92..10eaa2450518 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -440,6 +440,7 @@ where self.peers.len() } + #[must_use] fn new_peer( &mut self, who: PeerId, @@ -633,6 +634,7 @@ where .extend(peers); } + #[must_use] fn on_block_data( &mut self, who: &PeerId, @@ -897,6 +899,7 @@ where Ok(OnBlockData::Import(self.validate_and_queue_blocks(new_blocks, gap))) } + #[must_use] fn on_block_justification( &mut self, who: PeerId, @@ -1886,6 +1889,7 @@ where /// Call this when a batch of blocks have been processed by the import /// queue, with or without errors. If an error is returned, the pending response /// from the peer must be dropped. + #[must_use] fn on_blocks_processed( &mut self, imported: usize,