Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: regen to use different state caches based on nHistoricalStates flag #6359

Closed
wants to merge 12 commits into from
Closed
26 changes: 20 additions & 6 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-trans
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";

/**
* Minimum number of epochs between single temp archived states
Expand Down Expand Up @@ -83,13 +84,26 @@ export class StatesArchiver {
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
const finalizedState = this.regen.getCheckpointStateSync(finalized);
if (!finalizedState) {
throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch);
// starting from Jan 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
});
}
await this.db.stateArchive.put(finalizedState.slot, finalizedState);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {finalizedEpoch: finalized.epoch});
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ export async function importBlock(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone());

// Note: in-lined code from previous handler of ChainEvent.checkpoint
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));
Expand Down
33 changes: 29 additions & 4 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {isOptimisticBlock} from "../util/forkChoice.js";
import {BufferPool} from "../util/bufferPool.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData, BlockHash, StateGetOpts, CommonBlockBody} from "./interface.js";
Expand Down Expand Up @@ -79,7 +80,11 @@ import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {ShufflingCache} from "./shufflingCache.js";
import {StateContextCache} from "./stateCache/stateContextCache.js";
import {SeenGossipBlockInput} from "./seenCache/index.js";
import {CheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";
import {InMemoryCheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";
import {FIFOBlockStateCache} from "./stateCache/fifoBlockStateCache.js";
import {PersistentCheckpointStateCache} from "./stateCache/persistentCheckpointsCache.js";
import {DbCPStateDatastore} from "./stateCache/datastore/db.js";
import {FileCPStateDatastore} from "./stateCache/datastore/file.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
Expand Down Expand Up @@ -237,9 +242,28 @@ export class BeaconChain implements IBeaconChain {
this.pubkey2index = cachedState.epochCtx.pubkey2index;
this.index2pubkey = cachedState.epochCtx.index2pubkey;

const stateCache = new StateContextCache({metrics});
const checkpointStateCache = new CheckpointStateCache({metrics});

const fileDataStore = opts.nHistoricalStatesFileDataStore ?? false;
const stateCache = this.opts.nHistoricalStates
? new FIFOBlockStateCache(this.opts, {metrics})
: new StateContextCache({metrics});
const checkpointStateCache = this.opts.nHistoricalStates
? new PersistentCheckpointStateCache(
{
metrics,
logger,
clock,
shufflingCache: this.shufflingCache,
getHeadState: this.getHeadState.bind(this),
bufferPool: new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics),
datastore: fileDataStore
? // debug option if we want to investigate any issues with the DB
new FileCPStateDatastore()
: // production option
new DbCPStateDatastore(this.db),
},
this.opts
)
: new InMemoryCheckpointStateCache({metrics});
const {checkpoint} = computeAnchorCheckpoint(config, anchorState);
stateCache.add(cachedState);
stateCache.setHeadState(cachedState);
Expand Down Expand Up @@ -333,6 +357,7 @@ export class BeaconChain implements IBeaconChain {

/**
 * Populate in-memory caches with persisted data. Call at least once on startup.
 *
 * Runs two steps sequentially: `this.regen.init()` first, then
 * `this.opPool.fromPersisted(this.db)`. Rejects if either step rejects.
 */
async loadFromDisk(): Promise<void> {
// regen is initialized before the op pool is restored from the db
await this.regen.init();
await this.opPool.fromPersisted(this.db);
}

