Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve regen state #7033

Merged
merged 5 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 58 additions & 12 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {fromHexString} from "@chainsafe/ssz";
import {phase0, Slot, RootHex, BeaconBlock} from "@lodestar/types";
import {phase0, Slot, RootHex, BeaconBlock, SignedBeaconBlock} from "@lodestar/types";
import {
CachedBeaconStateAllForks,
computeEpochAtSlot,
Expand All @@ -8,6 +8,7 @@ import {
DataAvailableStatus,
processSlots,
stateTransition,
StateHashTreeRootSource,
} from "@lodestar/state-transition";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {Logger, toRootHex} from "@lodestar/utils";
Expand Down Expand Up @@ -145,7 +146,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
*/
async getState(
stateRoot: RootHex,
_rCaller: RegenCaller,
caller: RegenCaller,
opts?: StateCloneOpts,
// internal option, don't want to expose to external caller
allowDiskReload = false
Expand All @@ -156,6 +157,13 @@ export class StateRegenerator implements IStateRegeneratorInternal {
return cachedStateCtx;
}

// In block gossip validation (getPreState() call), dontTransferCache is set to true because we only want to transfer the cache in verifyBlocksStateTransitionOnly().
// Here, however, we want to process blocks as fast as possible, so we force the cache to be transferred in this case.
if (opts && allowDiskReload) {
// if there is no `opts` specified, it already means "false"
opts.dontTransferCache = false;
}

// Otherwise we have to use the fork choice to traverse backwards, block by block,
// searching the state caches
// then replay blocks forward to the desired stateRoot
Expand All @@ -166,6 +174,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
const blocksToReplay = [block];
let state: CachedBeaconStateAllForks | null = null;
const {checkpointStateCache} = this.modules;

const getSeedStateTimer = this.modules.metrics?.regenGetState.getSeedState.startTimer({caller});
// iterateAncestorBlocks only returns ancestor blocks, not the block itself
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) {
state = this.modules.blockStateCache.get(b.stateRoot, opts);
Expand All @@ -181,26 +191,58 @@ export class StateRegenerator implements IStateRegeneratorInternal {
}
blocksToReplay.push(b);
}
getSeedStateTimer?.();

if (state === null) {
throw new RegenError({
code: RegenErrorCode.NO_SEED_STATE,
});
}

const blockCount = blocksToReplay.length;
const MAX_EPOCH_TO_PROCESS = 5;
if (blocksToReplay.length > MAX_EPOCH_TO_PROCESS * SLOTS_PER_EPOCH) {
if (blockCount > MAX_EPOCH_TO_PROCESS * SLOTS_PER_EPOCH) {
throw new RegenError({
code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED,
stateRoot,
});
}

const replaySlots = blocksToReplay.map((b) => b.slot).join(",");
this.modules.logger.debug("Replaying blocks to get state", {stateRoot, replaySlots});
for (const b of blocksToReplay.reverse()) {
const block = await this.modules.db.block.get(fromHexString(b.blockRoot));
if (!block) {
this.modules.metrics?.regenGetState.blockCount.observe({caller}, blockCount);

const replaySlots = new Array<Slot>(blockCount);
const blockPromises = new Array<Promise<SignedBeaconBlock | null>>(blockCount);

const protoBlocksAsc = blocksToReplay.reverse();
for (const [i, protoBlock] of protoBlocksAsc.entries()) {
replaySlots[i] = protoBlock.slot;
blockPromises[i] = this.modules.db.block.get(fromHexString(protoBlock.blockRoot));
}

const logCtx = {stateRoot, replaySlots: replaySlots.join(",")};
this.modules.logger.debug("Replaying blocks to get state", logCtx);

const loadBlocksTimer = this.modules.metrics?.regenGetState.loadBlocks.startTimer({caller});
const blockOrNulls = await Promise.all(blockPromises);
loadBlocksTimer?.();

const blocksByRoot = new Map<RootHex, SignedBeaconBlock>();
for (const [i, blockOrNull] of blockOrNulls.entries()) {
twoeths marked this conversation as resolved.
Show resolved Hide resolved
// checking early here helps prevent unnecessary state transitions below
if (blockOrNull === null) {
throw new RegenError({
code: RegenErrorCode.BLOCK_NOT_IN_DB,
blockRoot: protoBlocksAsc[i].blockRoot,
});
}
blocksByRoot.set(protoBlocksAsc[i].blockRoot, blockOrNull);
}

const stateTransitionTimer = this.modules.metrics?.regenGetState.stateTransition.startTimer({caller});
for (const b of protoBlocksAsc) {
const block = blocksByRoot.get(b.blockRoot);
// just to make the compiler happy; we already checked for missing blocks in the for loop above
if (block === undefined) {
throw new RegenError({
code: RegenErrorCode.BLOCK_NOT_IN_DB,
blockRoot: b.blockRoot,
Expand All @@ -224,7 +266,12 @@ export class StateRegenerator implements IStateRegeneratorInternal {
this.modules.metrics
);

const hashTreeRootTimer = this.modules.metrics?.stateHashTreeRootTime.startTimer({
source: StateHashTreeRootSource.regenState,
});
const stateRoot = toRootHex(state.hashTreeRoot());
hashTreeRootTimer?.();

if (b.stateRoot !== stateRoot) {
throw new RegenError({
slot: b.slot,
Expand All @@ -238,17 +285,16 @@ export class StateRegenerator implements IStateRegeneratorInternal {
// also with allowDiskReload flag, we "reload" it to the state cache too
this.modules.blockStateCache.add(state);
}

// this avoids keeping our node busy processing blocks
await nextEventLoop();
twoeths marked this conversation as resolved.
Show resolved Hide resolved
} catch (e) {
throw new RegenError({
code: RegenErrorCode.STATE_TRANSITION_ERROR,
error: e as Error,
});
}
}
this.modules.logger.debug("Replayed blocks to get state", {stateRoot, replaySlots});
stateTransitionTimer?.();

this.modules.logger.debug("Replayed blocks to get state", {...logCtx, stateSlot: state.slot});

return state;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
newCachedState.commit();
const stateRoot = toRootHex(newCachedState.hashTreeRoot());
timer?.();

// load all caches up front so that consumers (usually regen.getState()) can process blocks faster
newCachedState.validators.getAllReadonlyValues();
newCachedState.balances.getAll();
this.logger.debug("Reload: cached state load successful", {
...logMeta,
stateSlot: newCachedState.slot,
Expand Down
28 changes: 28 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,34 @@ export function createLodestarMetrics(
help: "UnhandledPromiseRejection total count",
}),

// regen.getState metrics
// Group of histograms for regen.getState; every histogram carries a single `caller` label (RegenCaller).
regenGetState: {
// Distribution of the block count reported for a regen.getState call (buckets are block counts: 4 to 64).
blockCount: register.histogram<{caller: RegenCaller}>({
name: "lodestar_regen_get_state_block_count",
help: "Block count in regen.getState",
labelNames: ["caller"],
buckets: [4, 8, 16, 32, 64],
}),
// Duration of the "get seed state" step; the metric name indicates the unit is seconds.
getSeedState: register.histogram<{caller: RegenCaller}>({
name: "lodestar_regen_get_state_get_seed_state_seconds",
help: "Duration of get seed state in regen.getState",
labelNames: ["caller"],
buckets: [0.1, 0.5, 1, 2, 3, 4],
}),
// Duration of the "load blocks" step; the metric name indicates the unit is seconds.
loadBlocks: register.histogram<{caller: RegenCaller}>({
name: "lodestar_regen_get_state_load_blocks_seconds",
help: "Duration of load blocks in regen.getState",
labelNames: ["caller"],
buckets: [0.1, 0.5, 1, 2, 3, 4],
}),
// Duration of the "state transition" step; the metric name indicates the unit is seconds.
stateTransition: register.histogram<{caller: RegenCaller}>({
name: "lodestar_regen_get_state_state_transition_seconds",
help: "Duration of state transition in regen.getState",
labelNames: ["caller"],
buckets: [0.1, 0.5, 1, 2, 3, 4],
}),
},

// Precompute next epoch transition
precomputeNextEpochTransition: {
count: register.counter<{result: string}>({
Expand Down
1 change: 1 addition & 0 deletions packages/state-transition/src/stateTransition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export enum StateHashTreeRootSource {
blockTransition = "block_transition",
prepareNextSlot = "prepare_next_slot",
prepareNextEpoch = "prepare_next_epoch",
regenState = "regen_state",
computeNewStateRoot = "compute_new_state_root",
}

Expand Down
Loading