Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mechanism to retrieve missing blocks #4175

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@

public class BlockPropagationManager {
private static final Logger LOG = LoggerFactory.getLogger(BlockPropagationManager.class);

private final SynchronizerConfiguration config;
private final ProtocolSchedule protocolSchedule;
private final ProtocolContext protocolContext;
Expand Down Expand Up @@ -364,6 +363,25 @@ private CompletableFuture<Block> processAnnouncedBlock(
return getBlockFromPeers(Optional.of(peer), blockHash.number(), Optional.of(blockHash.hash()));
}

private void requestParentBlock(final BlockHeader blockHeader) {
if (requestedBlocks.add(blockHeader.getParentHash())) {
retrieveParentBlock(blockHeader);
} else {
LOG.trace("Parent block with hash {} was already requested", blockHeader.getParentHash());
}
}

private CompletableFuture<Block> retrieveParentBlock(final BlockHeader blockHeader) {
final long targetParentBlockNumber = blockHeader.getNumber() - 1L;
final Hash targetParentBlockHash = blockHeader.getParentHash();
LOG.info(
"Retrieving parent {} of block #{} from peers",
targetParentBlockHash,
blockHeader.getNumber());
return getBlockFromPeers(
Optional.empty(), targetParentBlockNumber, Optional.of(targetParentBlockHash));
}

private CompletableFuture<Block> getBlockFromPeers(
final Optional<EthPeer> preferredPeer,
final long blockNumber,
Expand Down Expand Up @@ -421,6 +439,10 @@ CompletableFuture<Block> importOrSavePendingBlock(final Block block, final Bytes
if (pendingBlocksManager.registerPendingBlock(block, nodeId)) {
LOG.info("Saving announced block {} for future import", block.toLogString());
}

// Request parent of the lowest announced block
pendingBlocksManager.lowestAnnouncedBlock().ifPresent(this::requestParentBlock);

return CompletableFuture.completedFuture(block);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.hyperledger.besu.testutil.TestClock;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

Expand Down Expand Up @@ -634,6 +635,44 @@ public void shouldNotImportBlocksThatAreAlreadyBeingImported() {
verify(ethScheduler, times(1)).scheduleSyncWorkerTask(any(Supplier.class));
}

@Test
public void shouldRequestLowestAnnouncedPendingBlockParent() {
// test if block propagation manager can recover if one block is missed

blockchainUtil.importFirstBlocks(2);
final List<Block> blocks = blockchainUtil.getBlocks().subList(2, 4);

blockPropagationManager.start();

// Create peer and responder
final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0);
final Responder responder = RespondingEthPeer.blockchainResponder(getFullBlockchain());

// skip first block then create messages from blocklist
blocks.stream()
.skip(1)
.map(this::createNewBlockHashMessage)
.forEach(
message -> { // Broadcast new block hash message
EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, peer, message);
});

peer.respondWhile(responder, peer::hasOutstandingRequests);

// assert all blocks were imported
blocks.forEach(
block -> {
assertThat(blockchain.contains(block.getHash())).isTrue();
});
}

private NewBlockHashesMessage createNewBlockHashMessage(final Block block) {
return NewBlockHashesMessage.create(
Collections.singletonList(
new NewBlockHashesMessage.NewBlockHash(
block.getHash(), block.getHeader().getNumber())));
}

@Test
public void verifyBroadcastBlockInvocation() {
blockchainUtil.importFirstBlocks(2);
Expand Down