Expand Down
13 changes: 10 additions & 3 deletions packages/beacon-node/src/chain/forkChoice/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
ForkChoiceStore,
ExecutionStatus,
JustifiedBalancesGetter,
ForkChoiceOpts,
ForkChoiceOpts as RealForkChoiceOpts,
} from "@lodestar/fork-choice";
import {
CachedBeaconStateAllForks,
Expand All @@ -21,7 +21,10 @@ import {ChainEventEmitter} from "../emitter.js";
import {ChainEvent} from "../emitter.js";
import {GENESIS_SLOT} from "../../constants/index.js";

export type {ForkChoiceOpts};
/**
 * Fork choice options from `@lodestar/fork-choice` (imported as `RealForkChoiceOpts`),
 * extended with an optional constructor override.
 */
export type ForkChoiceOpts = RealForkChoiceOpts & {
// for testing only: a class with the same constructor type as ForkChoice
forkchoiceConstructor?: typeof ForkChoice;
};

/**
* Fork Choice extended with a ChainEventEmitter
Expand All @@ -47,7 +50,11 @@ export function initializeForkChoice(

const justifiedBalances = getEffectiveBalanceIncrementsZeroInactive(state);

return new ForkChoice(
// forkchoiceConstructor is only used for some test cases
// production code uses the ForkChoice constructor directly
const forkchoiceConstructor = opts.forkchoiceConstructor ?? ForkChoice;

return new forkchoiceConstructor(
config,

new ForkChoiceStore(
Expand Down
11 changes: 11 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import {ArchiverOpts} from "./archiver/index.js";
import {ForkChoiceOpts} from "./forkChoice/index.js";
import {LightClientServerOpts} from "./lightClient/index.js";
import {ShufflingCacheOpts} from "./shufflingCache.js";
import {DEFAULT_MAX_BLOCK_STATES, FIFOBlockStateCacheOpts} from "./stateCache/fifoBlockStateCache.js";
import {PersistentCheckpointStateCacheOpts} from "./stateCache/persistentCheckpointsCache.js";
import {DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY} from "./stateCache/persistentCheckpointsCache.js";

export type IChainOptions = BlockProcessOpts &
PoolOpts &
SeenCacheOpts &
ForkChoiceOpts &
ArchiverOpts &
FIFOBlockStateCacheOpts &
PersistentCheckpointStateCacheOpts &
ShufflingCacheOpts &
LightClientServerOpts & {
blsVerifyAllMainThread?: boolean;
Expand All @@ -30,6 +35,8 @@ export type IChainOptions = BlockProcessOpts &
trustedSetup?: string;
broadcastValidationStrictness?: string;
minSameMessageSignatureSetsToBatch: number;
nHistoricalStates?: boolean;
nHistoricalStatesFileDataStore?: boolean;
};

export type BlockProcessOpts = {
Expand Down Expand Up @@ -102,4 +109,8 @@ export const defaultChainOptions: IChainOptions = {
// batching too much may block the I/O thread so if useWorker=false, suggest this value to be 32
// since this batch attestation work is designed to work with useWorker=true, make this the lowest value
minSameMessageSignatureSetsToBatch: 2,
nHistoricalStates: false,
nHistoricalStatesFileDataStore: false,
maxBlockStates: DEFAULT_MAX_BLOCK_STATES,
maxCPStateEpochsInMemory: DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY,
};
4 changes: 3 additions & 1 deletion packages/beacon-node/src/chain/regen/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum RegenErrorCode {
TOO_MANY_BLOCK_PROCESSED = "REGEN_ERROR_TOO_MANY_BLOCK_PROCESSED",
BLOCK_NOT_IN_DB = "REGEN_ERROR_BLOCK_NOT_IN_DB",
STATE_TRANSITION_ERROR = "REGEN_ERROR_STATE_TRANSITION_ERROR",
INVALID_STATE_ROOT = "REGEN_ERROR_INVALID_STATE_ROOT",
}

export type RegenErrorType =
Expand All @@ -17,7 +18,8 @@ export type RegenErrorType =
| {code: RegenErrorCode.NO_SEED_STATE}
| {code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED; stateRoot: RootHex | Root}
| {code: RegenErrorCode.BLOCK_NOT_IN_DB; blockRoot: RootHex | Root}
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error};
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error}
| {code: RegenErrorCode.INVALID_STATE_ROOT; slot: Slot; expected: RootHex; actual: RootHex};

export class RegenError extends Error {
type: RegenErrorType;
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export interface IStateRegenerator extends IStateRegeneratorInternal {
dropCache(): void;
dumpCacheSummary(): routes.lodestar.StateCacheItem[];
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null;
getCheckpointStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null>;
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null;
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
Expand Down
16 changes: 14 additions & 2 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16;

type QueuedStateRegeneratorModules = RegenModules & {
signal: AbortSignal;
logger: Logger;
};

type RegenRequestKey = keyof IStateRegeneratorInternal;
Expand Down Expand Up @@ -54,6 +53,12 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.logger = modules.logger;
}

/**
 * Initialize the checkpoint state cache if it exposes an `init` hook.
 * Resolves immediately when the cache has no `init` method.
 */
async init(): Promise<void> {
  const {checkpointStateCache} = this;
  // `init` is optional on the cache; nothing to do when it is absent
  if (!checkpointStateCache.init) {
    return;
  }
  return checkpointStateCache.init();
}

/**
 * Whether the regen job queue is below `REGEN_CAN_ACCEPT_WORK_THRESHOLD`.
 * @returns true when `jobQueue.jobLen` is strictly less than the threshold
 */
canAcceptWork(): boolean {
  const queuedJobs = this.jobQueue.jobLen;
  return queuedJobs < REGEN_CAN_ACCEPT_WORK_THRESHOLD;
}
Expand All @@ -71,6 +76,10 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return this.stateCache.get(stateRoot);
}

/**
 * Look up a checkpoint state through the checkpoint state cache's `getStateOrBytes`.
 * @param cp checkpoint to look up
 * @returns a cached state, a `Uint8Array`, or `null`, exactly as the cache reports it
 */
async getCheckpointStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null> {
  const stateOrBytes = await this.checkpointStateCache.getStateOrBytes(cp);
  return stateOrBytes;
}

/**
 * Synchronous checkpoint state lookup via the checkpoint state cache's `get`.
 * @param cp checkpoint to look up
 * @returns the cached state, or `null` as reported by the cache
 */
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null {
  const cachedState = this.checkpointStateCache.get(cp);
  return cachedState;
}
Expand Down Expand Up @@ -111,10 +120,13 @@ export class QueuedStateRegenerator implements IStateRegenerator {
} else {
// Trigger regen on head change if necessary
this.logger.warn("Head state not available, triggering regen", {stateRoot: newHeadStateRoot});
// it's important to reload state to regen head state here
const shouldReload = true;
// head has changed, so the existing cached head state is no longer useful. Set strong reference to null to free
// up memory for regen step below. During regen, node won't be functional but eventually head will be available
// for legacy StateContextCache only
this.stateCache.setHeadState(null);
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock).then(
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, shouldReload).then(
(headStateRegen) => this.stateCache.setHeadState(headStateRegen),
(e) => this.logger.error("Error on head state regen", {}, e)
);
Expand Down
Loading
Loading