diff --git a/substrate/client/network/sync/src/blocks.rs b/substrate/client/network/sync/src/blocks.rs index 240c1ca1f8b2..cad50fef3e32 100644 --- a/substrate/client/network/sync/src/blocks.rs +++ b/substrate/client/network/sync/src/blocks.rs @@ -212,6 +212,31 @@ impl BlockCollection { ready } + /// Returns the block header of the first block that is ready for importing. + /// `from` is the maximum block number for the start of the range that we are interested in. + /// The function will return None if the first block ready is higher than `from`. + /// The logic is structured to be consistent with ready_blocks(). + pub fn first_ready_block_header(&self, from: NumberFor) -> Option { + let mut prev = from; + for (&start, range_data) in &self.blocks { + if start > prev { + break + } + + match range_data { + BlockRangeState::Complete(blocks) => { + let len = (blocks.len() as u32).into(); + prev = start + len; + if let Some(BlockData { block, .. }) = blocks.first() { + return block.header.clone() + } + }, + _ => continue, + } + } + None + } + pub fn clear_queued(&mut self, hash: &B::Hash) { if let Some((from, to)) = self.queued_blocks.remove(hash) { let mut block_num = from; diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 10eaa2450518..a291da4a90d5 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -1405,8 +1405,27 @@ where /// Get the set of downloaded blocks that are ready to be queued for import. fn ready_blocks(&mut self) -> Vec> { + let start_block = self.best_queued_number + One::one(); + + // Verify that the parent of the first available block is in the chain. + // If not, we are downloading from a fork. In this case, wait until + // the start block has a parent on chain. + let parent_on_chain = + self.blocks.first_ready_block_header(start_block).map_or(false, |hdr| { + std::matches!( + self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown), + BlockStatus::InChainWithState | + BlockStatus::InChainPruned | + BlockStatus::Queued + ) + }); + + if !parent_on_chain { + return vec![] + } + self.blocks - .ready_blocks(self.best_queued_number + One::one()) + .ready_blocks(start_block) .into_iter() .map(|block_data| { let justifications = block_data @@ -3364,4 +3383,380 @@ mod test { pending_responses.remove(&peers[1]); assert_eq!(pending_responses.len(), 0); } + + #[test] + fn syncs_fork_with_partial_response_extends_tip() { + sp_tracing::try_init_simple(); + + // Set up: the two chains share the first 15 blocks before + // diverging. The other(canonical) chain fork is longer. + let max_blocks_per_request = 64; + let common_ancestor = 15; + let non_canonical_chain_length = common_ancestor + 3; + let canonical_chain_length = common_ancestor + max_blocks_per_request + 10; + + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let mut client = Arc::new(TestClientBuilder::new().build()); + + // Blocks on the non-canonical chain. + let non_canonical_blocks = (0..non_canonical_chain_length) + .map(|_| build_block(&mut client, None, false)) + .collect::>(); + + // Blocks on the canonical chain. + let canonical_blocks = { + let mut client = Arc::new(TestClientBuilder::new().build()); + let common_blocks = non_canonical_blocks[..common_ancestor as usize] + .into_iter() + .inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap()) + .cloned() + .collect::>(); + + common_blocks + .into_iter() + .chain( + (0..(canonical_chain_length - common_ancestor as u32)) + .map(|_| build_block(&mut client, None, true)), + ) + .collect::>() + }; + + let mut sync = ChainSync::new( + SyncMode::Full, + client.clone(), + ProtocolName::from("test-block-announce-protocol"), + 1, + max_blocks_per_request, + None, + chain_sync_network_handle, + ) + .unwrap(); + + // Connect the node we will sync from + let peer_id = PeerId::random(); + let canonical_tip = canonical_blocks.last().unwrap().clone(); + let mut request = sync + .new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number()) + .unwrap() + .unwrap(); + assert_eq!(FromBlock::Number(client.info().best_number), request.from); + assert_eq!(Some(1), request.max); + + // Do the ancestor search + loop { + let block = + &canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; + let response = create_block_response(vec![block.clone()]); + + let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + request = if let OnBlockData::Request(_peer, request) = on_block_data { + request + } else { + // We found the ancestor + break + }; + + log::trace!(target: LOG_TARGET, "Request: {request:?}"); + } + + // The response for the 64 blocks is returned in two parts: + // part 1: last 61 blocks [19..79], part 2: first 3 blocks [16-18]. + // Even though the first part extends the current chain ending at 18, + // it should not result in an import yet. + let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64; + let resp_2_from = common_ancestor as u64 + 3; + + // No import expected. + let request = get_block_request( + &mut sync, + FromBlock::Number(resp_1_from), + max_blocks_per_request as u32, + &peer_id, + ); + + let from = unwrap_from_block_number(request.from.clone()); + let mut resp_blocks = canonical_blocks[18..from as usize].to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + assert!(matches!( + res, + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty() + ),); + + // Gap filled, expect max_blocks_per_request being imported now. + let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 3, &peer_id); + let mut resp_blocks = canonical_blocks[common_ancestor as usize..18].to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + let to_import: Vec<_> = match &res { + OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => { + assert_eq!(blocks.len(), sync.max_blocks_per_request as usize); + blocks + .iter() + .map(|b| { + let num = *b.header.as_ref().unwrap().number() as usize; + canonical_blocks[num - 1].clone() + }) + .collect() + }, + _ => { + panic!("Unexpected response: {res:?}"); + }, + }; + + let _ = sync.on_blocks_processed( + max_blocks_per_request as usize, + resp_blocks.len(), + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + *b.header().number(), + Default::default(), + Some(peer_id), + )), + b.hash(), + ) + }) + .collect(), + ); + to_import.into_iter().for_each(|b| { + assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); + block_on(client.import(BlockOrigin::Own, b)).unwrap(); + }); + let expected_number = common_ancestor as u32 + max_blocks_per_request as u32; + assert_eq!(sync.best_queued_number as u32, expected_number); + assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); + // Sync rest of the chain. + let request = + get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id); + let mut resp_blocks = canonical_blocks + [(canonical_chain_length - 10) as usize..canonical_chain_length as usize] + .to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + assert!(matches!( + res, + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize + ),); + let _ = sync.on_blocks_processed( + max_blocks_per_request as usize, + resp_blocks.len(), + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + *b.header().number(), + Default::default(), + Some(peer_id), + )), + b.hash(), + ) + }) + .collect(), + ); + resp_blocks.into_iter().rev().for_each(|b| { + assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); + block_on(client.import(BlockOrigin::Own, b)).unwrap(); + }); + let expected_number = canonical_chain_length as u32; + assert_eq!(sync.best_queued_number as u32, expected_number); + assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); + } + + #[test] + fn syncs_fork_with_partial_response_does_not_extend_tip() { + sp_tracing::try_init_simple(); + + // Set up: the two chains share the first 15 blocks before + // diverging. The other(canonical) chain fork is longer. + let max_blocks_per_request = 64; + let common_ancestor = 15; + let non_canonical_chain_length = common_ancestor + 3; + let canonical_chain_length = common_ancestor + max_blocks_per_request + 10; + + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let mut client = Arc::new(TestClientBuilder::new().build()); + + // Blocks on the non-canonical chain. + let non_canonical_blocks = (0..non_canonical_chain_length) + .map(|_| build_block(&mut client, None, false)) + .collect::>(); + + // Blocks on the canonical chain. + let canonical_blocks = { + let mut client = Arc::new(TestClientBuilder::new().build()); + let common_blocks = non_canonical_blocks[..common_ancestor as usize] + .into_iter() + .inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap()) + .cloned() + .collect::>(); + + common_blocks + .into_iter() + .chain( + (0..(canonical_chain_length - common_ancestor as u32)) + .map(|_| build_block(&mut client, None, true)), + ) + .collect::>() + }; + + let mut sync = ChainSync::new( + SyncMode::Full, + client.clone(), + ProtocolName::from("test-block-announce-protocol"), + 1, + max_blocks_per_request, + None, + chain_sync_network_handle, + ) + .unwrap(); + + // Connect the node we will sync from + let peer_id = PeerId::random(); + let canonical_tip = canonical_blocks.last().unwrap().clone(); + let mut request = sync + .new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number()) + .unwrap() + .unwrap(); + assert_eq!(FromBlock::Number(client.info().best_number), request.from); + assert_eq!(Some(1), request.max); + + // Do the ancestor search + loop { + let block = + &canonical_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; + let response = create_block_response(vec![block.clone()]); + + let on_block_data = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + request = if let OnBlockData::Request(_peer, request) = on_block_data { + request + } else { + // We found the ancestor + break + }; + + log::trace!(target: LOG_TARGET, "Request: {request:?}"); + } + + // The response for the 64 blocks is returned in two parts: + // part 1: last 62 blocks [18..79], part 2: first 2 blocks [16-17]. + // Even though the first part extends the current chain ending at 18, + // it should not result in an import yet. + let resp_1_from = common_ancestor as u64 + max_blocks_per_request as u64; + let resp_2_from = common_ancestor as u64 + 2; + + // No import expected. + let request = get_block_request( + &mut sync, + FromBlock::Number(resp_1_from), + max_blocks_per_request as u32, + &peer_id, + ); + + let from = unwrap_from_block_number(request.from.clone()); + let mut resp_blocks = canonical_blocks[17..from as usize].to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + assert!(matches!( + res, + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty() + ),); + + // Gap filled, expect max_blocks_per_request being imported now. + let request = get_block_request(&mut sync, FromBlock::Number(resp_2_from), 2, &peer_id); + let mut resp_blocks = canonical_blocks[common_ancestor as usize..17].to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + let to_import: Vec<_> = match &res { + OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => { + assert_eq!(blocks.len(), sync.max_blocks_per_request as usize); + blocks + .iter() + .map(|b| { + let num = *b.header.as_ref().unwrap().number() as usize; + canonical_blocks[num - 1].clone() + }) + .collect() + }, + _ => { + panic!("Unexpected response: {res:?}"); + }, + }; + + let _ = sync.on_blocks_processed( + max_blocks_per_request as usize, + resp_blocks.len(), + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + *b.header().number(), + Default::default(), + Some(peer_id), + )), + b.hash(), + ) + }) + .collect(), + ); + to_import.into_iter().for_each(|b| { + assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); + block_on(client.import(BlockOrigin::Own, b)).unwrap(); + }); + let expected_number = common_ancestor as u32 + max_blocks_per_request as u32; + assert_eq!(sync.best_queued_number as u32, expected_number); + assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); + // Sync rest of the chain. + let request = + get_block_request(&mut sync, FromBlock::Hash(canonical_tip.hash()), 10_u32, &peer_id); + let mut resp_blocks = canonical_blocks + [(canonical_chain_length - 10) as usize..canonical_chain_length as usize] + .to_vec(); + resp_blocks.reverse(); + let response = create_block_response(resp_blocks.clone()); + let res = sync.on_block_data(&peer_id, Some(request), response).unwrap(); + assert!(matches!( + res, + OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize + ),); + let _ = sync.on_blocks_processed( + max_blocks_per_request as usize, + resp_blocks.len(), + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + *b.header().number(), + Default::default(), + Some(peer_id), + )), + b.hash(), + ) + }) + .collect(), + ); + resp_blocks.into_iter().rev().for_each(|b| { + assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_)))); + block_on(client.import(BlockOrigin::Own, b)).unwrap(); + }); + let expected_number = canonical_chain_length as u32; + assert_eq!(sync.best_queued_number as u32, expected_number); + assert_eq!(sync.best_queued_hash, canonical_blocks[expected_number as usize - 1].hash()); + } }