Skip to content

Commit

Permalink
Check for snap server (hyperledger#6609)
Browse files Browse the repository at this point in the history
* EthPeer add isServingSnap to be able to make sure that we have enough snap servers connected when we are snap syncing

Signed-off-by: [email protected] <[email protected]>
Signed-off-by: Sally MacFarlane <[email protected]>
Co-authored-by: Sally MacFarlane <[email protected]>
Signed-off-by: Daniel Lehrner <[email protected]>
  • Loading branch information
2 people authored and daniellehrner committed Jul 16, 2024
1 parent 99e41c6 commit a8c4cb8
Show file tree
Hide file tree
Showing 42 changed files with 942 additions and 318 deletions.
10 changes: 6 additions & 4 deletions besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -688,13 +688,14 @@ public Runner build() {
.map(nodePerms -> PeerPermissions.combine(nodePerms, defaultPeerPermissions))
.orElse(defaultPeerPermissions);

final EthPeers ethPeers = besuController.getEthPeers();

LOG.info("Detecting NAT service.");
final boolean fallbackEnabled = natMethod == NatMethod.AUTO || natMethodFallbackEnabled;
final NatService natService = new NatService(buildNatManager(natMethod), fallbackEnabled);
final NetworkBuilder inactiveNetwork = caps -> new NoopP2PNetwork();
final NetworkBuilder activeNetwork =
caps -> {
final EthPeers ethPeers = besuController.getEthPeers();
return DefaultP2PNetwork.builder()
.vertx(vertx)
.nodeKey(nodeKey)
Expand All @@ -709,8 +710,8 @@ public Runner build() {
.blockchain(context.getBlockchain())
.blockNumberForks(besuController.getGenesisConfigOptions().getForkBlockNumbers())
.timestampForks(besuController.getGenesisConfigOptions().getForkBlockTimestamps())
.allConnectionsSupplier(ethPeers::getAllConnections)
.allActiveConnectionsSupplier(ethPeers::getAllActiveConnections)
.allConnectionsSupplier(ethPeers::streamAllConnections)
.allActiveConnectionsSupplier(ethPeers::streamAllActiveConnections)
.maxPeers(ethPeers.getMaxPeers())
.build();
};
Expand All @@ -721,9 +722,10 @@ public Runner build() {
.subProtocols(subProtocols)
.network(p2pEnabled ? activeNetwork : inactiveNetwork)
.metricsSystem(metricsSystem)
.ethPeersShouldConnect(ethPeers::shouldTryToConnect)
.build();

besuController.getEthPeers().setRlpxAgent(networkRunner.getRlpxAgent());
ethPeers.setRlpxAgent(networkRunner.getRlpxAgent());

final P2PNetwork network = networkRunner.getNetwork();
// ForkId in Ethereum Node Record needs updating when we transition to a new protocol spec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.SnapProtocol;
Expand Down Expand Up @@ -77,6 +76,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.config.NetworkingConfiguration;
Expand Down Expand Up @@ -604,6 +604,12 @@ public BesuController build() {
final int maxMessageSize = ethereumWireProtocolConfiguration.getMaxMessageSize();
final Supplier<ProtocolSpec> currentProtocolSpecSupplier =
() -> protocolSchedule.getByBlockHeader(blockchain.getChainHeadHeader());
final ForkIdManager forkIdManager =
new ForkIdManager(
blockchain,
genesisConfigOptions.getForkBlockNumbers(),
genesisConfigOptions.getForkBlockTimestamps(),
ethereumWireProtocolConfiguration.isLegacyEth64ForkIdEnabled());
final EthPeers ethPeers =
new EthPeers(
getSupportedProtocol(),
Expand All @@ -615,7 +621,9 @@ public BesuController build() {
nodeKey.getPublicKey().getEncodedBytes(),
maxPeers,
maxRemotelyInitiatedPeers,
randomPeerPriority);
randomPeerPriority,
syncConfig.getSyncMode(),
forkIdManager);

final EthMessages ethMessages = new EthMessages();
final EthMessages snapMessages = new EthMessages();
Expand Down Expand Up @@ -681,13 +689,14 @@ public BesuController build() {
ethMessages,
scheduler,
peerValidators,
Optional.empty());
Optional.empty(),
forkIdManager);

final PivotBlockSelector pivotBlockSelector =
createPivotSelector(
protocolSchedule, protocolContext, ethContext, syncState, metricsSystem, blockchain);

final Synchronizer synchronizer =
final DefaultSynchronizer synchronizer =
createSynchronizer(
protocolSchedule,
worldStateStorageCoordinator,
Expand All @@ -697,6 +706,16 @@ public BesuController build() {
ethProtocolManager,
pivotBlockSelector);

ethPeers.setTrailingPeerRequirementsSupplier(synchronizer::calculateTrailingPeerRequirements);

if (SyncMode.isSnapSync(syncConfig.getSyncMode())
|| SyncMode.isCheckpointSync(syncConfig.getSyncMode())) {
synchronizer.subscribeInSync((b) -> ethPeers.snapServerPeersNeeded(!b));
ethPeers.snapServerPeersNeeded(true);
} else {
ethPeers.snapServerPeersNeeded(false);
}

protocolContext.setSynchronizer(Optional.of(synchronizer));

final Optional<SnapProtocolManager> maybeSnapProtocolManager =
Expand Down Expand Up @@ -809,7 +828,7 @@ private TrieLogPruner createTrieLogPruner(
* @param pivotBlockSelector the pivot block selector
* @return the synchronizer
*/
protected Synchronizer createSynchronizer(
protected DefaultSynchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
Expand Down Expand Up @@ -1000,6 +1019,7 @@ protected String getSupportedProtocol() {
* @param scheduler the scheduler
* @param peerValidators the peer validators
* @param mergePeerFilter the merge peer filter
* @param forkIdManager the fork id manager
* @return the eth protocol manager
*/
protected EthProtocolManager createEthProtocolManager(
Expand All @@ -1012,7 +1032,8 @@ protected EthProtocolManager createEthProtocolManager(
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
return new EthProtocolManager(
protocolContext.getBlockchain(),
networkId,
Expand All @@ -1026,8 +1047,7 @@ protected EthProtocolManager createEthProtocolManager(
mergePeerFilter,
synchronizerConfiguration,
scheduler,
genesisConfigOptions.getForkBlockNumbers(),
genesisConfigOptions.getForkBlockTimestamps());
forkIdManager);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
Expand Down Expand Up @@ -242,7 +243,8 @@ protected EthProtocolManager createEthProtocolManager(
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
return besuControllerBuilderSchedule
.get(0L)
.createEthProtocolManager(
Expand All @@ -255,7 +257,8 @@ protected EthProtocolManager createEthProtocolManager(
ethMessages,
scheduler,
peerValidators,
mergePeerFilter);
mergePeerFilter,
forkIdManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
Expand Down Expand Up @@ -97,7 +98,8 @@ protected EthProtocolManager createEthProtocolManager(
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {

var mergeContext = protocolContext.getConsensusContext(MergeContext.class);

Expand Down Expand Up @@ -126,7 +128,8 @@ protected EthProtocolManager createEthProtocolManager(
ethMessages,
scheduler,
peerValidators,
filterToUse);
filterToUse,
forkIdManager);

return ethProtocolManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
Expand All @@ -49,6 +48,7 @@
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.forkid.ForkIdManager;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration;
Expand Down Expand Up @@ -156,7 +156,8 @@ protected EthProtocolManager createEthProtocolManager(
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
return mergeBesuControllerBuilder.createEthProtocolManager(
protocolContext,
synchronizerConfiguration,
Expand All @@ -167,7 +168,8 @@ protected EthProtocolManager createEthProtocolManager(
ethMessages,
scheduler,
peerValidators,
mergePeerFilter);
mergePeerFilter,
forkIdManager);
}

@Override
Expand Down Expand Up @@ -212,7 +214,7 @@ protected PluginServiceFactory createAdditionalPluginServices(
}

@Override
protected Synchronizer createSynchronizer(
protected DefaultSynchronizer createSynchronizer(
final ProtocolSchedule protocolSchedule,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
Expand All @@ -222,15 +224,14 @@ protected Synchronizer createSynchronizer(
final PivotBlockSelector pivotBlockSelector) {

DefaultSynchronizer sync =
(DefaultSynchronizer)
super.createSynchronizer(
protocolSchedule,
worldStateStorageCoordinator,
protocolContext,
ethContext,
syncState,
ethProtocolManager,
pivotBlockSelector);
super.createSynchronizer(
protocolSchedule,
worldStateStorageCoordinator,
protocolContext,
ethContext,
syncState,
ethProtocolManager,
pivotBlockSelector);

if (genesisConfigOptions.getTerminalTotalDifficulty().isPresent()) {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.hyperledger.besu.consensus.common.bft.network.PeerConnectionTracker;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.p2p.network.ProtocolManager;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Message;
Expand Down Expand Up @@ -108,11 +107,6 @@ public void handleNewConnection(final PeerConnection peerConnection) {
peers.add(peerConnection);
}

@Override
public boolean shouldConnect(final Peer peer, final boolean incoming) {
return false; // for now the EthProtocolManager takes care of this
}

@Override
public void handleDisconnect(
final PeerConnection peerConnection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public ForkIdManager(
checkNotNull(blockchain);
checkNotNull(blockNumberForks);
this.chainHeadSupplier = blockchain::getChainHeadHeader;
this.genesisHash = blockchain.getGenesisBlock().getHash();
try {
this.genesisHash = blockchain.getGenesisBlock().getHash();
} catch (Exception e) {
throw new RuntimeException(e);
}
this.blockNumbersForkIds = new ArrayList<>();
this.timestampsForkIds = new ArrayList<>();
this.legacyEth64 = legacyEth64;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ protected boolean removeEldestEntry(final Map.Entry<Hash, Boolean> eldest) {
private final PeerReputation reputation = new PeerReputation();
private final Map<PeerValidator, Boolean> validationStatus = new ConcurrentHashMap<>();
private final Bytes id;
private boolean isServingSnap = false;

private static final Map<Integer, Integer> roundMessages;

Expand Down Expand Up @@ -393,6 +394,14 @@ public RequestManager.ResponseStream getSnapTrieNode(
requestManagers.get(SnapProtocol.NAME).get(SnapV1.GET_TRIE_NODES), getTrieNodes);
}

public void setIsServingSnap(final boolean isServingSnap) {
this.isServingSnap = isServingSnap;
}

public boolean isServingSnap() {
return isServingSnap;
}

private RequestManager.ResponseStream sendRequest(
final RequestManager requestManager, final MessageData messageData) throws PeerNotConnected {
lastRequestTimestamp = clock.millis();
Expand Down Expand Up @@ -582,9 +591,9 @@ public String getProtocolName() {
}

/**
* Return A read-only snapshot of this peer's current {@code chainState} }
* Return A read-only snapshot of this peer's current {@code chainState}
*
* @return A read-only snapshot of this peer's current {@code chainState} }
* @return A read-only snapshot of this peer's current {@code chainState}
*/
public ChainHeadEstimate chainStateSnapshot() {
return chainHeadState.getSnapshot();
Expand Down Expand Up @@ -629,14 +638,17 @@ public boolean hasSupportForMessage(final int messageCode) {
@Override
public String toString() {
return String.format(
"PeerId: %s %s, validated? %s, disconnected? %s, client: %s, %s, %s",
"PeerId: %s %s, validated? %s, disconnected? %s, client: %s, %s, %s, isServingSnap %s, has height %s, connected for %s ms",
getLoggableId(),
reputation,
isFullyValidated(),
isDisconnected(),
connection.getPeerInfo().getClientId(),
connection,
connection.getPeer().getEnodeURLString());
connection.getPeer().getEnodeURLString(),
isServingSnap,
chainHeadState.getEstimatedHeight(),
System.currentTimeMillis() - connection.getInitiatedAt());
}

@Nonnull
Expand Down
Loading

0 comments on commit a8c4cb8

Please sign in to comment.