Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve pending blocks retrieval mechanism #4227

Merged
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
97f21b7
Add more log to retrieve parent method
Gabriel-Trintinalia Aug 8, 2022
149ff69
Fix spotless
Gabriel-Trintinalia Aug 8, 2022
46c7754
Request the lowest pending ancestor when saving a block
Gabriel-Trintinalia Aug 9, 2022
249dd87
Add tests
Gabriel-Trintinalia Aug 9, 2022
d6232c7
Merge branch 'main' into 3955-besu-stop-importing
Gabriel-Trintinalia Aug 9, 2022
ffbf5be
Remove cache exposure
Gabriel-Trintinalia Aug 9, 2022
ef44e09
Remove unused method
Gabriel-Trintinalia Aug 9, 2022
92bcd4b
Merge branch 'main' into 3955-besu-stop-importing
Gabriel-Trintinalia Aug 9, 2022
e6fa24a
Remove double space
Gabriel-Trintinalia Aug 9, 2022
d8bddcb
Add method description
Gabriel-Trintinalia Aug 9, 2022
bb9a127
Merge branch 'main' into 3955-besu-stop-importing
Gabriel-Trintinalia Aug 9, 2022
dd54e83
Merge branch 'main' into 3955-besu-stop-importing
Gabriel-Trintinalia Aug 10, 2022
77b26c3
Rollback log message changes
Gabriel-Trintinalia Aug 10, 2022
c55787d
Merge branch '3955-besu-stop-importing' of https://github.com/Gabriel…
Gabriel-Trintinalia Aug 10, 2022
887a43d
Change log level when parent is already requested
Gabriel-Trintinalia Aug 10, 2022
03ba11d
Replace recursive implementation with iterative when getting pending …
Gabriel-Trintinalia Aug 10, 2022
39ea999
Merge branch 'hyperledger:main' into 3955-besu-stop-importing
Gabriel-Trintinalia Aug 11, 2022
54615a9
Remove null check
Gabriel-Trintinalia Aug 11, 2022
53f0654
Decrease scope of synchronized block to reflect only the event of add…
Gabriel-Trintinalia Aug 11, 2022
1283019
Simplify savePendingBlock method
Gabriel-Trintinalia Aug 11, 2022
0a16734
Remove local variable
Gabriel-Trintinalia Aug 11, 2022
202a079
Refine tests
Gabriel-Trintinalia Aug 11, 2022
e5c1403
Refine tests
Gabriel-Trintinalia Aug 11, 2022
693ebae
Add maxAncestorBlock parameter when searching for a pending ancestor …
Gabriel-Trintinalia Aug 12, 2022
17e45dd
Add final to local variables
Gabriel-Trintinalia Aug 12, 2022
8e91997
Remove maxAncestorLevel parameter
Gabriel-Trintinalia Aug 12, 2022
1bae5c5
Add fork to the chain so test is more representative
Gabriel-Trintinalia Aug 12, 2022
cdc6dd1
Change CHANGELOG.md
Gabriel-Trintinalia Aug 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,29 +155,49 @@ private void clearListeners() {
private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
// Check to see if any of our pending blocks are now ready for import
final Block newBlock = blockAddedEvent.getBlock();
traceLambda(
LOG,
"Block added event type {} for block {}. Current status {}",
blockAddedEvent::getEventType,
newBlock::toLogString,
() -> this);

// If there is no children to process, maybe try non announced blocks
if (!maybeProcessPendingChildrenBlocks(newBlock)) {
traceLambda(
LOG, "There are no pending blocks ready to import for block {}", newBlock::toLogString);
maybeProcessNonAnnouncedBlocks(newBlock);
}

if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
final long head = protocolContext.getBlockchain().getChainHeadBlockNumber();
final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint();
pendingBlocksManager.purgeBlocksOlderThan(cutoff);
Gabriel-Trintinalia marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Process pending Children if any
*
* @param block the block to process the children
* @return true if block has any pending child
*/
private boolean maybeProcessPendingChildrenBlocks(final Block block) {
final List<Block> readyForImport;
synchronized (pendingBlocksManager) {
// Remove block from pendingBlocks list
pendingBlocksManager.deregisterPendingBlock(newBlock);
pendingBlocksManager.deregisterPendingBlock(block);

// Import any pending blocks that are children of the newly added block
readyForImport = pendingBlocksManager.childrenOf(newBlock.getHash());
readyForImport = pendingBlocksManager.childrenOf(block.getHash());
}

traceLambda(
LOG,
"Block added event type {} for block {}. Current status {}",
blockAddedEvent::getEventType,
newBlock::toLogString,
() -> this);

if (!readyForImport.isEmpty()) {
traceLambda(
LOG,
"Ready to import pending blocks found [{}] for block {}",
() -> readyForImport.stream().map(Block::toLogString).collect(Collectors.joining(", ")),
newBlock::toLogString);
block::toLogString);

final Supplier<CompletableFuture<List<Block>>> importBlocksTask =
PersistBlockTask.forUnorderedBlocks(
Expand All @@ -193,25 +213,17 @@ private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
.whenComplete(
(r, t) -> {
if (r != null) {
LOG.info("Imported {} pending blocks", r.size());
LOG.info(
"Imported {} pending blocks: {}",
r.size(),
r.stream().map(b -> b.getHeader().getNumber()).collect(Collectors.toList()));
}
if (t != null) {
LOG.error("Error importing pending blocks", t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if there was an error? Do we need to remove a block from the pending blocks? Depending on what error was thrown?

}
});
} else {

traceLambda(
LOG, "There are no pending blocks ready to import for block {}", newBlock::toLogString);

maybeProcessNonAnnouncedBlocks(newBlock);
}

if (blockAddedEvent.getEventType().equals(EventType.HEAD_ADVANCED)) {
final long head = protocolContext.getBlockchain().getChainHeadBlockNumber();
final long cutoff = head + config.getBlockPropagationRange().lowerEndpoint();
pendingBlocksManager.purgeBlocksOlderThan(cutoff);
}
return !readyForImport.isEmpty();
}

private void maybeProcessNonAnnouncedBlocks(final Block newBlock) {
Expand Down Expand Up @@ -364,21 +376,19 @@ private CompletableFuture<Block> processAnnouncedBlock(
return getBlockFromPeers(Optional.of(peer), blockHash.number(), Optional.of(blockHash.hash()));
}

private void requestParentBlock(final BlockHeader blockHeader) {
private void requestParentBlock(final Block block) {
BlockHeader blockHeader = block.getHeader();
Gabriel-Trintinalia marked this conversation as resolved.
Show resolved Hide resolved
if (requestedBlocks.add(blockHeader.getParentHash())) {
retrieveParentBlock(blockHeader);
} else {
LOG.trace("Parent block with hash {} was already requested", blockHeader.getParentHash());
LOG.debug("Parent block with hash {} is already requested", blockHeader.getParentHash());
}
}

private CompletableFuture<Block> retrieveParentBlock(final BlockHeader blockHeader) {
final long targetParentBlockNumber = blockHeader.getNumber() - 1L;
final Hash targetParentBlockHash = blockHeader.getParentHash();
LOG.info(
"Retrieving parent {} of block #{} from peers",
targetParentBlockHash,
blockHeader.getNumber());
LOG.info("Retrieving parent {} of block {}", targetParentBlockHash, blockHeader.toLogString());
return getBlockFromPeers(
Optional.empty(), targetParentBlockNumber, Optional.of(targetParentBlockHash));
}
Expand Down Expand Up @@ -434,18 +444,13 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block, final Bytes
// invoked for the parent of this block before we are able to register it.
traceLambda(LOG, "Import or save pending block {}", block::toLogString);

synchronized (pendingBlocksManager) {
if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) {
// Block isn't connected to local chain, save it to pending blocks collection
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info("Saving announced block {} for future import", block.toLogString());
}

// Request parent of the lowest announced block
pendingBlocksManager.lowestAnnouncedBlock().ifPresent(this::requestParentBlock);

return CompletableFuture.completedFuture(block);
if (!protocolContext.getBlockchain().contains(block.getHeader().getParentHash())) {
// Block isn't connected to local chain, save it to pending blocks collection
if (savePendingBlock(block, nodeId)) {
// if block is saved as pending, try to resolve it
maybeProcessPendingBlocks(block);
}
return CompletableFuture.completedFuture(block);
}

if (!importingBlocks.add(block.getHash())) {
Expand Down Expand Up @@ -480,6 +485,49 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block, final Bytes
blockHeaderValidator, block, parent, badBlockManager));
}

/**
* Save the given block.
*
* @param block the block to track
* @param nodeId node that sent the block
* @return true if the block was added (was not previously present)
*/
private boolean savePendingBlock(final Block block, final Bytes nodeId) {
synchronized (pendingBlocksManager) {
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info(
"Saved announced block for future import {} - {} saved block(s)",
block.toLogString(),
pendingBlocksManager.size());
return true;
}
return false;
}
}

/**
* Try to request the lowest ancestor for the given pending block or process the descendants if
* the ancestor is already in the chain
*/
private void maybeProcessPendingBlocks(final Block block) {
// Try to get the lowest ancestor pending for this block, so we can import it
Optional<Block> lowestPending =
Gabriel-Trintinalia marked this conversation as resolved.
Show resolved Hide resolved
pendingBlocksManager.pendingAncestorBlockOf(block, pendingBlocksManager.size());
if (lowestPending.isPresent()) {
Block lowestPendingBlock = lowestPending.get();
Gabriel-Trintinalia marked this conversation as resolved.
Show resolved Hide resolved
// If the parent of the lowest ancestor is not in the chain, request it.
if (!protocolContext
.getBlockchain()
.contains(lowestPendingBlock.getHeader().getParentHash())) {
requestParentBlock(lowestPendingBlock);
} else {
LOG.trace("Parent block is already in the chain");
// if the parent is already imported, process its children
maybeProcessPendingChildrenBlocks(lowestPendingBlock);
}
}
}

private CompletableFuture<Block> validateAndProcessPendingBlock(
final BlockHeaderValidator blockHeaderValidator,
final Block block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public boolean contains(final Hash blockHash) {
return pendingBlocks.containsKey(blockHash);
}

public int size() {
return pendingBlocks.size();
}

public List<Block> childrenOf(final Hash parentBlock) {
final Set<Hash> blocksByParent = pendingBlocksByParentHash.get(parentBlock);
if (blocksByParent == null || blocksByParent.size() == 0) {
Expand All @@ -127,6 +131,24 @@ public Optional<BlockHeader> lowestAnnouncedBlock() {
.min(Comparator.comparing(BlockHeader::getNumber));
}

/**
* Get the lowest pending ancestor block saved for a block
*
* @param block target block
* @param maxAncestorDifference max level to go back in the pending blocks chain
* @return An optional with the lowest ancestor pending block
*/
public Optional<Block> pendingAncestorBlockOf(
final Block block, final int maxAncestorDifference) {
Block ancestor = block;
int ancestorLevel = 0;
while (pendingBlocks.containsKey(ancestor.getHeader().getParentHash())
&& ancestorLevel++ < maxAncestorDifference) {
ancestor = pendingBlocks.get(ancestor.getHeader().getParentHash()).block();
}
return Optional.of(ancestor);
}

@Override
public String toString() {
return "PendingBlocksManager{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,36 @@ public void shouldRequestLowestAnnouncedPendingBlockParent() {
});
}

@Test
public void shouldRequestLowestAnnouncedPendingBlockParent_twoMissingBlocks() {
// test if block propagation manager can recover if one block is missed
blockchainUtil.importFirstBlocks(2);
final List<Block> blocks = blockchainUtil.getBlocks().subList(2, 6);

blockPropagationManager.start();

// Create peer and responder
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// skip two block then create messages from blocklist
blocks.stream()
.skip(2)
.map(this::createNewBlockHashMessage)
.forEach(
message -> { // Broadcast new block hash message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, message);
});

peer.respondWhile(responder, peer::hasOutstandingRequests);

// assert all blocks were imported
blocks.forEach(
block -> {
assertThat(blockchain.contains(block.getHash())).isTrue();
});
}

private NewBlockHashesMessage createNewBlockHashMessage(final Block block) {
return NewBlockHashesMessage.create(
Collections.singletonList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;

import org.apache.tuweni.bytes.Bytes;
import org.junit.Before;
Expand Down Expand Up @@ -259,4 +260,62 @@ public void shouldReturnLowestBlockByNumber() {

assertThat(pendingBlocksManager.lowestAnnouncedBlock()).contains(parentBlock.getHeader());
}

@Test
public void shouldReturnLowestAncestorPendingBlock() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block parentBlock = gen.block();

final Block childBlock = gen.nextBlock(parentBlock);
final Block childBlock2 = gen.nextBlock(childBlock);
final Block childBlock3 = gen.nextBlock(childBlock2);
Gabriel-Trintinalia marked this conversation as resolved.
Show resolved Hide resolved

pendingBlocksManager.registerPendingBlock(parentBlock, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock2, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock3, NODE_ID_1);

Optional<Block> block =
pendingBlocksManager.pendingAncestorBlockOf(childBlock3, pendingBlocksManager.size());
assertThat(block.isPresent()).isTrue();
assertThat(block.get().getHeader().getHash()).isEqualTo(childBlock2.getHeader().getHash());
assertThat(block.get().getHeader().getParentHash()).isEqualTo(childBlock.getHeader().getHash());
}

@Test
public void shouldReturnLowestAncestorPendingBlock_maxAncestorLevel() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block block = gen.block();
final Block childBlock1 = gen.nextBlock(block);
final Block childBlock2 = gen.nextBlock(childBlock1);
final Block childBlock3 = gen.nextBlock(childBlock2);

pendingBlocksManager.registerPendingBlock(block, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock1, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock2, NODE_ID_1);
pendingBlocksManager.registerPendingBlock(childBlock3, NODE_ID_1);

// Must return the lowest pending block in the chain
Optional<Block> ancestor =
pendingBlocksManager.pendingAncestorBlockOf(childBlock3, pendingBlocksManager.size());
assertThat(ancestor.get().getHeader().getHash()).isEqualTo(block.getHeader().getHash());

Optional<Block> ancestor0 = pendingBlocksManager.pendingAncestorBlockOf(childBlock3, 0);
assertThat(ancestor0.get().getHeader().getHash()).isEqualTo(childBlock3.getHeader().getHash());

Optional<Block> ancestor1 = pendingBlocksManager.pendingAncestorBlockOf(childBlock3, 1);
assertThat(ancestor1.get().getHeader().getHash()).isEqualTo(childBlock2.getHeader().getHash());

Optional<Block> ancestor2 = pendingBlocksManager.pendingAncestorBlockOf(childBlock3, 2);
assertThat(ancestor2.get().getHeader().getHash()).isEqualTo(childBlock1.getHeader().getHash());
}

@Test
public void shouldReturnLowestAncestorPendingBlock_sameBlock() {
final BlockDataGenerator gen = new BlockDataGenerator();
final Block block = gen.block();
pendingBlocksManager.registerPendingBlock(block, NODE_ID_1);
Optional<Block> b =
pendingBlocksManager.pendingAncestorBlockOf(block, pendingBlocksManager.size());
assertThat(b).contains(block);
}
}