Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TrieLogPruner preload with 30 second timeout #7365

Merged
merged 12 commits into from
Jul 26, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
Expand All @@ -40,6 +46,7 @@
public class TrieLogPruner implements TrieLogEvent.TrieLogObserver {

private static final Logger LOG = LoggerFactory.getLogger(TrieLogPruner.class);
private static final int PRELOAD_TIMEOUT_IN_SECONDS = 30;

private final int pruningLimit;
private final int loadingLimit;
Expand Down Expand Up @@ -83,38 +90,82 @@ public TrieLogPruner(
BesuMetricCategory.PRUNER, "trie_log_pruned_orphan", "trie log pruned orphan");
}

public int initialize() {
return preloadQueue();
public void initialize() {
preloadQueueWithTimeout();
}

private int preloadQueue() {
private void preloadQueueWithTimeout() {

LOG.atInfo()
.setMessage("Loading first {} trie logs from database...")
.setMessage("Attempting to load first {} trie logs from database...")
.addArgument(loadingLimit)
.log();

try (final ScheduledExecutorService preloadExecutor = Executors.newScheduledThreadPool(1)) {

final AtomicBoolean timeoutOccurred = new AtomicBoolean(false);
final Runnable timeoutTask =
() -> {
timeoutOccurred.set(true);
LOG.atWarn()
.setMessage(
"Timeout occurred while loading and processing {} trie logs from database")
.addArgument(loadingLimit)
.log();
};

final ScheduledFuture<?> timeoutFuture =
preloadExecutor.schedule(timeoutTask, PRELOAD_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
LOG.atInfo()
.setMessage(
"Trie log pruning will timeout after {} seconds. If this is timing out, consider using `besu storage trie-log prune` subcommand, see https://besu.hyperledger.org/public-networks/how-to/bonsai-limit-trie-logs")
.addArgument(PRELOAD_TIMEOUT_IN_SECONDS)
.log();

preloadQueue(timeoutOccurred, timeoutFuture);
}
}

private void preloadQueue(
final AtomicBoolean timeoutOccurred, final ScheduledFuture<?> timeoutFuture) {

try (final Stream<byte[]> trieLogKeys = rootWorldStateStorage.streamTrieLogKeys(loadingLimit)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to start streaming at a random key? There is a method streamFromKey() in class KeyValueStorage that allows you to pass in the starting key.

final AtomicLong count = new AtomicLong();

final AtomicLong addToPruneQueueCount = new AtomicLong();
final AtomicLong orphansPruned = new AtomicLong();
trieLogKeys.forEach(
blockHashAsBytes -> {
if (timeoutOccurred.get()) {
throw new RuntimeException(
new TimeoutException("Timeout occurred while preloading trie log prune queue"));
}
final Hash blockHash = Hash.wrap(Bytes32.wrap(blockHashAsBytes));
final Optional<BlockHeader> header = blockchain.getBlockHeader(blockHash);
if (header.isPresent()) {
addToPruneQueue(header.get().getNumber(), blockHash);
count.getAndIncrement();
addToPruneQueueCount.getAndIncrement();
} else {
// prune orphaned blocks (sometimes created during block production)
rootWorldStateStorage.pruneTrieLog(blockHash);
orphansPruned.getAndIncrement();
prunedOrphanCounter.inc();
}
});

timeoutFuture.cancel(true);
LOG.atDebug().log("Pruned {} orphaned trie logs from database...", orphansPruned.intValue());
LOG.atInfo().log("Loaded {} trie logs from database", count);
return pruneFromQueue() + orphansPruned.intValue();
LOG.atInfo().log(
"Added {} trie logs to prune queue. Commencing pruning of eligible trie logs...",
addToPruneQueueCount.intValue());
int prunedCount = pruneFromQueue();
LOG.atInfo().log("Pruned {} trie logs.", prunedCount);
} catch (Exception e) {
LOG.error("Error loading trie logs from database, nothing pruned", e);
return 0;
if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
int prunedCount = pruneFromQueue();
LOG.atInfo().log("Operation timed out, but still pruned {} trie logs.", prunedCount);
macfarla marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOG.error("Error loading trie logs from database, nothing pruned", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface DataStorageConfiguration {
long DEFAULT_BONSAI_MAX_LAYERS_TO_LOAD = 512;
boolean DEFAULT_BONSAI_LIMIT_TRIE_LOGS_ENABLED = true;
long MINIMUM_BONSAI_TRIE_LOG_RETENTION_LIMIT = DEFAULT_BONSAI_MAX_LAYERS_TO_LOAD;
int DEFAULT_BONSAI_TRIE_LOG_PRUNING_WINDOW_SIZE = 30_000;
int DEFAULT_BONSAI_TRIE_LOG_PRUNING_WINDOW_SIZE = 5_000;
boolean DEFAULT_RECEIPT_COMPACTION_ENABLED = false;

DataStorageConfiguration DEFAULT_CONFIG =
Expand Down
Loading