Skip to content

Commit

Permalink
Improve pending blocks retrieval mechanism (hyperledger#4227)
Browse files Browse the repository at this point in the history
* Add more log to retrieve parent method
* Request the lowest pending ancestor when saving a block
* Replace recursive implementation with iterative when getting pending ancestors of Block
* Decrease scope of synchronized block to reflect only the event of adding pending block to the list
* Add fork to the chain so test is more representative

Signed-off-by: Gabriel Trintinalia <[email protected]>

Signed-off-by: Gabriel Trintinalia <[email protected]>
Co-authored-by: Gabriel Trintinalia <[email protected]>
  • Loading branch information
2 people authored and eum602 committed Nov 3, 2023
1 parent 4c47b75 commit 0b38563
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Additions and Improvements
- Introduce a cap to reputation score increase [#4230](https://github.com/hyperledger/besu/pull/4230)
- Add experimental CLI option for `--Xp2p-peer-lower-bound` [#4200](https://github.com/hyperledger/besu/pull/4200)
- Improve pending blocks retrieval mechanism [#4227](https://github.com/hyperledger/besu/pull/4227)

### Bug Fixes
- Fixes off-by-one error for mainnet TTD fallback [#4223](https://github.com/hyperledger/besu/pull/4223)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,29 +155,49 @@ private void clearListeners() {
private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
// Check to see if any of our pending blocks are now ready for import
final Block newBlock = blockAddedEvent.getBlock();
traceLambda(
LOG,
"Block added event type {} for block {}. Current status {}",
blockAddedEvent::getEventType,
newBlock::toLogString,
() -> this);

// If there is no children to process, maybe try non announced blocks
if (!maybeProcessPendingChildrenBlocks(newBlock)) {
traceLambda(
LOG, "There are no pending blocks ready to import for block {}", newBlock::toLogString);
maybeProcessNonAnnouncedBlocks(newBlock);
}

if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
final long head = protocolContext.getBlockchain().getChainHeadBlockNumber();
final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint();
pendingBlocksManager.purgeBlocksOlderThan(cutoff);
}
}

/**
* Process pending Children if any
*
* @param block the block to process the children
* @return true if block has any pending child
*/
private boolean maybeProcessPendingChildrenBlocks(final Block block) {
final List<Block> readyForImport;
synchronized (pendingBlocksManager) {
// Remove block from pendingBlocks list
pendingBlocksManager.deregisterPendingBlock(newBlock);
pendingBlocksManager.deregisterPendingBlock(block);

// Import any pending blocks that are children of the newly added block
readyForImport = pendingBlocksManager.childrenOf(newBlock.getHash());
readyForImport = pendingBlocksManager.childrenOf(block.getHash());
}

traceLambda(
LOG,
"Block added event type {} for block {}. Current status {}",
blockAddedEvent::getEventType,
newBlock::toLogString,
() -> this);

if (!readyForImport.isEmpty()) {
traceLambda(
LOG,
"Ready to import pending blocks found [{}] for block {}",
() -> readyForImport.stream().map(Block::toLogString).collect(Collectors.joining(", ")),
newBlock::toLogString);
block::toLogString);

final Supplier<CompletableFuture<List<Block>>> importBlocksTask =
PersistBlockTask.forUnorderedBlocks(
Expand All @@ -193,25 +213,17 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
.whenComplete(
(r, t) -> {
if (r != null) {
LOG.info("Imported {} pending blocks", r.size());
LOG.info(
"Imported {} pending blocks: {}",
r.size(),
r.stream().map(b -> b.getHeader().getNumber()).collect(Collectors.toList()));
}
if (t != null) {
LOG.error("Error importing pending blocks", t);
}
});
} else {

traceLambda(
LOG, "There are no pending blocks ready to import for block {}", newBlock::toLogString);

maybeProcessNonAnnouncedBlocks(newBlock);
}

if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
final long head = protocolContext.getBlockchain().getChainHeadBlockNumber();
final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint();
pendingBlocksManager.purgeBlocksOlderThan(cutoff);
}
return !readyForImport.isEmpty();
}

private void maybeProcessNonAnnouncedBlocks(final Block newBlock) {
Expand All @@ -223,13 +235,13 @@ private void maybeProcessNonAnnouncedBlocks(final Block newBlock) {
.map(ProcessableBlockHeader::getNumber)
.ifPresent(
minAnnouncedBlockNumber -> {
long distance = minAnnouncedBlockNumber - localHeadBlockNumber;
final long distance = minAnnouncedBlockNumber - localHeadBlockNumber;
LOG.trace(
"Found lowest announced block {} with distance {}",
minAnnouncedBlockNumber,
distance);

long firstNonAnnouncedBlockNumber = newBlock.getHeader().getNumber() + 1;
final long firstNonAnnouncedBlockNumber = newBlock.getHeader().getNumber() + 1;

if (distance < config.getBlockPropagationRange().upperEndpoint()
&& minAnnouncedBlockNumber > firstNonAnnouncedBlockNumber) {
Expand Down Expand Up @@ -364,21 +376,19 @@ private CompletableFuture<Block> processAnnouncedBlock(
return getBlockFromPeers(Optional.of(peer), blockHash.number(), Optional.of(blockHash.hash()));
}

private void requestParentBlock(final BlockHeader blockHeader) {
private void requestParentBlock(final Block block) {
final BlockHeader blockHeader = block.getHeader();
if (requestedBlocks.add(blockHeader.getParentHash())) {
retrieveParentBlock(blockHeader);
} else {
LOG.trace("Parent block with hash {} was already requested", blockHeader.getParentHash());
LOG.debug("Parent block with hash {} is already requested", blockHeader.getParentHash());
}
}

private CompletableFuture<Block> retrieveParentBlock(final BlockHeader blockHeader) {
final long targetParentBlockNumber = blockHeader.getNumber() - 1L;
final Hash targetParentBlockHash = blockHeader.getParentHash();
LOG.info(
"Retrieving parent {} of block #{} from peers",
targetParentBlockHash,
blockHeader.getNumber());
LOG.info("Retrieving parent {} of block {}", targetParentBlockHash, blockHeader.toLogString());
return getBlockFromPeers(
Optional.empty(), targetParentBlockNumber, Optional.of(targetParentBlockHash));
}
Expand Down Expand Up @@ -434,18 +444,13 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block, final Bytes
// invoked for the parent of this block before we are able to register it.
traceLambda(LOG, "Import or save pending block {}", block::toLogString);

synchronized (pendingBlocksManager) {
if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) {
// Block isn't connected to local chain, save it to pending blocks collection
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info("Saving announced block {} for future import", block.toLogString());
}

// Request parent of the lowest announced block
pendingBlocksManager.lowestAnnouncedBlock().ifPresent(this::requestParentBlock);

return CompletableFuture.completedFuture(block);
if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) {
// Block isn't connected to local chain, save it to pending blocks collection
if (savePendingBlock(block, nodeId)) {
// if block is saved as pending, try to resolve it
maybeProcessPendingBlocks(block);
}
return CompletableFuture.completedFuture(block);
}

if (!importingBlocks.add(block.getHash())) {
Expand Down Expand Up @@ -480,6 +485,48 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block, final Bytes
blockHeaderValidator, block, parent, badBlockManager));
}

/**
* Save the given block.
*
* @param block the block to track
* @param nodeId node that sent the block
* @return true if the block was added (was not previously present)
*/
private boolean savePendingBlock(final Block block, final Bytes nodeId) {
synchronized (pendingBlocksManager) {
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info(
"Saved announced block for future import {} - {} saved block(s)",
block.toLogString(),
pendingBlocksManager.size());
return true;
}
return false;
}
}

/**
* Try to request the lowest ancestor for the given pending block or process the descendants if
* the ancestor is already in the chain
*/
private void maybeProcessPendingBlocks(final Block block) {
// Try to get the lowest ancestor pending for this block, so we can import it
final Optional<Block> lowestPending = pendingBlocksManager.pendingAncestorBlockOf(block);
if (lowestPending.isPresent()) {
final Block lowestPendingBlock = lowestPending.get();
// If the parent of the lowest ancestor is not in the chain, request it.
if (!protocolContext
.getBlockchain()
.contains(lowestPendingBlock.getHeader().getParentHash())) {
requestParentBlock(lowestPendingBlock);
} else {
LOG.trace("Parent block is already in the chain");
// if the parent is already imported, process its children
maybeProcessPendingChildrenBlocks(lowestPendingBlock);
}
}
}

private CompletableFuture<Block> validateAndProcessPendingBlock(
final BlockHeaderValidator blockHeaderValidator,
final Block block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public boolean contains(final Hash blockHash) {
return pendingBlocks.containsKey(blockHash);
}

public int size() {
return pendingBlocks.size();
}

public List<Block> childrenOf(final Hash parentBlock) {
final Set<Hash> blocksByParent = pendingBlocksByParentHash.get(parentBlock);
if (blocksByParent == null || blocksByParent.size() == 0) {
Expand All @@ -127,6 +131,22 @@ public Optional<BlockHeader> lowestAnnouncedBlock() {
.min(Comparator.comparing(BlockHeader::getNumber));
}

/**
* Get the lowest pending ancestor block saved for a block
*
* @param block target block
* @return An optional with the lowest ancestor pending block
*/
public Optional<Block> pendingAncestorBlockOf(final Block block) {
Block ancestor = block;
int ancestorLevel = 0;
while (pendingBlocks.containsKey(ancestor.getHeader().getParentHash())
&& ancestorLevel++ < pendingBlocks.size()) {
ancestor = pendingBlocks.get(ancestor.getHeader().getParentHash()).block();
}
return Optional.of(ancestor);
}

@Override
public String toString() {
return "PendingBlocksManager{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,36 @@ public void shouldRequestLowestAnnouncedPendingBlockParent() {
});
}

@Test
public void shouldRequestLowestAnnouncedPendingBlockParent_twoMissingBlocks() {
// test if block propagation manager can recover if one block is missed
blockchainUtil.importFirstBlocks(2);
final List<Block> blocks = blockchainUtil.getBlocks().subList(2, 6);

blockPropagationManager.start();

// Create peer and responder
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// skip two block then create messages from blocklist
blocks.stream()
.skip(2)
.map(this::createNewBlockHashMessage)
.forEach(
message -> { // Broadcast new block hash message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, message);
});

peer.respondWhile(responder, peer::hasOutstandingRequests);

// assert all blocks were imported
blocks.forEach(
block -> {
assertThat(blockchain.contains(block.getHash())).isTrue();
});
}

private NewBlockHashesMessage createNewBlockHashMessage(final Block block) {
return NewBlockHashesMessage.create(
Collections.singletonList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;

import org.apache.tuweni.bytes.Bytes;
import org.junit.Before;
Expand Down Expand Up @@ -163,7 +164,8 @@ public void shouldPreventNodeFromFillingCache() {
pendingBlocksManager.registerPendingBlock(childBlockFromNodeTwo, NODE_ID_2);

// check blocks from node 1 in the cache (node 1 should replace the lowest priority block)
List<Block> pendingBlocksForParent = pendingBlocksManager.childrenOf(parentBlock.getHash());
final List<Block> pendingBlocksForParent =
pendingBlocksManager.childrenOf(parentBlock.getHash());
for (int i = 0; i < nbBlocks; i++) {
final Block foundBlock = childBlockFromNodeOne.poll();
if (i != 0) {
Expand Down Expand Up @@ -236,7 +238,7 @@ public void shouldReplaceLowestPriorityBlockWhenCacheIsFull() {

// check blocks in the cache
// and verify remove the block with the lowest priority (BLOCK-2)
for (Block block : childBlockFromNodeOne) {
for (final Block block : childBlockFromNodeOne) {
if (block.getHeader().getNumber() == 2) {
assertThat(pendingBlocksManager.contains(block.getHash())).isFalse();
} else {
Expand All @@ -259,4 +261,45 @@ public void shouldReturnLowestBlockByNumber() {

assertThat(pendingBlocksManager.lowestAnnouncedBlock()).contains(parentBlock.getHeader());
}

@Test
public void shouldReturnLowestAncestorPendingBlock() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block parentBlock = gen.block();

final Block block = gen.nextBlock(parentBlock);
final Block child = gen.nextBlock(block);

final Block forkBlock = gen.nextBlock(parentBlock);
final Block forkChild = gen.nextBlock(forkBlock);

// register chain with one missing block
pendingBlocksManager.registerPendingBlock(block, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(child, NODE_ID_1);

// Register fork with one missing parent
pendingBlocksManager.registerPendingBlock(forkBlock, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(forkChild, NODE_ID_1);

// assert it is able to follow the chain
final Optional<Block> blockAncestor = pendingBlocksManager.pendingAncestorBlockOf(child);
assertThat(blockAncestor.get().getHeader().getHash()).isEqualTo(block.getHeader().getHash());

// assert it is able to follow the fork
final Optional<Block> forkAncestor = pendingBlocksManager.pendingAncestorBlockOf(forkChild);
assertThat(forkAncestor.get().getHeader().getHash()).isEqualTo(forkBlock.getHeader().getHash());

// Both forks result in the same parent
assertThat(forkAncestor.get().getHeader().getParentHash())
.isEqualTo(blockAncestor.get().getHeader().getParentHash());
}

@Test
public void shouldReturnLowestAncestorPendingBlock_sameBlock() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block block = gen.block();
pendingBlocksManager.registerPendingBlock(block, NODE_ID_1);
final Optional<Block> b = pendingBlocksManager.pendingAncestorBlockOf(block);
assertThat(b).contains(block);
}
}

0 comments on commit 0b38563

Please sign in to comment.