Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and fix retrying get block switching peer #4256

Merged
merged 11 commits into from
Aug 18, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
### Bug Fixes
- Fixes off-by-one error for mainnet TTD fallback [#4223](https://github.com/hyperledger/besu/pull/4223)
- Fix off-by-one error in AbstractRetryingPeerTask [#4254](https://github.com/hyperledger/besu/pull/4254)
- Refactor and fix retrying get block switching peer [#4256](https://github.com/hyperledger/besu/pull/4256)
- Fix encoding of key (short hex) in eth_getProof [#4261](https://github.com/hyperledger/besu/pull/4261)
- Fix for post-merge networks fast-sync [#4224](https://github.com/hyperledger/besu/pull/4224), [#4276](https://github.com/hyperledger/besu/pull/4276)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,13 @@ protected void handleTaskError(final Throwable error) {
}

protected boolean isRetryableError(final Throwable error) {
final boolean isPeerError =
error instanceof PeerBreachedProtocolException
|| error instanceof PeerDisconnectedException
|| error instanceof NoAvailablePeersException;
return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerFailure(error));
}

return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerError);
/**
 * Tells whether the given error is one of the peer-related failures.
 *
 * @param error the error raised while executing the task
 * @return {@code true} if the error is a {@code PeerBreachedProtocolException}, a {@code
 *     PeerDisconnectedException} or a {@code NoAvailablePeersException}, {@code false} otherwise
 */
protected boolean isPeerFailure(final Throwable error) {
  if (error instanceof PeerBreachedProtocolException) {
    return true;
  }
  if (error instanceof PeerDisconnectedException) {
    return true;
  }
  return error instanceof NoAvailablePeersException;
}

protected EthContext getEthContext() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright contributors to Hyperledger Besu
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.task;

import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetryingPeerTask<T> {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractRetryingSwitchingPeerTask.class);

private final Set<EthPeer> triedPeers = new HashSet<>();
private final Set<EthPeer> failedPeers = new HashSet<>();

protected AbstractRetryingSwitchingPeerTask(
final EthContext ethContext,
final MetricsSystem metricsSystem,
final Predicate<T> isEmptyResponse,
final int maxRetries) {
super(ethContext, maxRetries, isEmptyResponse, metricsSystem);
}

@Override
public void assignPeer(final EthPeer peer) {
super.assignPeer(peer);
triedPeers.add(peer);
}

protected abstract CompletableFuture<T> executeTaskOnCurrentPeer(final EthPeer peer);

@Override
protected CompletableFuture<T> executePeerTask(final Optional<EthPeer> assignedPeer) {

final Optional<EthPeer> maybePeer =
assignedPeer
.filter(u -> getRetryCount() == 1) // first try with the assigned peer if present
.map(Optional::of)
.orElseGet(this::selectNextPeer); // otherwise select a new one from the pool

if (maybePeer.isEmpty()) {
traceLambda(
LOG,
"No peer found to try to execute task at attempt {}, tried peers {}",
this::getRetryCount,
triedPeers::toString);
final var ex = new NoAvailablePeersException();
return CompletableFuture.failedFuture(ex);
}

final EthPeer peerToUse = maybePeer.get();
assignPeer(peerToUse);

traceLambda(
LOG,
"Trying to execute task on peer {}, attempt {}",
this::getAssignedPeer,
this::getRetryCount);

return executeTaskOnCurrentPeer(peerToUse)
.thenApply(
peerResult -> {
traceLambda(
LOG,
"Got result {} from peer {}, attempt {}",
peerResult::toString,
peerToUse::toString,
this::getRetryCount);
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
result.complete(peerResult);
return peerResult;
});
}

@Override
protected void handleTaskError(final Throwable error) {
if (isPeerFailure(error)) {
getAssignedPeer().ifPresent(peer -> failedPeers.add(peer));
}
super.handleTaskError(error);
}

@Override
protected boolean isRetryableError(final Throwable error) {
return error instanceof TimeoutException || isPeerFailure(error);
}

private Optional<EthPeer> selectNextPeer() {
final Optional<EthPeer> maybeNextPeer = remainingPeersToTry().findFirst();

if (maybeNextPeer.isEmpty()) {
// tried all the peers, restart from the best one but excluding the failed ones
refreshPeers();
triedPeers.retainAll(failedPeers);
return remainingPeersToTry().findFirst();
}

return maybeNextPeer;
}

private Stream<EthPeer> remainingPeersToTry() {
return getEthContext()
.getEthPeers()
.streamBestPeers()
.filter(peer -> !peer.isDisconnected() && !triedPeers.contains(peer));
}

private void refreshPeers() {
final EthPeers peers = getEthContext().getEthPeers();
// If we are at max connections, then refresh peers disconnecting one of the failed peers,
// or the least useful
if (peers.peerCount() >= peers.getMaxPeers()) {
failedPeers.stream()
.filter(peer -> !peer.isDisconnected())
.findAny()
.or(() -> peers.streamAvailablePeers().sorted(peers.getBestChainComparator()).findFirst())
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
.ifPresent(
peer -> {
debugLambda(LOG, "Refresh peers disconnecting peer {}", peer::toString);
peer.disconnect(DisconnectReason.USELESS_PEER);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,135 +14,101 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.task;

import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetryingGetBlockFromPeersTask
extends AbstractRetryingPeerTask<AbstractPeerTask.PeerTaskResult<Block>> {
extends AbstractRetryingSwitchingPeerTask<AbstractPeerTask.PeerTaskResult<Block>> {

private static final Logger LOG = LoggerFactory.getLogger(RetryingGetBlockFromPeersTask.class);

private final ProtocolContext protocolContext;
private final ProtocolSchedule protocolSchedule;
private final Optional<Hash> blockHash;
private final Optional<Hash> maybeBlockHash;
private final long blockNumber;
private final Set<EthPeer> triedPeers = new HashSet<>();

protected RetryingGetBlockFromPeersTask(
final ProtocolContext protocolContext,
final EthContext ethContext,
final ProtocolSchedule protocolSchedule,
final MetricsSystem metricsSystem,
final int maxRetries,
final Optional<Hash> blockHash,
final Optional<Hash> maybeBlockHash,
final long blockNumber) {
super(ethContext, maxRetries, Objects::isNull, metricsSystem);
this.protocolContext = protocolContext;
super(ethContext, metricsSystem, Objects::isNull, maxRetries);
this.protocolSchedule = protocolSchedule;
this.blockHash = blockHash;
this.maybeBlockHash = maybeBlockHash;
this.blockNumber = blockNumber;
}

public static RetryingGetBlockFromPeersTask create(
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final MetricsSystem metricsSystem,
final int maxRetries,
final Optional<Hash> hash,
final Optional<Hash> maybeHash,
final long blockNumber) {
return new RetryingGetBlockFromPeersTask(
protocolContext,
ethContext,
protocolSchedule,
metricsSystem,
maxRetries,
hash,
blockNumber);
}

@Override
public void assignPeer(final EthPeer peer) {
super.assignPeer(peer);
triedPeers.add(peer);
ethContext, protocolSchedule, metricsSystem, maxRetries, maybeHash, blockNumber);
}

@Override
protected CompletableFuture<AbstractPeerTask.PeerTaskResult<Block>> executePeerTask(
final Optional<EthPeer> assignedPeer) {

protected CompletableFuture<PeerTaskResult<Block>> executeTaskOnCurrentPeer(
final EthPeer currentPeer) {
final GetBlockFromPeerTask getBlockTask =
GetBlockFromPeerTask.create(
protocolSchedule, getEthContext(), blockHash, blockNumber, getMetricsSystem());

getBlockTask.assignPeer(
assignedPeer
.filter(unused -> getRetryCount() == 1) // first try with the assigned preferred peer
.orElseGet( // then selecting a new one from the pool
() -> {
assignPeer(selectNextPeer());
return getAssignedPeer().get();
}));

LOG.debug(
"Getting block {} ({}) from peer {}, attempt {}",
blockNumber,
blockHash,
getAssignedPeer(),
getRetryCount());
protocolSchedule, getEthContext(), maybeBlockHash, blockNumber, getMetricsSystem());
getBlockTask.assignPeer(currentPeer);

return executeSubTask(getBlockTask::run)
.thenApply(
peerResult -> {
debugLambda(
LOG,
"Got block {} from peer {}, attempt {}",
peerResult.getResult()::toLogString,
peerResult.getPeer()::toString,
this::getRetryCount);
result.complete(peerResult);
return peerResult;
});
}

private EthPeer selectNextPeer() {
return getEthContext()
.getEthPeers()
.streamBestPeers()
.filter(peer -> !triedPeers.contains(peer))
.findFirst()
.orElseThrow(NoAvailablePeersException::new);
}

@Override
protected boolean isRetryableError(final Throwable error) {
return (blockNumber > protocolContext.getBlockchain().getChainHeadBlockNumber())
&& (super.isRetryableError(error) || error instanceof IncompleteResultsException);
return super.isRetryableError(error) || error instanceof IncompleteResultsException;
garyschulte marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected void handleTaskError(final Throwable error) {
if (getRetryCount() < getMaxRetries()) {
LOG.debug(
"Failed to get block {} ({}) from peer {}, attempt {}, retrying later",
blockNumber,
blockHash,
getAssignedPeer(),
getRetryCount());
debugLambda(
LOG,
"Failed to get block {} from peer {}, attempt {}, retrying later",
this::logBlockNumberMaybeHash,
this::getAssignedPeer,
this::getRetryCount);
} else {
LOG.warn(
"Failed to get block {} ({}) after {} retries", blockNumber, blockHash, getRetryCount());
"Failed to get block {} after {} retries", logBlockNumberMaybeHash(), getRetryCount());
}
super.handleTaskError(error);
}

/**
 * Builds the block description used in the log messages.
 *
 * @return the block number, followed by the hex string of the block hash between parentheses
 *     when the hash is present
 */
private String logBlockNumberMaybeHash() {
  final String hashSuffix =
      maybeBlockHash.map(hash -> " (" + hash.toHexString() + ")").orElse("");
  return blockNumber + hashSuffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ private CompletableFuture<Block> getBlockFromPeers(
final Optional<Hash> blockHash) {
final RetryingGetBlockFromPeersTask getBlockTask =
RetryingGetBlockFromPeersTask.create(
protocolContext,
protocolSchedule,
ethContext,
metricsSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public CompletableFuture<Void> executeAsync(final Hash hash) {
private CompletableFuture<Block> requestBlock(final Hash targetHash) {
final RetryingGetBlockFromPeersTask getBlockTask =
RetryingGetBlockFromPeersTask.create(
context.getProtocolContext(),
context.getProtocolSchedule(),
context.getEthContext(),
context.getMetricsSystem(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* @param <R> The type of data returned from the network
*/
public abstract class AbstractMessageTaskTest<T, R> {
protected static final int MAX_PEERS = 5;
protected static Blockchain blockchain;
protected static ProtocolSchedule protocolSchedule;
protected static ProtocolContext protocolContext;
Expand All @@ -77,7 +78,6 @@ public static void setup() {
blockchain = blockchainSetupUtil.getBlockchain();
protocolSchedule = blockchainSetupUtil.getProtocolSchedule();
protocolContext = blockchainSetupUtil.getProtocolContext();

assertThat(blockchainSetupUtil.getMaxBlockNumber()).isGreaterThanOrEqualTo(20L);
}

Expand All @@ -91,7 +91,7 @@ public void setupTest() {
EthProtocol.NAME,
TestClock.fixed(),
metricsSystem,
25,
MAX_PEERS,
EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE));
final EthMessages ethMessages = new EthMessages();
final EthScheduler ethScheduler =
Expand Down
Loading