Skip to content

Commit

Permalink
Batching backward sync (hyperledger#3532)
Browse files Browse the repository at this point in the history
* Backward sync now batches requests

Signed-off-by: Jiri Peinlich <[email protected]>
  • Loading branch information
gezero authored and garyschulte committed May 2, 2022
1 parent 43e2a1c commit 15557be
Show file tree
Hide file tree
Showing 30 changed files with 1,553 additions and 416 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardsSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncLookupService;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
Expand Down Expand Up @@ -62,8 +63,15 @@ protected MiningCoordinator createMiningCoordinator(
protocolSchedule,
transactionPool.getPendingTransactions(),
miningParameters,
new BackwardsSyncContext(
protocolContext, protocolSchedule, metricsSystem, ethProtocolManager.ethContext()));
new BackwardSyncContext(
protocolContext,
protocolSchedule,
metricsSystem,
ethProtocolManager.ethContext(),
syncState,
new BackwardSyncLookupService(
protocolSchedule, ethProtocolManager.ethContext(), metricsSystem, protocolContext),
storageProvider));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.hyperledger.besu.consensus.merge.TransitionUtils.isTerminalProofOfWorkBlock;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda;

import org.hyperledger.besu.consensus.merge.MergeContext;
import org.hyperledger.besu.datatypes.Address;
Expand All @@ -31,7 +32,7 @@
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardsSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.mainnet.AbstractGasLimitSpecification;
import org.hyperledger.besu.ethereum.mainnet.HeaderValidationMode;
Expand Down Expand Up @@ -59,20 +60,20 @@ public class MergeCoordinator implements MergeMiningCoordinator {
final AtomicReference<Bytes> extraData = new AtomicReference<>(Bytes.fromHexString("0x"));
private final MergeContext mergeContext;
private final ProtocolContext protocolContext;
private final BackwardsSyncContext backwardsSyncContext;
private final BackwardSyncContext backwardSyncContext;
private final ProtocolSchedule protocolSchedule;

public MergeCoordinator(
final ProtocolContext protocolContext,
final ProtocolSchedule protocolSchedule,
final AbstractPendingTransactionsSorter pendingTransactions,
final MiningParameters miningParams,
final BackwardsSyncContext backwardsSyncContext) {
final BackwardSyncContext backwardSyncContext) {
this.protocolContext = protocolContext;
this.protocolSchedule = protocolSchedule;
this.mergeContext = protocolContext.getConsensusContext(MergeContext.class);
this.miningParameters = miningParams;
this.backwardsSyncContext = backwardsSyncContext;
this.backwardSyncContext = backwardSyncContext;
this.targetGasLimit =
miningParameters
.getTargetGasLimit()
Expand Down Expand Up @@ -213,8 +214,8 @@ public Optional<BlockHeader> getOrSyncHeaderByHash(final Hash blockhash) {
if (optHeader.isPresent()) {
debugLambda(LOG, "BlockHeader {} is already present", () -> optHeader.get().toLogString());
} else {
debugLambda(LOG, "appending block hash {} to backward sync", blockhash::toHexString);
backwardsSyncContext.syncBackwardsUntil(blockhash);
infoLambda(LOG, "appending block hash {} to backward sync", blockhash::toHexString);
backwardSyncContext.syncBackwardsUntil(blockhash);
}
return optHeader;
}
Expand All @@ -228,7 +229,7 @@ public Result executeBlock(final Block block) {
.ifPresentOrElse(
blockHeader ->
debugLambda(LOG, "Parent of block {} is already present", block::toLogString),
() -> backwardsSyncContext.syncBackwardsUntil(block));
() -> backwardSyncContext.syncBackwardsUntil(block));

final var validationResult =
protocolSchedule
Expand Down Expand Up @@ -411,7 +412,7 @@ public Optional<Hash> getLatestValidAncestor(final BlockHeader blockHeader) {

@Override
public boolean isBackwardSyncing() {
return backwardsSyncContext.isSyncing();
return backwardSyncContext.isSyncing();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardsSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.feemarket.BaseFeeMarket;
Expand All @@ -65,7 +65,7 @@ public class MergeCoordinatorTest implements MergeGenesisConfigHelper {

@Mock AbstractPendingTransactionsSorter mockSorter;
@Mock MergeContext mergeContext;
@Mock BackwardsSyncContext backwardsSyncContext;
@Mock BackwardSyncContext backwardSyncContext;

private MergeCoordinator coordinator;
private ProtocolContext protocolContext;
Expand Down Expand Up @@ -104,7 +104,7 @@ public void setUp() {
mockProtocolSchedule,
mockSorter,
new MiningParameters.Builder().coinbase(coinbase).build(),
backwardsSyncContext);
backwardSyncContext);
}

@Test
Expand Down Expand Up @@ -341,7 +341,7 @@ public void assertGetOrSyncForBlockNotPresent() {
var res = coordinator.getOrSyncHeaderByHash(mockHeader.getHash());

assertThat(res).isNotPresent();
verify(backwardsSyncContext, times(1)).syncBackwardsUntil(mockHeader.getHash());
verify(backwardSyncContext, times(1)).syncBackwardsUntil(mockHeader.getHash());
}

@Test
Expand Down Expand Up @@ -394,7 +394,7 @@ public void assertMergeAtGenesisSatisifiesTerminalPoW() {
mockProtocolSchedule,
mockSorter,
new MiningParameters.Builder().coinbase(coinbase).build(),
mock(BackwardsSyncContext.class));
mock(BackwardSyncContext.class));

var blockZero = mockHeaderBuilder.number(0L).buildHeader();
var blockOne = mockHeaderBuilder.number(1L).parentHash(blockZero.getHash()).buildHeader();
Expand Down Expand Up @@ -510,7 +510,7 @@ MergeCoordinator terminalAncestorMock(final long chainDepth, final boolean hasTe
mockProtocolSchedule,
mockSorter,
new MiningParameters.Builder().coinbase(coinbase).build(),
mock(BackwardsSyncContext.class)));
mock(BackwardSyncContext.class)));

