Skip to content

Commit

Permalink
add snapserver experimental flag, refactor snap server start
Browse files Browse the repository at this point in the history
Signed-off-by: garyschulte <[email protected]>
  • Loading branch information
garyschulte committed Mar 15, 2024
1 parent 10320a0 commit 6fb37f2
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 98 deletions.
2 changes: 2 additions & 0 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -2778,6 +2778,8 @@ private String generateConfigurationOverview() {
getDataStorageConfiguration().getUnstable().getBonsaiTrieLogPruningWindowSize());
}

builder.setSnapServerEnabled(this.unstableSynchronizerOptions.isSnapsyncServerEnabled());

builder.setTxPoolImplementation(buildTransactionPoolConfiguration().getTxPoolImplementation());
builder.setWorldStateUpdateMode(unstableEvmOptions.toDomainObject().worldUpdaterMode());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class ConfigurationOverviewBuilder {
private boolean isBonsaiLimitTrieLogsEnabled = false;
private long trieLogRetentionLimit = 0;
private Integer trieLogsPruningWindowSize = null;
private boolean isSnapServerEnabled = false;
private TransactionPoolConfiguration.Implementation txPoolImplementation;
private EvmConfiguration.WorldUpdaterMode worldStateUpdateMode;
private Map<String, String> environment;
Expand Down Expand Up @@ -219,6 +220,11 @@ public ConfigurationOverviewBuilder setTrieLogRetentionLimit(final long limit) {
return this;
}

public ConfigurationOverviewBuilder setSnapServerEnabled(final boolean snapServerEnabled) {
isSnapServerEnabled = snapServerEnabled;
return this;
}

/**
* Sets trie logs pruning window size
*
Expand Down Expand Up @@ -339,6 +345,10 @@ public String build() {

lines.add("Using " + worldStateUpdateMode + " worldstate update mode");

if (isSnapServerEnabled) {
lines.add("Experimental Snap Sync server enabled");
}

if (isBonsaiLimitTrieLogsEnabled) {
final StringBuilder trieLogPruningString = new StringBuilder();
trieLogPruningString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class SynchronizerOptions implements CLIOptions<SynchronizerConfiguration
private static final String SNAP_FLAT_DB_HEALING_ENABLED_FLAG =
"--Xsnapsync-synchronizer-flat-db-healing-enabled";

private static final String SNAP_SERVER_ENABLED_FLAG = "--Xsnapsync-server-enabled";

private static final String CHECKPOINT_POST_MERGE_FLAG = "--Xcheckpoint-post-merge-enabled";

/**
Expand Down Expand Up @@ -296,6 +298,13 @@ public void parseBlockPropagationRange(final String arg) {
private Boolean snapsyncFlatDbHealingEnabled =
SnapSyncConfiguration.DEFAULT_IS_FLAT_DB_HEALING_ENABLED;

@CommandLine.Option(
names = SNAP_SERVER_ENABLED_FLAG,
hidden = true,
paramLabel = "<Boolean>",
description = "Snap sync server enabled (default: ${DEFAULT-VALUE})")
private Boolean snapsyncServerEnabled = SnapSyncConfiguration.DEFAULT_SNAP_SERVER_ENABLED;

@CommandLine.Option(
names = {CHECKPOINT_POST_MERGE_FLAG},
hidden = true,
Expand All @@ -314,6 +323,15 @@ public boolean isSnapsyncFlatDbHealingEnabled() {
return snapsyncFlatDbHealingEnabled;
}

/**
* Flag to know whether the Snap sync server feature is enabled or disabled.
*
* @return true is snap sync server is enabled
*/
public boolean isSnapsyncServerEnabled() {
return snapsyncServerEnabled;
}

/**
* Create synchronizer options.
*
Expand Down Expand Up @@ -456,7 +474,9 @@ public List<String> getCLIOptions() {
SNAP_FLAT_ACCOUNT_HEALED_COUNT_PER_REQUEST_FLAG,
OptionParser.format(snapsyncFlatAccountHealedCountPerRequest),
SNAP_FLAT_STORAGE_HEALED_COUNT_PER_REQUEST_FLAG,
OptionParser.format(snapsyncFlatStorageHealedCountPerRequest)));
OptionParser.format(snapsyncFlatStorageHealedCountPerRequest),
SNAP_SERVER_ENABLED_FLAG,
OptionParser.format(snapsyncServerEnabled)));
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1042,9 +1042,17 @@ private Optional<SnapProtocolManager> createSnapProtocolManager(
final List<PeerValidator> peerValidators,
final EthPeers ethPeers,
final EthMessages snapMessages) {
return Optional.of(
new SnapProtocolManager(
worldStateStorageCoordinator, peerValidators, ethPeers, snapMessages, protocolContext));
if (syncConfig.getSnapSyncConfiguration().isSnapServerEnabled()) {
return Optional.of(
new SnapProtocolManager(
worldStateStorageCoordinator,
peerValidators,
ethPeers,
snapMessages,
protocolContext));
} else {
return Optional.empty();
}
}

WorldStateArchive createWorldStateArchive(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public abstract Optional<Bytes> getFlatStorageValueByStorageSlotKey(
StorageSlotKey storageSlotKey,
SegmentedKeyValueStorage storageStorage);

public boolean isCodeByCodeHash() {
return codeStorageStrategy instanceof CodeHashCodeStorageStrategy;
}

/*
* Retrieves the code data for the given code hash and account hash.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ FlatDbMode deriveFlatDbStrategy(final SegmentedKeyValueStorage composedWorldStat
return flatDbMode;
}

private boolean deriveUseCodeStorageByHash(
protected boolean deriveUseCodeStorageByHash(
final SegmentedKeyValueStorage composedWorldStateStorage) {
final boolean configCodeUsingHash =
dataStorageConfiguration.getUnstable().getBonsaiCodeStoredByCodeHashEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ public interface DataStorageConfiguration {
.unstable(Unstable.DEFAULT)
.build();

DataStorageConfiguration BONSAI_CODE_BY_HASH_CONFIG =
ImmutableDataStorageConfiguration.builder()
.dataStorageFormat(DataStorageFormat.BONSAI)
.bonsaiMaxLayersToLoad(DEFAULT_BONSAI_MAX_LAYERS_TO_LOAD)
.unstable(Unstable.CODE_BY_CODE_HASH)
.build();

DataStorageConfiguration DEFAULT_BONSAI_CONFIG =
ImmutableDataStorageConfiguration.builder()
.dataStorageFormat(DataStorageFormat.BONSAI)
Expand Down Expand Up @@ -73,11 +66,6 @@ interface Unstable {
DataStorageConfiguration.Unstable DEFAULT =
ImmutableDataStorageConfiguration.Unstable.builder().build();

DataStorageConfiguration.Unstable CODE_BY_CODE_HASH =
ImmutableDataStorageConfiguration.Unstable.builder()
.bonsaiCodeStoredByCodeHashEnabled(true)
.build();

@Value.Default
default boolean getBonsaiLimitTrieLogsEnabled() {
return DEFAULT_BONSAI_LIMIT_TRIE_LOGS_ENABLED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public Optional<Bytes> getAccountStorageTrieNode(
forest -> forest.getAccountStorageTrieNode(nodeHash));
}

public Optional<Bytes> getCode(final Hash codeHash, final Hash accountHash) {
return applyForStrategy(
bonsai -> bonsai.getCode(codeHash, accountHash), forest -> forest.getCode(codeHash));
}

@SuppressWarnings("unchecked")
public <STRATEGY extends WorldStateKeyValueStorage> STRATEGY getStrategy(
final Class<STRATEGY> strategyClass) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage;
Expand All @@ -33,11 +32,11 @@
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.trie.CompactEncoding;
import org.hyperledger.besu.ethereum.trie.bonsai.BonsaiWorldStateProvider;
import org.hyperledger.besu.ethereum.trie.bonsai.cache.CachedWorldStorageManager;
import org.hyperledger.besu.ethereum.trie.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.worldstate.FlatDbMode;
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.plugin.services.BesuEvents;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -84,43 +83,22 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener {
private final AtomicLong listenerId = new AtomicLong();
private final EthMessages snapMessages;

// provide worldstate storage by root hash
private final Function<Optional<Hash>, Optional<BonsaiWorldStateKeyValueStorage>>
worldStateStorageProvider;
private final WorldStateStorageCoordinator worldStateStorageCoordinator;
private final Optional<ProtocolContext> protocolContext;

// provide worldstate storage by root hash
private Function<Optional<Hash>, Optional<BonsaiWorldStateKeyValueStorage>>
worldStateStorageProvider = __ -> Optional.empty();

SnapServer(
final EthMessages snapMessages,
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext) {
this(
snapMessages,
worldStateStorageCoordinator,
rootHash ->
((BonsaiWorldStateProvider) protocolContext.getWorldStateArchive())
.getCachedWorldStorageManager()
.getStorageByRootHash(rootHash));

Optional.of(protocolContext.getWorldStateArchive())
.filter(__ -> worldStateStorageCoordinator.isMatchingFlatMode(FlatDbMode.FULL))
.map(BonsaiWorldStateProvider.class::cast)
.ifPresent(
flatArchive -> {
var cachedStorageManager = flatArchive.getCachedWorldStorageManager();
var blockchain = protocolContext.getBlockchain();

// prime state-root-to-blockhash cache
primeWorldStateArchive(cachedStorageManager, blockchain);

// subscribe to initial sync completed events to start/stop snap server:
protocolContext
.getSynchronizer()
.filter(z -> z instanceof DefaultSynchronizer)
.map(DefaultSynchronizer.class::cast)
.ifPresentOrElse(
z -> this.listenerId.set(z.subscribeInitialSync(this)),
() -> LOGGER.warn("SnapServer created without reference to sync status"));
});

this.snapMessages = snapMessages;
this.worldStateStorageCoordinator = worldStateStorageCoordinator;
this.protocolContext = Optional.of(protocolContext);
registerResponseConstructors();
}

/**
Expand All @@ -136,7 +114,7 @@ class SnapServer implements BesuEvents.InitialSyncCompletionListener {
this.snapMessages = snapMessages;
this.worldStateStorageCoordinator = worldStateStorageCoordinator;
this.worldStateStorageProvider = worldStateStorageProvider;
registerResponseConstructors();
this.protocolContext = Optional.empty();
}

@Override
Expand All @@ -149,22 +127,60 @@ public void onInitialSyncRestart() {
stop();
}

public SnapServer start() {
isStarted.set(true);
public synchronized SnapServer start() {

// if we are bonsai and full flat, we can provide a worldstate storage:
var worldStateKeyValueStorage = worldStateStorageCoordinator.worldStateKeyValueStorage();
if (worldStateKeyValueStorage.getDataStorageFormat().equals(DataStorageFormat.BONSAI)
&& worldStateStorageCoordinator.isMatchingFlatMode(FlatDbMode.FULL)) {
LOGGER.debug("Starting snap server with Bonsai full flat db");
var bonsaiArchive =
protocolContext
.map(ProtocolContext::getWorldStateArchive)
.map(BonsaiWorldStateProvider.class::cast);
var cachedStorageManagerOpt =
bonsaiArchive.map(archive -> archive.getCachedWorldStorageManager());

if (cachedStorageManagerOpt.isPresent()) {
var cachedStorageManager = cachedStorageManagerOpt.get();
this.worldStateStorageProvider = cachedStorageManager::getStorageByRootHash;

// when we start we need to build the cache of latest 128 worldstates trielogs-to-root-hash:
var blockchain = protocolContext.map(ProtocolContext::getBlockchain).orElse(null);

// at startup, prime the latest worldstates by roothash:
cachedStorageManager.primeRootToBlockHashCache(blockchain, PRIME_STATE_ROOT_CACHE_LIMIT);

// subscribe to initial sync completed events to start/stop snap server:
protocolContext
.flatMap(ProtocolContext::getSynchronizer)
.filter(z -> z instanceof DefaultSynchronizer)
.map(DefaultSynchronizer.class::cast)
.ifPresentOrElse(
z -> this.listenerId.set(z.subscribeInitialSync(this)),
() -> LOGGER.warn("SnapServer created without reference to sync status"));

var flatDbStrategy =
((BonsaiWorldStateKeyValueStorage)
worldStateStorageCoordinator.worldStateKeyValueStorage())
.getFlatDbStrategy();
if (!flatDbStrategy.isCodeByCodeHash()) {
LOGGER.warn("SnapServer requires code stored by codehash, but it is not enabled");
}
} else {
LOGGER.warn(
"SnapServer started without cached storage manager, this should only happen in tests");
}
isStarted.set(true);
}
return this;
}

public SnapServer stop() {
public synchronized SnapServer stop() {
isStarted.set(false);
return this;
}

private void primeWorldStateArchive(
final CachedWorldStorageManager storageManager, final Blockchain blockchain) {
// at startup, prime the latest worldstates by roothash:
storageManager.primeRootToBlockHashCache(blockchain, PRIME_STATE_ROOT_CACHE_LIMIT);
}

private void registerResponseConstructors() {
snapMessages.registerResponseConstructor(
SnapV1.GET_ACCOUNT_RANGE, messageData -> constructGetAccountRangeResponse(messageData));
Expand Down Expand Up @@ -410,40 +426,25 @@ MessageData constructGetBytecodesResponse(final MessageData message) {
.addArgument(codeHashes.hashes()::size)
.log();

// there is no worldstate root or block header for us to use, so default to head. This
// can cause problems for self-destructed contracts pre-shanghai. for now since this impl
// is deferring to #5889, we can just get any flat code storage and know we are not deleting
// code for now.
try {
return worldStateStorageProvider
.apply(Optional.empty())
.map(
storage -> {
LOGGER.trace("obtained worldstate in {}", stopWatch);
List<Bytes> codeBytes = new ArrayDeque<>();
for (Bytes32 codeHash : codeHashes.hashes()) {
Optional<Bytes> optCode = storage.getCode(Hash.wrap(codeHash), null);
if (optCode.isPresent()) {
if (sumListBytes(codeBytes) + optCode.get().size() > maxResponseBytes) {
break;
}
codeBytes.add(optCode.get());
}
}
var resp = ByteCodesMessage.create(codeBytes);
LOGGER.debug(
"returned in {} code bytes message with {} entries, resp size {} of max {}",
stopWatch,
codeBytes.size(),
resp.getSize(),
maxResponseBytes);
return resp;
})
.orElseGet(
() -> {
LOGGER.debug("returned empty byte codes message due to missing worldstate");
return EMPTY_BYTE_CODES_MESSAGE;
});
List<Bytes> codeBytes = new ArrayDeque<>();
for (Bytes32 codeHash : codeHashes.hashes()) {
Optional<Bytes> optCode = worldStateStorageCoordinator.getCode(Hash.wrap(codeHash), null);
if (optCode.isPresent()) {
if (sumListBytes(codeBytes) + optCode.get().size() > maxResponseBytes) {
break;
}
codeBytes.add(optCode.get());
}
}
var resp = ByteCodesMessage.create(codeBytes);
LOGGER.debug(
"returned in {} code bytes message with {} entries, resp size {} of max {}",
stopWatch,
codeBytes.size(),
resp.getSize(),
maxResponseBytes);
return resp;
} catch (Exception ex) {
LOGGER.error("Unexpected exception serving bytecodes request", ex);
return EMPTY_BYTE_CODES_MESSAGE;
Expand Down
Loading

0 comments on commit 6fb37f2

Please sign in to comment.