diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index e28b0a24ad9..b5670d8acf9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -226,6 +226,10 @@ public EthPeer peer(final PeerConnection connection) { return ethPeer != null ? ethPeer : completeConnections.get(connection.getPeer().getId()); } + public EthPeer peer(final Bytes peerId) { + return completeConnections.get(peerId); + } + public PendingPeerRequest executePeerRequest( final PeerRequest request, final long minimumBlockNumber, final Optional peer) { final long actualMinBlockNumber; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java index 6925df9d490..b1b449b4c8a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthProtocolManager.java @@ -316,6 +316,8 @@ public void processMessage(final Capability cap, final Message message) { return; } + // TODO: stefan: find out whether ethMessage is a response or a request + // There are some annoying TRACE messages in the following call if this is not a response // This will handle responses ethPeers.dispatchMessage(ethPeer, ethMessage, getSupportedProtocol()); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java index 7512bdd03a9..d4dd5f0beab 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java @@ -15,20 +15,22 @@ package org.hyperledger.besu.ethereum.eth.manager.snap; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage; import org.hyperledger.besu.plugin.services.MetricsSystem; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import org.apache.tuweni.bytes.Bytes32; public class RetryingGetAccountRangeFromPeerTask - extends AbstractRetryingPeerTask { + extends AbstractRetryingSwitchingPeerTask { private final EthContext ethContext; private final Bytes32 startKeyHash; @@ -41,9 +43,9 @@ private RetryingGetAccountRangeFromPeerTask( final Bytes32 startKeyHash, final Bytes32 endKeyHash, final BlockHeader blockHeader, - final MetricsSystem metricsSystem) { - super( - ethContext, 4, data -> data.accounts().isEmpty() && data.proofs().isEmpty(), metricsSystem); + final MetricsSystem metricsSystem, + final int maxRetries) { + super(ethContext, metricsSystem, maxRetries); this.ethContext = ethContext; this.startKeyHash = startKeyHash; this.endKeyHash = endKeyHash; @@ -56,23 +58,41 @@ public static EthTask forAccountRange( final Bytes32 startKeyHash, final Bytes32 endKeyHash, final BlockHeader blockHeader, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final int maxRetries) { return new RetryingGetAccountRangeFromPeerTask( - ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem); + ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem, maxRetries); } @Override - protected CompletableFuture executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture executeTaskOnCurrentPeer( + final EthPeer assignedPeer) { final GetAccountRangeFromPeerTask task = GetAccountRangeFromPeerTask.forAccountRange( ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); - return executeSubTask(task::run) - .thenApply( - peerResult -> { - result.complete(peerResult.getResult()); - return peerResult.getResult(); - }); + task.assignPeer(assignedPeer); + return executeSubTask(task::run).thenApply(PeerTaskResult::getResult); + } + + @Override + protected boolean emptyResult(final AccountRangeMessage.AccountRangeData data) { + return data.accounts().isEmpty() && data.proofs().isEmpty(); + } + + @Override + protected boolean reportUselessIfEmptyResponse() { + return false; + } + + @Override + protected boolean successfulResult(final AccountRangeMessage.AccountRangeData peerResult) { + return !emptyResult(peerResult); + } + + @Override + protected Predicate getPeerFilter() { + return (peer) -> + peer.getConnection().getAgreedCapabilities().stream() + .anyMatch((c) -> c.getName().equals(SnapProtocol.NAME)); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java index 7d23ec944d3..38068528dd4 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java @@ -15,21 +15,24 @@ package org.hyperledger.besu.ethereum.eth.manager.snap; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; -public class RetryingGetBytecodeFromPeerTask extends AbstractRetryingPeerTask> { +public class RetryingGetBytecodeFromPeerTask + extends AbstractRetryingSwitchingPeerTask> { private final EthContext ethContext; private final List codeHashes; @@ -40,8 +43,9 @@ private RetryingGetBytecodeFromPeerTask( final EthContext ethContext, final List codeHashes, final BlockHeader blockHeader, - final MetricsSystem metricsSystem) { - super(ethContext, 4, Map::isEmpty, metricsSystem); + final MetricsSystem metricsSystem, + final int maxRetries) { + super(ethContext, metricsSystem, maxRetries); this.ethContext = ethContext; this.codeHashes = codeHashes; this.blockHeader = blockHeader; @@ -52,21 +56,39 @@ public static EthTask> forByteCode( final EthContext ethContext, final List codeHashes, final BlockHeader blockHeader, - final MetricsSystem metricsSystem) { - return new RetryingGetBytecodeFromPeerTask(ethContext, codeHashes, blockHeader, metricsSystem); + final MetricsSystem metricsSystem, + final int maxRetries) { + return new RetryingGetBytecodeFromPeerTask( + ethContext, codeHashes, blockHeader, metricsSystem, maxRetries); } @Override - protected CompletableFuture> executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture> executeTaskOnCurrentPeer(final EthPeer peer) { final GetBytecodeFromPeerTask task = GetBytecodeFromPeerTask.forBytecode(ethContext, codeHashes, blockHeader, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); - return executeSubTask(task::run) - .thenApply( - peerResult -> { - result.complete(peerResult.getResult()); - return peerResult.getResult(); - }); + task.assignPeer(peer); + return executeSubTask(task::run).thenApply(PeerTaskResult::getResult); + } + + @Override + protected boolean emptyResult(final Map peerResult) { + return peerResult.isEmpty(); + } + + @Override + protected boolean reportUselessIfEmptyResponse() { + return false; + } + + @Override + protected boolean successfulResult(final Map peerResult) { + return !emptyResult(peerResult); + } + + @Override + protected Predicate getPeerFilter() { + return (peer) -> + peer.getConnection().getAgreedCapabilities().stream() + .anyMatch((c) -> c.getName().equals(SnapProtocol.NAME)); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java index 7a095de9292..4e1510f1f13 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java @@ -15,21 +15,23 @@ package org.hyperledger.besu.ethereum.eth.manager.snap; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import org.apache.tuweni.bytes.Bytes32; public class RetryingGetStorageRangeFromPeerTask - extends AbstractRetryingPeerTask { + extends AbstractRetryingSwitchingPeerTask { private final EthContext ethContext; private final List accountHashes; @@ -44,8 +46,9 @@ private RetryingGetStorageRangeFromPeerTask( final Bytes32 startKeyHash, final Bytes32 endKeyHash, final BlockHeader blockHeader, - final MetricsSystem metricsSystem) { - super(ethContext, 4, data -> data.proofs().isEmpty() && data.slots().isEmpty(), metricsSystem); + final MetricsSystem metricsSystem, + final int maxRetries) { + super(ethContext, metricsSystem, maxRetries); this.ethContext = ethContext; this.accountHashes = accountHashes; this.startKeyHash = startKeyHash; @@ -60,23 +63,47 @@ public static EthTask forStorageRange( final Bytes32 startKeyHash, final Bytes32 endKeyHash, final BlockHeader blockHeader, - final MetricsSystem metricsSystem) { + final MetricsSystem metricsSystem, + final int maxRetries) { return new RetryingGetStorageRangeFromPeerTask( - ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem); + ethContext, + accountHashes, + startKeyHash, + endKeyHash, + blockHeader, + metricsSystem, + maxRetries); } @Override - protected CompletableFuture executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture executeTaskOnCurrentPeer( + final EthPeer peer) { final GetStorageRangeFromPeerTask task = GetStorageRangeFromPeerTask.forStorageRange( ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); - return executeSubTask(task::run) - .thenApply( - peerResult -> { - result.complete(peerResult.getResult()); - return peerResult.getResult(); - }); + task.assignPeer(peer); + return executeSubTask(task::run).thenApply(PeerTaskResult::getResult); + } + + @Override + protected boolean emptyResult(final StorageRangeMessage.SlotRangeData peerResult) { + return peerResult.proofs().isEmpty() && peerResult.slots().isEmpty(); + } + + @Override + protected boolean reportUselessIfEmptyResponse() { + return false; + } + + @Override + protected boolean successfulResult(final StorageRangeMessage.SlotRangeData peerResult) { + return !emptyResult(peerResult); + } + + @Override + protected Predicate getPeerFilter() { + return (peer) -> + peer.getConnection().getAgreedCapabilities().stream() + .anyMatch((c) -> c.getName().equals(SnapProtocol.NAME)); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java index ebfd8856898..fd10b9f4d44 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java @@ -15,20 +15,23 @@ package org.hyperledger.besu.ethereum.eth.manager.snap; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import org.apache.tuweni.bytes.Bytes; -public class RetryingGetTrieNodeFromPeerTask extends AbstractRetryingPeerTask> { +public class RetryingGetTrieNodeFromPeerTask + extends AbstractRetryingSwitchingPeerTask> { private final EthContext ethContext; private final Map> paths; @@ -39,8 +42,9 @@ private RetryingGetTrieNodeFromPeerTask( final EthContext ethContext, final Map> paths, final BlockHeader blockHeader, - final MetricsSystem metricsSystem) { - super(ethContext, 4, Map::isEmpty, metricsSystem); + final MetricsSystem metricsSystem, + final int maxRetries) { + super(ethContext, metricsSystem, maxRetries); this.ethContext = ethContext; this.paths = paths; this.blockHeader = blockHeader; @@ -51,21 +55,39 @@ public static EthTask> forTrieNodes( final EthContext ethContext, final Map> paths, final BlockHeader blockHeader, - final MetricsSystem metricsSystem) { - return new RetryingGetTrieNodeFromPeerTask(ethContext, paths, blockHeader, metricsSystem); + final MetricsSystem metricsSystem, + final int maxRetries) { + return new RetryingGetTrieNodeFromPeerTask( + ethContext, paths, blockHeader, metricsSystem, maxRetries); } @Override - protected CompletableFuture> executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture> executeTaskOnCurrentPeer(final EthPeer peer) { final GetTrieNodeFromPeerTask task = GetTrieNodeFromPeerTask.forTrieNodes(ethContext, paths, blockHeader, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); - return executeSubTask(task::run) - .thenApply( - peerResult -> { - result.complete(peerResult.getResult()); - return peerResult.getResult(); - }); + task.assignPeer(peer); + return executeSubTask(task::run).thenApply(PeerTaskResult::getResult); + } + + @Override + protected boolean emptyResult(final Map peerResult) { + return peerResult.isEmpty(); + } + + @Override + protected boolean reportUselessIfEmptyResponse() { + return false; + } + + @Override + protected boolean successfulResult(final Map peerResult) { + return !emptyResult(peerResult); + } + + @Override + protected Predicate getPeerFilter() { + return (peer) -> + peer.getConnection().getAgreedCapabilities().stream() + .anyMatch((c) -> c.getName().equals(SnapProtocol.NAME)); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java index cf48d69847d..94cc432aea3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java @@ -16,6 +16,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException; import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException; import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerBreachedProtocolException; @@ -27,7 +28,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; -import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +35,14 @@ /** * A task that will retry a fixed number of times before completing the associated CompletableFuture * exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link - * #executePeerTask(Optional)} is complete with a non-empty list the retry counter is reset. + * #executePeerTask(Optional)} is considered an empty result by {@link #emptyResult(Object)} the + * peer is demoted, if the result is complete according to {@link #successfulResult(Object)} then + * the final task result is set, otherwise the result is considered partial and the retry counter is + * reset. + * + *

Note: extending classes should never set the final task result, using {@code + * result.complete} by themselves, but should return true from {@link #successfulResult(Object)} + * when done. * * @param The type as a typed list that the peer task can get partial or full results in. */ @@ -44,7 +51,6 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { private static final Logger LOG = LoggerFactory.getLogger(AbstractRetryingPeerTask.class); private final EthContext ethContext; private final int maxRetries; - private final Predicate isEmptyResponse; private final MetricsSystem metricsSystem; private int retryCount = 0; private Optional assignedPeer = Optional.empty(); @@ -52,18 +58,13 @@ public abstract class AbstractRetryingPeerTask extends AbstractEthTask { /** * @param ethContext The context of the current Eth network we are attached to. * @param maxRetries Maximum number of retries to accept before completing exceptionally. - * @param isEmptyResponse Test if the response received was empty. * @param metricsSystem The metrics system used to measure task. */ protected AbstractRetryingPeerTask( - final EthContext ethContext, - final int maxRetries, - final Predicate isEmptyResponse, - final MetricsSystem metricsSystem) { + final EthContext ethContext, final int maxRetries, final MetricsSystem metricsSystem) { super(metricsSystem); this.ethContext = ethContext; this.maxRetries = maxRetries; - this.isEmptyResponse = isEmptyResponse; this.metricsSystem = metricsSystem; } @@ -93,11 +94,23 @@ protected void executeTask() { if (error != null) { handleTaskError(error); } else { - // If we get a partial success, reset the retry counter. - if (!isEmptyResponse.test(peerResult)) { - retryCount = 0; + if (successfulResult(peerResult)) { + result.complete(peerResult); + } else { + final boolean emptyResult = emptyResult(peerResult); + if (emptyResult && reportUselessIfEmptyResponse()) { + // record this empty response, so that the peer will be disconnected if there + // were too many + assignedPeer.ifPresent( + peer -> peer.recordUselessResponse(getClass().getSimpleName())); + } + if (!emptyResult) { + // If we get a partial success, reset the retry counter + retryCount = 0; + } + // retry + executeTaskTimed(); } - executeTaskTimed(); } }); } @@ -105,14 +118,14 @@ protected void executeTask() { protected abstract CompletableFuture executePeerTask(Optional assignedPeer); protected void handleTaskError(final Throwable error) { - final Throwable cause = ExceptionUtils.rootCause(error); - if (!isRetryableError(cause)) { + final Throwable rootCause = ExceptionUtils.rootCause(error); + if (!isRetryableError(rootCause)) { // Complete exceptionally - result.completeExceptionally(cause); + result.completeExceptionally(rootCause); return; } - if (cause instanceof NoAvailablePeersException) { + if (rootCause instanceof NoAvailablePeersException) { LOG.debug( "No useful peer found, wait max 5 seconds for new peer to connect: current peers {}", ethContext.getEthPeers().peerCount()); @@ -130,7 +143,7 @@ protected void handleTaskError(final Throwable error) { LOG.debug( "Retrying after recoverable failure from peer task {}: {}", this.getClass().getSimpleName(), - cause.getMessage()); + rootCause.getMessage()); // Wait before retrying on failure executeSubTask( () -> @@ -139,14 +152,16 @@ protected void handleTaskError(final Throwable error) { .scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1))); } - protected boolean isRetryableError(final Throwable error) { - return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerFailure(error)); + protected boolean isRetryableError(final Throwable rootCause) { + return rootCause instanceof IncompleteResultsException + || rootCause instanceof TimeoutException + || (!assignedPeer.isPresent() && isPeerFailure(rootCause)); } - protected boolean isPeerFailure(final Throwable error) { - return error instanceof PeerBreachedProtocolException - || error instanceof PeerDisconnectedException - || error instanceof NoAvailablePeersException; + protected boolean isPeerFailure(final Throwable rootCause) { + return rootCause instanceof PeerBreachedProtocolException + || rootCause instanceof PeerDisconnectedException + || rootCause instanceof NoAvailablePeersException; } protected EthContext getEthContext() { @@ -164,4 +179,32 @@ public int getRetryCount() { public int getMaxRetries() { return maxRetries; } + + /** + * Identify if the result is empty. + * + * @param peerResult the result to check + * @return true if the result is empty and the request retried + */ + protected abstract boolean emptyResult(final T peerResult); + + /** + * Identify if and empty response can be reported as useless + * + * @return true if an empty response can be reported useless + */ + protected boolean reportUselessIfEmptyResponse() { + return true; + } + + /** + * Identify a successful and complete result. Partial results that are not considered successful + * should return false, so that the request is retried. This check has precedence over the {@link + * #emptyResult(Object)}, so if an empty result is also successful the task completes successfully + * with an empty result. + * + * @param peerResult the result to check + * @return true if the result is successful and can be set as the task result + */ + protected abstract boolean successfulResult(final T peerResult); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java index 06c92c76204..84fb251b63c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java @@ -20,12 +20,12 @@ import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.util.ExceptionUtils; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import java.util.stream.Stream; @@ -41,11 +41,8 @@ public abstract class AbstractRetryingSwitchingPeerTask extends AbstractRetry private final Set failedPeers = new HashSet<>(); protected AbstractRetryingSwitchingPeerTask( - final EthContext ethContext, - final MetricsSystem metricsSystem, - final Predicate isEmptyResponse, - final int maxRetries) { - super(ethContext, maxRetries, isEmptyResponse, metricsSystem); + final EthContext ethContext, final MetricsSystem metricsSystem, final int maxRetries) { + super(ethContext, maxRetries, metricsSystem); } @Override @@ -93,24 +90,19 @@ protected CompletableFuture executePeerTask(final Optional assignedP .addArgument(peerToUse) .addArgument(this::getRetryCount) .log(); - result.complete(peerResult); return peerResult; }); } @Override protected void handleTaskError(final Throwable error) { - if (isPeerFailure(error)) { - getAssignedPeer().ifPresent(peer -> failedPeers.add(peer)); + final Throwable rootCause = ExceptionUtils.rootCause(error); + if (isPeerFailure(rootCause)) { + getAssignedPeer().ifPresent(failedPeers::add); } super.handleTaskError(error); } - @Override - protected boolean isRetryableError(final Throwable error) { - return error instanceof TimeoutException || isPeerFailure(error); - } - private Optional selectNextPeer() { final Optional maybeNextPeer = remainingPeersToTry().findFirst(); @@ -128,15 +120,20 @@ private Stream remainingPeersToTry() { return getEthContext() .getEthPeers() .streamBestPeers() + .filter(getPeerFilter()) .filter(peer -> !triedPeers.contains(peer)); } + protected Predicate getPeerFilter() { + return (p) -> true; + } + private void refreshPeers() { final EthPeers peers = getEthContext().getEthPeers(); // If we are at max connections, then refresh peers disconnecting one of the failed peers, // or the least useful - if (peers.peerCount() >= peers.getMaxPeers()) { + if (peers.peerCount() >= peers.getPeerLowerBound()) { failedPeers.stream() .filter(peer -> !peer.isDisconnected()) .findAny() diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java index f7266f9c88e..e3ac7309eaf 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java @@ -18,12 +18,10 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -46,7 +44,7 @@ protected RetryingGetBlockFromPeersTask( final int maxRetries, final Optional maybeBlockHash, final long blockNumber) { - super(ethContext, metricsSystem, Objects::isNull, maxRetries); + super(ethContext, metricsSystem, maxRetries); this.protocolSchedule = protocolSchedule; this.maybeBlockHash = maybeBlockHash; this.blockNumber = blockNumber; @@ -80,16 +78,10 @@ protected CompletableFuture> executeTaskOnCurrentPeer( .addArgument(peerResult.getPeer()) .addArgument(this::getRetryCount) .log(); - result.complete(peerResult); return peerResult; }); } - @Override - protected boolean isRetryableError(final Throwable error) { - return super.isRetryableError(error) || error instanceof IncompleteResultsException; - } - @Override protected void handleTaskError(final Throwable error) { if (getRetryCount() < getMaxRetries()) { @@ -109,6 +101,16 @@ protected void handleTaskError(final Throwable error) { super.handleTaskError(error); } + @Override + protected boolean emptyResult(final PeerTaskResult peerResult) { + return peerResult.getResult() == null; + } + + @Override + protected boolean successfulResult(final PeerTaskResult peerResult) { + return !emptyResult(peerResult); + } + private String logBlockNumberMaybeHash() { return blockNumber + maybeBlockHash.map(h -> " (" + h.toHexString() + ")").orElse(""); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlocksFromPeersTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlocksFromPeersTask.java index 0e663ffe24d..8902b8e6798 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlocksFromPeersTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlocksFromPeersTask.java @@ -18,20 +18,16 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.List; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RetryingGetBlocksFromPeersTask - extends AbstractRetryingSwitchingPeerTask>> { +public class RetryingGetBlocksFromPeersTask extends AbstractRetryingSwitchingPeerTask> { private static final Logger LOG = LoggerFactory.getLogger(RetryingGetBlocksFromPeersTask.class); @@ -44,7 +40,7 @@ protected RetryingGetBlocksFromPeersTask( final MetricsSystem metricsSystem, final int maxRetries, final List headers) { - super(ethContext, metricsSystem, Objects::isNull, maxRetries); + super(ethContext, metricsSystem, maxRetries); this.protocolSchedule = protocolSchedule; this.headers = headers; } @@ -60,8 +56,7 @@ public static RetryingGetBlocksFromPeersTask forHeaders( } @Override - protected CompletableFuture>> executeTaskOnCurrentPeer( - final EthPeer currentPeer) { + protected CompletableFuture> executeTaskOnCurrentPeer(final EthPeer currentPeer) { final GetBodiesFromPeerTask getBodiesTask = GetBodiesFromPeerTask.forHeaders( protocolSchedule, getEthContext(), headers, getMetricsSystem()); @@ -76,23 +71,10 @@ protected CompletableFuture>> executeTaskOnCurrentPee .addArgument(peerResult.getPeer()) .addArgument(this::getRetryCount) .log(); - - if (peerResult.getResult().isEmpty()) { - currentPeer.recordUselessResponse("GetBodiesFromPeerTask"); - throw new IncompleteResultsException( - "No blocks returned by peer " + currentPeer.getShortNodeId()); - } - - result.complete(peerResult); - return peerResult; + return peerResult.getResult(); }); } - @Override - protected boolean isRetryableError(final Throwable error) { - return super.isRetryableError(error) || error instanceof IncompleteResultsException; - } - @Override protected void handleTaskError(final Throwable error) { if (getRetryCount() < getMaxRetries()) { @@ -107,4 +89,14 @@ protected void handleTaskError(final Throwable error) { } super.handleTaskError(error); } + + @Override + protected boolean emptyResult(final List peerResult) { + return peerResult.isEmpty(); + } + + @Override + protected boolean successfulResult(final List peerResult) { + return !emptyResult(peerResult); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetHeadersEndingAtFromPeerByHashTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetHeadersEndingAtFromPeerByHashTask.java index 5b5b0cef359..f2a97354d8e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetHeadersEndingAtFromPeerByHashTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetHeadersEndingAtFromPeerByHashTask.java @@ -21,7 +21,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -51,7 +50,7 @@ public class RetryingGetHeadersEndingAtFromPeerByHashTask final int count, final MetricsSystem metricsSystem, final int maxRetries) { - super(ethContext, metricsSystem, List::isEmpty, maxRetries); + super(ethContext, metricsSystem, maxRetries); this.protocolSchedule = protocolSchedule; this.minimumRequiredBlockNumber = minimumRequiredBlockNumber; this.count = count; @@ -92,27 +91,24 @@ protected CompletableFuture> executeTaskOnCurrentPeer( return executeSubTask(task::run) .thenApply( peerResult -> { - LOG.trace( - "Got {} block headers by hash {} from peer {} has result {}", + final var res = peerResult.getResult(); + LOG.debug( + "Get {} block headers by hash {} from peer {} has result {}", count, referenceHash, currentPeer, - peerResult.getResult()); - if (peerResult.getResult().isEmpty()) { - currentPeer.recordUselessResponse("GetHeadersFromPeerByHashTask"); - throw new IncompleteResultsException( - "No block headers for hash " - + referenceHash - + " returned by peer " - + currentPeer.getShortNodeId()); - } - result.complete(peerResult.getResult()); - return peerResult.getResult(); + res); + return res; }); } @Override - protected boolean isRetryableError(final Throwable error) { - return super.isRetryableError(error) || error instanceof IncompleteResultsException; + protected boolean emptyResult(final List peerResult) { + return peerResult.isEmpty(); + } + + @Override + protected boolean successfulResult(final List peerResult) { + return !emptyResult(peerResult); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTask.java index 16040e963ae..0b94d6bcb7d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTask.java @@ -17,18 +17,19 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.Collection; import java.util.HashSet; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.tuweni.bytes.Bytes; -public class RetryingGetNodeDataFromPeerTask extends AbstractRetryingPeerTask> { +public class RetryingGetNodeDataFromPeerTask + extends AbstractRetryingSwitchingPeerTask> { private final EthContext ethContext; private final Set hashes; @@ -39,8 +40,9 @@ private RetryingGetNodeDataFromPeerTask( final EthContext ethContext, final Collection hashes, final long pivotBlockNumber, - final MetricsSystem metricsSystem) { - super(ethContext, 4, data -> false, metricsSystem); + final MetricsSystem metricsSystem, + final int maxRetries) { + super(ethContext, metricsSystem, maxRetries); this.ethContext = ethContext; this.hashes = new HashSet<>(hashes); this.pivotBlockNumber = pivotBlockNumber; @@ -51,21 +53,27 @@ public static RetryingGetNodeDataFromPeerTask forHashes( final EthContext ethContext, final Collection hashes, final long pivotBlockNumber, - final MetricsSystem metricsSystem) { - return new RetryingGetNodeDataFromPeerTask(ethContext, hashes, pivotBlockNumber, metricsSystem); + final MetricsSystem metricsSystem, + final int maxRetries) { + return new RetryingGetNodeDataFromPeerTask( + ethContext, hashes, pivotBlockNumber, metricsSystem, maxRetries); } @Override - protected CompletableFuture> executePeerTask( - final Optional assignedPeer) { + protected CompletableFuture> executeTaskOnCurrentPeer(final EthPeer peer) { final GetNodeDataFromPeerTask task = GetNodeDataFromPeerTask.forHashes(ethContext, hashes, pivotBlockNumber, metricsSystem); - assignedPeer.ifPresent(task::assignPeer); - return executeSubTask(task::run) - .thenApply( - peerResult -> { - result.complete(peerResult.getResult()); - return peerResult.getResult(); - }); + task.assignPeer(peer); + return executeSubTask(task::run).thenApply(PeerTaskResult::getResult); + } + + @Override + protected boolean emptyResult(final Map peerResult) { + return false; + } + + @Override + protected boolean successfulResult(final Map peerResult) { + return !emptyResult(peerResult); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java index 26fc7b544ce..c3a946f46ef 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java @@ -43,7 +43,12 @@ public DownloadBodiesStep( @Override public CompletableFuture> apply(final List blockHeaders) { - return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem) + return CompleteBlocksTask.forHeaders( + protocolSchedule, + ethContext, + blockHeaders, + ethContext.getEthPeers().peerCount(), + metricsSystem) .run(); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java index c71a97a5f49..6d9550d06ca 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java @@ -16,7 +16,6 @@ import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlocksFromPeersTask; import java.util.Comparator; @@ -84,15 +83,13 @@ protected CompletableFuture> requestBodies(final List b context.getEthContext().getEthPeers().peerCount(), blockHeaders); - final CompletableFuture>> run = - getBodiesFromPeerTask.run(); - return run.thenApply(AbstractPeerTask.PeerTaskResult::getResult) - .thenApply( - blocks -> { - LOG.debug("Got {} blocks from peers", blocks.size()); - blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber())); - return blocks; - }); + final CompletableFuture> run = getBodiesFromPeerTask.run(); + return run.thenApply( + blocks -> { + LOG.debug("Got {} blocks from peers", blocks.size()); + blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber())); + return blocks; + }); } @VisibleForTesting diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockConfirmer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockConfirmer.java index 1253dbb17c1..dff6c703314 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockConfirmer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/PivotBlockConfirmer.java @@ -183,7 +183,6 @@ private Optional createPivotQuery(final l .getEthPeers() .streamBestPeers() .filter(p -> p.chainState().getEstimatedHeight() >= blockNumber) - .filter(EthPeer::isFullyValidated) .filter(p -> !pivotBlockQueriesByPeerId.keySet().contains(p.nodeId())) .findFirst() .flatMap((peer) -> createGetHeaderTask(peer, blockNumber)); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStep.java index e4325b967f5..31815da1c43 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/RequestDataStep.java @@ -46,7 +46,11 @@ public RequestDataStep(final EthContext ethContext, final MetricsSystem metricsS this( (hashes, pivotBlockNumber) -> RetryingGetNodeDataFromPeerTask.forHashes( - ethContext, hashes, pivotBlockNumber, metricsSystem)); + ethContext, + hashes, + pivotBlockNumber, + metricsSystem, + ethContext.getEthPeers().peerCount())); } RequestDataStep( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java index f78ffe3af9a..b75e12e26e9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/RequestDataStep.java @@ -86,7 +86,8 @@ public CompletableFuture> requestAccount( accountDataRequest.getStartKeyHash(), accountDataRequest.getEndKeyHash(), blockHeader, - metricsSystem); + metricsSystem, + ethContext.getEthPeers().peerCount()); downloadState.addOutstandingTask(getAccountTask); return getAccountTask .run() @@ -121,7 +122,13 @@ public CompletableFuture>> requestStorage( : RangeManager.MAX_RANGE; final EthTask getStorageRangeTask = RetryingGetStorageRangeFromPeerTask.forStorageRange( - ethContext, accountHashes, minRange, maxRange, blockHeader, metricsSystem); + ethContext, + accountHashes, + minRange, + maxRange, + blockHeader, + metricsSystem, + ethContext.getEthPeers().peerCount()); downloadState.addOutstandingTask(getStorageRangeTask); return getStorageRangeTask .run() @@ -156,7 +163,11 @@ public CompletableFuture>> requestCode( final BlockHeader blockHeader = fastSyncState.getPivotBlockHeader().get(); final EthTask> getByteCodeTask = RetryingGetBytecodeFromPeerTask.forByteCode( - ethContext, codeHashes, blockHeader, metricsSystem); + ethContext, + codeHashes, + blockHeader, + metricsSystem, + ethContext.getEthPeers().peerCount()); downloadState.addOutstandingTask(getByteCodeTask); return getByteCodeTask .run() @@ -195,7 +206,7 @@ public CompletableFuture>> requestTrieNodeByPath( }); final EthTask> getTrieNodeFromPeerTask = RetryingGetTrieNodeFromPeerTask.forTrieNodes( - ethContext, message, blockHeader, metricsSystem); + ethContext, message, blockHeader, metricsSystem, ethContext.getEthPeers().peerCount()); downloadState.addOutstandingTask(getTrieNodeFromPeerTask); return getTrieNodeFromPeerTask .run() diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTask.java index 6e04833b644..ff0689e726c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTask.java @@ -31,7 +31,6 @@ import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -66,7 +65,7 @@ private CompleteBlocksTask( final List headers, final int maxRetries, final MetricsSystem metricsSystem) { - super(ethContext, maxRetries, Collection::isEmpty, metricsSystem); + super(ethContext, maxRetries, metricsSystem); checkArgument(headers.size() > 0, "Must supply a non-empty headers list"); this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; @@ -134,6 +133,16 @@ protected CompletableFuture> executePeerTask(final Optional return requestBodies(assignedPeer).thenCompose(this::processBodiesResult); } + @Override + protected boolean emptyResult(final List peerResult) { + return peerResult.isEmpty(); + } + + @Override + protected boolean successfulResult(final List peerResult) { + return incompleteHeaders().isEmpty(); + } + private CompletableFuture> requestBodies(final Optional assignedPeer) { final List incompleteHeaders = incompleteHeaders(); if (incompleteHeaders.isEmpty()) { @@ -157,7 +166,7 @@ private CompletableFuture> processBodiesResult(final List blo blocksResult.forEach((block) -> blocks.put(block.getHeader().getNumber(), block)); if (incompleteHeaders().isEmpty()) { - result.complete( + return completedFuture( headers.stream().map(h -> blocks.get(h.getNumber())).collect(Collectors.toList())); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 23e26ff8922..3bc13c1325f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -39,7 +39,6 @@ import org.hyperledger.besu.plugin.services.MetricsSystem; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -78,7 +77,7 @@ private DownloadHeaderSequenceTask( final int maxRetries, final ValidationPolicy validationPolicy, final MetricsSystem metricsSystem) { - super(ethContext, maxRetries, Collection::isEmpty, metricsSystem); + super(ethContext, maxRetries, metricsSystem); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; @@ -139,19 +138,30 @@ protected CompletableFuture> executePeerTask( "Downloading headers from {} to {}.", startingBlockNumber, referenceHeader.getNumber()); final CompletableFuture> task = downloadHeaders(assignedPeer).thenCompose(this::processHeaders); - return task.whenComplete( - (r, t) -> { - // We're done if we've filled all requested headers - if (lastFilledHeaderIndex == 0) { + return task.thenApply( + r -> { + if (successfulResult(r)) { LOG.debug( "Finished downloading headers from {} to {}.", headers[0].getNumber(), headers[segmentLength - 1].getNumber()); - result.complete(Arrays.asList(headers)); + return Arrays.asList(headers); } + return r; }); } + @Override + protected boolean emptyResult(final List peerResult) { + return peerResult.isEmpty(); + } + + @Override + protected boolean successfulResult(final List peerResult) { + // We're done if we've filled all requested headers + return lastFilledHeaderIndex == 0; + } + private CompletableFuture>> downloadHeaders( final Optional assignedPeer) { // Figure out parameters for our headers request diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java index 699bf9c4b17..2ef3a7bfb18 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java @@ -55,7 +55,7 @@ private GetReceiptsForHeadersTask( final List headers, final int maxRetries, final MetricsSystem metricsSystem) { - super(ethContext, maxRetries, Map::isEmpty, metricsSystem); + super(ethContext, maxRetries, metricsSystem); checkArgument(headers.size() > 0, "Must supply a non-empty headers list"); this.ethContext = ethContext; this.metricsSystem = metricsSystem; @@ -92,6 +92,16 @@ protected CompletableFuture>> executeP return requestReceipts(assignedPeer).thenCompose(this::processResponse); } + @Override + protected boolean emptyResult(final Map> peerResult) { + return peerResult.isEmpty(); + } + + @Override + protected boolean successfulResult(final Map> peerResult) { + return isComplete(); + } + private CompletableFuture>> requestReceipts( final Optional assignedPeer) { final List incompleteHeaders = incompleteHeaders(); @@ -115,8 +125,8 @@ private CompletableFuture>> processRes final Map> responseData) { receipts.putAll(responseData); - if (isComplete()) { - result.complete(receipts); + if (successfulResult(responseData)) { + return CompletableFuture.completedFuture(receipts); } return CompletableFuture.completedFuture(responseData); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByHashTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByHashTask.java index c36599e6354..bd55e3951e1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByHashTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByHashTask.java @@ -20,7 +20,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; -import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractGetHeadersFromPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingSwitchingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask; @@ -50,7 +49,7 @@ public class RetryingGetHeaderFromPeerByHashTask final long minimumRequiredBlockNumber, final MetricsSystem metricsSystem, final int maxRetries) { - super(ethContext, metricsSystem, List::isEmpty, maxRetries); + super(ethContext, metricsSystem, maxRetries); this.protocolSchedule = protocolSchedule; this.minimumRequiredBlockNumber = minimumRequiredBlockNumber; checkNotNull(referenceHash); @@ -90,24 +89,21 @@ protected CompletableFuture> executeTaskOnCurrentPeer(final Et referenceHash, peer, peerResult.getResult()); - if (peerResult.getResult().isEmpty()) { - throw new IncompleteResultsException( - "No block header for hash " - + referenceHash - + " returned by peer " - + peer.getShortNodeId()); - } - result.complete(peerResult.getResult()); return peerResult.getResult(); }); } + public CompletableFuture getHeader() { + return run().thenApply(singletonList -> singletonList.get(0)); + } + @Override - protected boolean isRetryableError(final Throwable error) { - return super.isRetryableError(error) || error instanceof IncompleteResultsException; + protected boolean emptyResult(final List peerResult) { + return peerResult.isEmpty(); } - public CompletableFuture getHeader() { - return run().thenApply(singletonList -> singletonList.get(0)); + @Override + protected boolean successfulResult(final List peerResult) { + return !emptyResult(peerResult); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java index 676382c069e..e334cbddb73 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java @@ -18,12 +18,12 @@ import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractGetHeadersFromPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByNumberTask; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; -import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -41,7 +41,7 @@ private RetryingGetHeaderFromPeerByNumberTask( final MetricsSystem metricsSystem, final long pivotBlockNumber, final int maxRetries) { - super(ethContext, maxRetries, Collection::isEmpty, metricsSystem); + super(ethContext, maxRetries, metricsSystem); this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; this.pivotBlockNumber = pivotBlockNumber; @@ -65,14 +65,17 @@ protected CompletableFuture> executePeerTask( GetHeadersFromPeerByNumberTask.forSingleNumber( protocolSchedule, ethContext, pivotBlockNumber, metricsSystem); assignedPeer.ifPresent(getHeadersTask::assignPeer); - return executeSubTask(getHeadersTask::run) - .thenApply( - peerResult -> { - if (!peerResult.getResult().isEmpty()) { - result.complete(peerResult.getResult()); - } - return peerResult.getResult(); - }); + return executeSubTask(getHeadersTask::run).thenApply(PeerTaskResult::getResult); + } + + @Override + protected boolean emptyResult(final List peerResult) { + return peerResult.isEmpty(); + } + + @Override + protected boolean successfulResult(final List peerResult) { + return !emptyResult(peerResult); } public CompletableFuture getHeader() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java index 85f29cf35b0..c3fdd200c3a 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java @@ -36,7 +36,7 @@ */ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest { protected static final int DEFAULT_MAX_RETRIES = 4; - protected int maxRetries; + private int maxRetries; @BeforeEach public void resetMaxRetries() { @@ -49,6 +49,17 @@ protected void assertResultMatchesExpectation( assertThat(response).isEqualTo(requestedData); } + @Override + protected EthTask createTask(final T requestedData) { + return createTask(requestedData, getMaxRetries()); + } + + protected abstract EthTask createTask(T requestedData, int maxRetries); + + protected int getMaxRetries() { + return maxRetries; + } + @Test public void failsWhenPeerReturnsPartialResultThenStops() { // Setup data to be requested and expected response @@ -79,7 +90,7 @@ public void failsWhenPeerReturnsPartialResultThenStops() { assertThat(future.isDone()).isFalse(); // Respond max times - 1 with no data - respondingPeer.respondTimes(emptyResponder, maxRetries - 1); + respondingPeer.respondTimes(emptyResponder, getMaxRetries() - 1); assertThat(future).isNotDone(); // Next retry should fail @@ -142,7 +153,7 @@ public void doesNotCompleteWhenPeersAreUnavailable() { // Setup data to be requested final T requestedData = generateDataToBeRequested(); - final EthTask task = createTask(requestedData); + final EthTask task = createTask(requestedData, getMaxRetries() + 1); final CompletableFuture future = task.run(); assertThat(future.isDone()).isFalse(); @@ -155,7 +166,8 @@ public void completesWhenPeersAreTemporarilyUnavailable() final T requestedData = generateDataToBeRequested(); // Execute task and wait for response - final EthTask task = createTask(requestedData); + // +2 is required for switching peers tasks where the max retries = number of peers + final EthTask task = createTask(requestedData, getMaxRetries() + 2); final CompletableFuture future = task.run(); assertThat(future.isDone()).isFalse(); @@ -182,7 +194,7 @@ public void completeWhenPeersTimeoutTemporarily() EthProtocolManagerTestUtil.createPeer(ethProtocolManager); final T requestedData = generateDataToBeRequested(); - final EthTask task = createTask(requestedData); + final EthTask task = createTask(requestedData, ethPeers.peerCount() + 1); final CompletableFuture future = task.run(); assertThat(future.isDone()).isFalse(); @@ -208,7 +220,7 @@ public void failsWhenPeersSendEmptyResponses() { assertThat(future.isDone()).isFalse(); // Respond max times - 1 - respondingPeer.respondTimes(responder, maxRetries - 1); + respondingPeer.respondTimes(responder, getMaxRetries() - 1); assertThat(future).isNotDone(); // Next retry should fail diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingSwitchingPeerMessageTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingSwitchingPeerMessageTaskTest.java index 084c730a6e3..2892de925c0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingSwitchingPeerMessageTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingSwitchingPeerMessageTaskTest.java @@ -17,7 +17,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException; @@ -25,7 +24,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -37,13 +35,15 @@ * @param The type of data being requested from the network */ public abstract class RetryingSwitchingPeerMessageTaskTest extends RetryingMessageTaskTest { - protected Optional responsivePeer = Optional.empty(); @Override - protected void assertResultMatchesExpectation( - final T requestedData, final T response, final EthPeer respondingPeer) { - assertThat(response).isEqualTo(requestedData); - responsivePeer.ifPresent(rp -> assertThat(rp).isEqualByComparingTo(respondingPeer)); + protected EthTask createTask(final T requestedData) { + return createTask(requestedData, getMaxRetries()); + } + + @Override + protected int getMaxRetries() { + return ethPeers.peerCount(); } @Test @@ -70,8 +70,6 @@ public void completesWhenBestPeerEmptyAndSecondPeerIsResponsive() blockchain, protocolContext.getWorldStateArchive(), transactionPool), 2); - responsivePeer = Optional.of(secondPeer.getEthPeer()); - assertThat(future.isDone()).isTrue(); assertResultMatchesExpectation(requestedData, future.get(), secondPeer.getEthPeer()); } @@ -80,32 +78,26 @@ public void completesWhenBestPeerEmptyAndSecondPeerIsResponsive() public void completesWhenBestPeerTimeoutsAndSecondPeerIsResponsive() throws ExecutionException, InterruptedException { // Setup first unresponsive peer - final RespondingEthPeer firstPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10); // Setup second responsive peer final RespondingEthPeer secondPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 9); + // First peer timeouts + peerCountToTimeout.set(1); + // Execute task and wait for response final T requestedData = generateDataToBeRequested(); final EthTask task = createTask(requestedData); final CompletableFuture future = task.run(); - // First peer timeouts - peerCountToTimeout.set(1); - firstPeer.respondTimes( - RespondingEthPeer.blockchainResponder( - blockchain, protocolContext.getWorldStateArchive(), transactionPool), - 2); // Second peer is responsive secondPeer.respondTimes( RespondingEthPeer.blockchainResponder( blockchain, protocolContext.getWorldStateArchive(), transactionPool), 2); - responsivePeer = Optional.of(secondPeer.getEthPeer()); - assertThat(future.isDone()).isTrue(); assertResultMatchesExpectation(requestedData, future.get(), secondPeer.getEthPeer()); } @@ -113,26 +105,19 @@ public void completesWhenBestPeerTimeoutsAndSecondPeerIsResponsive() @Test public void failsWhenAllPeersFail() { // Setup first unresponsive peer - final RespondingEthPeer firstPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10); // Setup second unresponsive peer - final RespondingEthPeer secondPeer = - EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 9); + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 9); + + // all peers timeout + peerCountToTimeout.set(2); // Execute task and wait for response final T requestedData = generateDataToBeRequested(); final EthTask task = createTask(requestedData); final CompletableFuture future = task.run(); - for (int i = 0; i < maxRetries && !future.isDone(); i++) { - // First peer is unresponsive - firstPeer.respondWhile(RespondingEthPeer.emptyResponder(), firstPeer::hasOutstandingRequests); - // Second peer is unresponsive - secondPeer.respondWhile( - RespondingEthPeer.emptyResponder(), secondPeer::hasOutstandingRequests); - } - assertThat(future.isDone()).isTrue(); assertThat(future.isCompletedExceptionally()).isTrue(); assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); @@ -140,23 +125,74 @@ public void failsWhenAllPeersFail() { @Test public void disconnectAPeerWhenAllPeersTried() { - maxRetries = MAX_PEERS + 1; final int numPeers = MAX_PEERS; final List respondingPeers = new ArrayList<>(numPeers); for (int i = 0; i < numPeers; i++) { respondingPeers.add(EthProtocolManagerTestUtil.createPeer(ethProtocolManager, numPeers - i)); } + // all peers timeout + peerCountToTimeout.set(numPeers); + // Execute task and wait for response final T requestedData = generateDataToBeRequested(); - final EthTask task = createTask(requestedData); + final EthTask task = createTask(requestedData, MAX_PEERS + 1); task.run(); - respondingPeers.forEach( - respondingPeer -> - respondingPeer.respondWhile( - RespondingEthPeer.emptyResponder(), respondingPeer::hasOutstandingRequests)); - assertThat(respondingPeers.get(numPeers - 1).getEthPeer().isDisconnected()).isTrue(); } + + @Test + @Override + public void failsWhenPeersSendEmptyResponses() { + // Setup unresponsive peers + final RespondingEthPeer.Responder responder = RespondingEthPeer.emptyResponder(); + final RespondingEthPeer respondingPeer1 = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 2); + final RespondingEthPeer respondingPeer2 = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); + + // Setup data to be requested + final T requestedData = generateDataToBeRequested(); + + // Setup and run task + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + + assertThat(future.isDone()).isFalse(); + + // Respond max times - 1 + respondingPeer1.respondTimes(responder, getMaxRetries() - 1); + assertThat(future).isNotDone(); + + // Next retry should fail + respondingPeer2.respond(responder); + assertThat(future).isDone(); + assertThat(future).isCompletedExceptionally(); + assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); + } + + @Test + @Override + public void completesWhenPeersAreTemporarilyUnavailable() + throws ExecutionException, InterruptedException { + // Setup data to be requested + final T requestedData = generateDataToBeRequested(); + + // Execute task and wait for response + final EthTask task = createTask(requestedData, 2); + final CompletableFuture future = task.run(); + + assertThat(future.isDone()).isFalse(); + + // Setup a peer + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder( + blockchain, protocolContext.getWorldStateArchive(), transactionPool); + final RespondingEthPeer respondingPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager); + respondingPeer.respondWhile(responder, () -> !future.isDone()); + + assertResultMatchesExpectation(requestedData, future.get(), respondingPeer.getEthPeer()); + } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTaskTest.java index e180420a4f5..33008dc36bf 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTaskTest.java @@ -23,7 +23,6 @@ import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.plugin.services.MetricsSystem; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -80,7 +79,7 @@ private class TaskThatFailsSometimes extends AbstractRetryingPeerTask { int failures = 0; protected TaskThatFailsSometimes(final int initialFailures, final int maxRetries) { - super(ethContext, maxRetries, Objects::isNull, metricsSystem); + super(ethContext, maxRetries, metricsSystem); this.initialFailures = initialFailures; } @@ -91,9 +90,18 @@ protected CompletableFuture executePeerTask(final Optional ass failures++; return CompletableFuture.completedFuture(null); } else { - result.complete(Boolean.TRUE); return CompletableFuture.completedFuture(Boolean.TRUE); } } + + @Override + protected boolean emptyResult(final Boolean peerResult) { + return peerResult == null; + } + + @Override + protected boolean successfulResult(final Boolean peerResult) { + return !emptyResult(peerResult); + } } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTaskTest.java index 9996842bd2d..439872059a2 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTaskTest.java @@ -47,7 +47,8 @@ protected PeerTaskResult generateDataToBeRequested() { } @Override - protected RetryingGetBlockFromPeersTask createTask(final PeerTaskResult requestedData) { + protected RetryingGetBlockFromPeersTask createTask( + final PeerTaskResult requestedData, final int maxRetries) { return RetryingGetBlockFromPeersTask.create( protocolSchedule, ethContext, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTaskTest.java index 5ed94b143a5..5ffef51a35c 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetNodeDataFromPeerTaskTest.java @@ -21,7 +21,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; -import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; +import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingSwitchingPeerMessageTaskTest; import java.util.List; import java.util.Map; @@ -34,7 +34,8 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -public class RetryingGetNodeDataFromPeerTaskTest extends RetryingMessageTaskTest> { +public class RetryingGetNodeDataFromPeerTaskTest + extends RetryingSwitchingPeerMessageTaskTest> { @Override protected Map generateDataToBeRequested() { @@ -50,10 +51,11 @@ protected Map generateDataToBeRequested() { } @Override - protected EthTask> createTask(final Map requestedData) { + protected EthTask> createTask( + final Map requestedData, final int maxRetries) { final List hashes = Lists.newArrayList(requestedData.keySet()); return RetryingGetNodeDataFromPeerTask.forHashes( - ethContext, hashes, GENESIS_BLOCK_NUMBER, metricsSystem); + ethContext, hashes, GENESIS_BLOCK_NUMBER, metricsSystem, maxRetries); } @Test @@ -95,4 +97,9 @@ public void failsWhenPeerReturnsPartialResultThenStops() {} @Override @Disabled("Empty responses count as valid when requesting node data") public void failsWhenPeersSendEmptyResponses() {} + + @Test + @Override + @Disabled("Empty responses count as valid when requesting node data") + public void completesWhenBestPeerEmptyAndSecondPeerIsResponsive() {} } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java index e8049af736e..e61b5767932 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/CompleteBlocksTaskTest.java @@ -73,7 +73,7 @@ protected List generateDataToBeRequested(final int nbBlock) { } @Override - protected CompleteBlocksTask createTask(final List requestedData) { + protected CompleteBlocksTask createTask(final List requestedData, final int maxRetries) { final List headersToComplete = requestedData.stream().map(Block::getHeader).collect(Collectors.toList()); return CompleteBlocksTask.forHeaders( @@ -136,7 +136,7 @@ public void shouldCreateWithdrawalsAwareEmptyBlock_whenWithdrawalsAreEnabled() { mockProtocolSchedule, ethContext, List.of(header1, header2), - maxRetries, + getMaxRetries(), new NoOpMetricsSystem()); assertThat(task.run()).isCompletedWithValue(expectedBlocks); } @@ -193,7 +193,7 @@ public void shouldCompleteBlockThatOnlyContainsWithdrawals_whenWithdrawalsAreEna mockProtocolSchedule, ethContext, List.of(header1, header2, header3), - maxRetries, + getMaxRetries(), new NoOpMetricsSystem()); final CompletableFuture> runningTask = task.run(); @@ -220,7 +220,7 @@ public void shouldReduceTheBlockSegmentSizeAfterEachRetry() { final List requestedData = generateDataToBeRequested(10); - final CompleteBlocksTask task = createTask(requestedData); + final EthTask> task = createTask(requestedData); final CompletableFuture> future = task.run(); final List messageCollector = new ArrayList<>(4); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java index d7a36a5d8f4..fbfb66a5266 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/DownloadHeaderSequenceTaskTest.java @@ -54,7 +54,8 @@ protected List generateDataToBeRequested() { } @Override - protected EthTask> createTask(final List requestedData) { + protected EthTask> createTask( + final List requestedData, final int maxRetries) { final BlockHeader lastHeader = requestedData.get(requestedData.size() - 1); final BlockHeader referenceHeader = blockchain.getBlockHeader(lastHeader.getNumber() + 1).get(); return DownloadHeaderSequenceTask.endingAtHeader( @@ -82,7 +83,7 @@ public void failsWhenPeerReturnsOnlyReferenceHeader() { ethContext, referenceHeader, 10, - maxRetries, + getMaxRetries(), validationPolicy, metricsSystem); final CompletableFuture> future = task.run(); @@ -112,7 +113,7 @@ public void failsWhenPeerReturnsOnlySubsetOfHeaders() { ethContext, referenceHeader, 10, - maxRetries, + getMaxRetries(), validationPolicy, metricsSystem); final CompletableFuture> future = task.run(); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java index 2b28e8a5c98..26dba56e016 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java @@ -48,7 +48,7 @@ protected Map> generateDataToBeRequested() @Override protected EthTask>> createTask( - final Map> requestedData) { + final Map> requestedData, final int maxRetries) { final List headersToComplete = new ArrayList<>(requestedData.keySet()); return GetReceiptsForHeadersTask.forHeaders( ethContext, headersToComplete, maxRetries, metricsSystem); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTaskTest.java index 1174b53cff4..8ed80646476 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTaskTest.java @@ -36,7 +36,8 @@ protected List generateDataToBeRequested() { } @Override - protected EthTask> createTask(final List requestedData) { + protected EthTask> createTask( + final List requestedData, final int maxRetries) { return RetryingGetHeaderFromPeerByNumberTask.forSingleNumber( protocolSchedule, ethContext, metricsSystem, PIVOT_BLOCK_NUMBER, maxRetries); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/PeerPrivileges.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/PeerPrivileges.java index 6447522e287..f03d9e22bf8 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/PeerPrivileges.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/PeerPrivileges.java @@ -22,7 +22,7 @@ public interface PeerPrivileges { * If true, the given peer can connect or remain connected even if the max connection limit or the * maximum remote connection limit has been reached or exceeded. * - * @param peerId The peer id to be checked. + * @param peerId The id of the peer to be checked. * @return {@code true} if the peer should be allowed to connect regardless of connection limits. */ boolean canExceedConnectionLimits(final Bytes peerId);