return mockCoordinator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardsSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.AbstractPendingTransactionsSorter;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
Expand Down Expand Up @@ -87,7 +87,7 @@ public void setUp() {
mockProtocolSchedule,
mockSorter,
new MiningParameters.Builder().coinbase(coinbase).build(),
mock(BackwardsSyncContext.class));
mock(BackwardSyncContext.class));
mergeContext.setIsPostMerge(genesisState.getBlock().getHeader().getDifficulty());
blockchain.observeBlockAdded(
blockAddedEvent ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public enum KeyValueSegmentIdentifier implements SegmentIdentifier {
TRIE_BRANCH_STORAGE(new byte[] {9}, new int[] {2}),
TRIE_LOG_STORAGE(new byte[] {10}, new int[] {2}),
GOQUORUM_PRIVATE_WORLD_STATE(new byte[] {11}),
GOQUORUM_PRIVATE_STORAGE(new byte[] {12});
BACKWARD_SYNC_HEADERS(new byte[] {13}),
BACKWARD_SYNC_BLOCKS(new byte[] {14});

private final byte[] id;
private final int[] versionList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public WorldStatePreimageStorage createPrivateWorldStatePreimageStorage() {
@Override
public GoQuorumPrivateStorage createGoQuorumPrivateStorage() {
return new GoQuorumPrivateKeyValueStorage(
getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.GOQUORUM_PRIVATE_STORAGE));
getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.BACKWARD_SYNC_HEADERS));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.hyperledger.besu.ethereum.eth.manager.task;

import static com.google.common.base.Preconditions.checkNotNull;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import com.google.common.annotations.VisibleForTesting;

public class RetryingGetHeadersEndingAtFromPeerByHashTask
extends AbstractRetryingPeerTask<List<BlockHeader>> {

private final Hash referenceHash;
private final ProtocolSchedule protocolSchedule;
private final long minimumRequiredBlockNumber;
private final int count;

@VisibleForTesting
RetryingGetHeadersEndingAtFromPeerByHashTask(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash referenceHash,
final long minimumRequiredBlockNumber,
final int count,
final MetricsSystem metricsSystem) {
super(ethContext, 3, List::isEmpty, metricsSystem);
this.protocolSchedule = protocolSchedule;
this.minimumRequiredBlockNumber = minimumRequiredBlockNumber;
this.count = count;
checkNotNull(referenceHash);
this.referenceHash = referenceHash;
}

public static RetryingGetHeadersEndingAtFromPeerByHashTask endingAtHash(
final ProtocolSchedule protocolSchedule,
final EthContext ethContext,
final Hash referenceHash,
final long minimumRequiredBlockNumber,
final int count,
final MetricsSystem metricsSystem) {
return new RetryingGetHeadersEndingAtFromPeerByHashTask(
protocolSchedule,
ethContext,
referenceHash,
minimumRequiredBlockNumber,
count,
metricsSystem);
}

@Override
protected CompletableFuture<List<BlockHeader>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
final AbstractGetHeadersFromPeerTask task =
GetHeadersFromPeerByHashTask.endingAtHash(
protocolSchedule,
getEthContext(),
referenceHash,
minimumRequiredBlockNumber,
count,
getMetricsSystem());
assignedPeer.ifPresent(task::assignPeer);
return executeSubTask(task::run)
.thenApply(
peerResult -> {
result.complete(peerResult.getResult());
return peerResult.getResult();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,7 @@ public CompletableFuture<Void> start() {
} else {
future = startFullSync();
}
future =
future.thenApply(
unused -> {
blockPropagationManager.stop();
running.set(false);
return null;
});
future = future.thenApply(this::finalizeSync);
return future;
} else {
throw new IllegalStateException("Attempt to start an already started synchronizer.");
Expand Down Expand Up @@ -191,18 +185,7 @@ private CompletableFuture<Void> handleFastSyncResult(final FastSyncState result)

private CompletableFuture<Void> startFullSync() {
maybePruner.ifPresent(Pruner::start);
return fullSyncDownloader
.start()
.thenCompose(
unused -> {
maybePruner.ifPresent(Pruner::stop);
return null;
})
.thenApply(
o -> {
maybePruner.ifPresent(Pruner::stop);
return null;
});
return fullSyncDownloader.start();
}

@Override
Expand Down Expand Up @@ -238,4 +221,13 @@ public long subscribeInSync(final InSyncListener listener, final long syncTolera
public boolean unsubscribeInSync(final long listenerId) {
return syncState.unsubscribeSyncStatus(listenerId);
}

private Void finalizeSync(final Void unused) {
LOG.info("Stopping block propagation.");
blockPropagationManager.stop();
LOG.info("Stopping the pruner.");
maybePruner.ifPresent(Pruner::stop);
running.set(false);
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private CompletionStage<Void> repeatUnlessDownloadComplete(
&& !syncState.hasReachedTerminalDifficulty().orElse(false)) {
return performDownload();
} else {
LOG.info("Chain download complete");
LOG.info("PipelineChain download complete");
return completedFuture(null);
}
}
Expand Down
Loading

0 comments on commit 15557be

Please sign in to comment.