Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: do not expose state caches outside regen #5599

Merged
merged 1 commit into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions packages/api/src/beacon/routes/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export type StateCacheItem = {
reads: number;
/** Unix timestamp (ms) of the last read */
lastRead: number;
checkpointState: boolean;
};

export type LodestarNodePeer = NodePeer & {
Expand All @@ -88,8 +89,6 @@ export type Api = {
getBlockProcessorQueueItems(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: BlockProcessorQueueItem[]}}>>;
/** Dump a summary of the states in the StateContextCache */
getStateCacheItems(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: StateCacheItem[]}}>>;
/** Dump a summary of the states in the CheckpointStateCache */
getCheckpointStateCacheItems(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: StateCacheItem[]}}>>;
/** Dump peer gossip stats by peer */
getGossipPeerScoreStats(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: GossipPeerScoreStat[]}}>>;
/** Dump lodestar score stats by peer */
Expand Down Expand Up @@ -132,7 +131,6 @@ export const routesData: RoutesData<Api> = {
getRegenQueueItems: {url: "/eth/v1/lodestar/regen-queue-items", method: "GET"},
getBlockProcessorQueueItems: {url: "/eth/v1/lodestar/block-processor-queue-items", method: "GET"},
getStateCacheItems: {url: "/eth/v1/lodestar/state-cache-items", method: "GET"},
getCheckpointStateCacheItems: {url: "/eth/v1/lodestar/checkpoint-state-cache-items", method: "GET"},
getGossipPeerScoreStats: {url: "/eth/v1/lodestar/gossip-peer-score-stats", method: "GET"},
getLodestarPeerScoreStats: {url: "/eth/v1/lodestar/lodestar-peer-score-stats", method: "GET"},
runGC: {url: "/eth/v1/lodestar/gc", method: "POST"},
Expand All @@ -153,7 +151,6 @@ export type ReqTypes = {
getRegenQueueItems: ReqEmpty;
getBlockProcessorQueueItems: ReqEmpty;
getStateCacheItems: ReqEmpty;
getCheckpointStateCacheItems: ReqEmpty;
getGossipPeerScoreStats: ReqEmpty;
getLodestarPeerScoreStats: ReqEmpty;
runGC: ReqEmpty;
Expand Down Expand Up @@ -183,7 +180,6 @@ export function getReqSerializers(): ReqSerializers<Api, ReqTypes> {
getRegenQueueItems: reqEmpty,
getBlockProcessorQueueItems: reqEmpty,
getStateCacheItems: reqEmpty,
getCheckpointStateCacheItems: reqEmpty,
getGossipPeerScoreStats: reqEmpty,
getLodestarPeerScoreStats: reqEmpty,
runGC: reqEmpty,
Expand Down Expand Up @@ -222,7 +218,6 @@ export function getReturnTypes(): ReturnTypes<Api> {
getRegenQueueItems: jsonType("snake"),
getBlockProcessorQueueItems: jsonType("snake"),
getStateCacheItems: jsonType("snake"),
getCheckpointStateCacheItems: jsonType("snake"),
getGossipPeerScoreStats: jsonType("snake"),
getLodestarPeerScoreStats: jsonType("snake"),
getPeers: jsonType("snake"),
Expand Down
9 changes: 2 additions & 7 deletions packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ export function getLodestarApi({
},

async getStateCacheItems() {
return {data: (chain as BeaconChain)["stateCache"].dumpSummary()};
},

async getCheckpointStateCacheItems() {
return {data: (chain as BeaconChain)["checkpointStateCache"].dumpSummary()};
return {data: chain.regen.dumpCacheSummary()};
},

async getGossipPeerScoreStats() {
Expand All @@ -109,8 +105,7 @@ export function getLodestarApi({
},

async dropStateCache() {
chain.stateCache.clear();
chain.checkpointStateCache.clear();
chain.regen.dropCache();
},

async connectPeer(peerIdStr, multiaddrStrs) {
Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {Slot, Epoch} from "@lodestar/types";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {CheckpointStateCache} from "../stateCache/index.js";
import {IStateRegenerator} from "../regen/interface.js";

/**
* Minimum number of epochs between single temp archived states
Expand All @@ -26,7 +26,7 @@ export interface StatesArchiverOpts {
*/
export class StatesArchiver {
constructor(
private readonly checkpointStateCache: CheckpointStateCache,
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts
Expand Down Expand Up @@ -83,7 +83,7 @@ export class StatesArchiver {
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
const finalizedState = this.checkpointStateCache.get(finalized);
const finalizedState = this.regen.getCheckpointStateSync(finalized);
if (!finalizedState) {
throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch);
}
Expand Down
20 changes: 10 additions & 10 deletions packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import {IBeaconDb} from "../../db/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {IBeaconChain} from "../interface.js";
import {ChainEvent} from "../emitter.js";
import {CheckpointStateCache} from "../stateCache/index.js";
import {Metrics} from "../../metrics/metrics.js";
import {IStateRegenerator} from "../regen/interface.js";
import {StatesArchiver, StatesArchiverOpts} from "./archiveStates.js";
import {archiveBlocks, FinalizedData} from "./archiveBlocks.js";

Expand Down Expand Up @@ -51,7 +51,7 @@ export class Archiver {
opts: ArchiverOpts,
private readonly metrics: Metrics | null
) {
this.statesArchiver = new StatesArchiver(chain.checkpointStateCache, db, logger, opts);
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts);
this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint();
this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, {
maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN,
Expand Down Expand Up @@ -84,11 +84,11 @@ export class Archiver {

private onCheckpoint = (): void => {
const headStateRoot = this.chain.forkChoice.getHead().stateRoot;
this.chain.checkpointStateCache.prune(
this.chain.regen.pruneOnCheckpoint(
this.chain.forkChoice.getFinalizedCheckpoint().epoch,
this.chain.forkChoice.getJustifiedCheckpoint().epoch
this.chain.forkChoice.getJustifiedCheckpoint().epoch,
headStateRoot
);
this.chain.stateCache.prune(headStateRoot);
};

private processFinalizedCheckpoint = async (finalized: CheckpointWithHex): Promise<void> => {
Expand All @@ -105,7 +105,7 @@ export class Archiver {
this.chain.clock.currentEpoch
);
this.collectFinalizedProposalStats(
this.chain.checkpointStateCache,
this.chain.regen,
this.chain.forkChoice,
this.chain.beaconProposerCache,
finalizedData,
Expand All @@ -117,8 +117,8 @@ export class Archiver {
// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiver.maybeArchiveState(finalized);

this.chain.checkpointStateCache.pruneFinalized(finalizedEpoch);
this.chain.stateCache.deleteAllBeforeEpoch(finalizedEpoch);
this.chain.regen.pruneOnFinalized(finalizedEpoch);

// tasks rely on extended fork choice
this.chain.forkChoice.prune(finalized.rootHex);
await this.updateBackfillRange(finalized);
Expand Down Expand Up @@ -176,7 +176,7 @@ export class Archiver {
};

private collectFinalizedProposalStats(
checkpointStateCache: CheckpointStateCache,
regen: IStateRegenerator,
forkChoice: IForkChoice,
beaconProposerCache: IBeaconChain["beaconProposerCache"],
finalizedData: FinalizedData,
Expand Down Expand Up @@ -241,7 +241,7 @@ export class Archiver {
let finalizedAttachedValidatorsMissedCount = 0;

for (const checkpointHex of finalizedProposersCheckpoints) {
const checkpointState = checkpointStateCache.get(checkpointHex);
const checkpointState = regen.getCheckpointStateSync(checkpointHex);

// Generate stats for attached validators if we have state info
if (checkpointState !== null) {
Expand Down
21 changes: 3 additions & 18 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {isOptimisticBlock} from "../../util/forkChoice.js";
import {isQueueErrorAborted} from "../../util/queue/index.js";
import {ChainEvent, ReorgEventData} from "../emitter.js";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {RegenCaller} from "../regen/interface.js";
import type {BeaconChain} from "../chain.js";
import {FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt} from "./types.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";
Expand Down Expand Up @@ -82,7 +81,7 @@ export async function importBlock(

// This adds the state necessary to process the next block
// Some block event handlers require state being in state cache so need to do this before emitting EventType.block
this.stateCache.add(postState);
this.regen.addPostState(postState);

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex});
Expand Down Expand Up @@ -198,21 +197,7 @@ export async function importBlock(

if (newHead.blockRoot !== oldHead.blockRoot) {
// Set head state as strong reference
const headState =
newHead.stateRoot === toHexString(postState.hashTreeRoot()) ? postState : this.stateCache.get(newHead.stateRoot);
if (headState) {
this.stateCache.setHeadState(headState);
} else {
// Trigger regen on head change if necessary
this.logger.warn("Head state not available, triggering regen", {stateRoot: newHead.stateRoot});
// head has changed, so the existing cached head state is no longer useful. Set strong reference to null to free
// up memory for regen step below. During regen, node won't be functional but eventually head will be available
this.stateCache.setHeadState(null);
this.regen.getState(newHead.stateRoot, RegenCaller.processBlock).then(
(headStateRegen) => this.stateCache.setHeadState(headStateRegen),
(e) => this.logger.error("Error on head state regen", {}, e)
);
}
this.regen.updateHeadState(newHead.stateRoot, postState);

this.emitter.emit(routes.events.EventType.head, {
block: newHead.blockRoot,
Expand Down Expand Up @@ -337,7 +322,7 @@ export async function importBlock(
// Cache state to preserve epoch transition work
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.checkpointStateCache.add(cp, checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState);

// Note: in-lined code from previos handler of ChainEvent.checkpoint
Expand Down
24 changes: 9 additions & 15 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ export class BeaconChain implements IBeaconChain {
readonly forkChoice: IForkChoice;
readonly clock: IClock;
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
readonly regen: QueuedStateRegenerator;
readonly lightClientServer: LightClientServer;
readonly reprocessController: ReprocessController;
Expand Down Expand Up @@ -260,6 +258,7 @@ export class BeaconChain implements IBeaconChain {
checkpointStateCache,
db,
metrics,
logger,
emitter,
signal,
});
Expand All @@ -274,8 +273,6 @@ export class BeaconChain implements IBeaconChain {
this.clock = clock;
this.regen = regen;
this.bls = bls;
this.checkpointStateCache = checkpointStateCache;
this.stateCache = stateCache;
this.emitter = emitter;
this.lightClientServer = lightClientServer;

Expand All @@ -298,8 +295,6 @@ export class BeaconChain implements IBeaconChain {

async close(): Promise<void> {
this.abortController.abort();
this.stateCache.clear();
this.checkpointStateCache.clear();
await this.bls.close();
}

Expand Down Expand Up @@ -344,8 +339,7 @@ export class BeaconChain implements IBeaconChain {
getHeadState(): CachedBeaconStateAllForks {
// head state should always exist
const head = this.forkChoice.getHead();
const headState =
this.checkpointStateCache.getLatest(head.blockRoot, Infinity) || this.stateCache.get(head.stateRoot);
const headState = this.regen.getClosestHeadState(head);
if (!headState) {
throw Error(`headState does not exist for head root=${head.blockRoot} slot=${head.slot}`);
}
Expand Down Expand Up @@ -397,7 +391,7 @@ export class BeaconChain implements IBeaconChain {
return null;
}

const state = this.stateCache.get(block.stateRoot);
const state = this.regen.getStateSync(block.stateRoot);
return state && {state, executionOptimistic: isOptimisticBlock(block)};
}
} else {
Expand All @@ -424,7 +418,7 @@ export class BeaconChain implements IBeaconChain {
// - 1 every 100s of states that are persisted in the archive state

// TODO: This is very inneficient for debug requests of serialized content, since it deserializes to serialize again
const cachedStateCtx = this.stateCache.get(stateRoot);
const cachedStateCtx = this.regen.getStateSync(stateRoot);
if (cachedStateCtx) {
const block = this.forkChoice.getBlock(cachedStateCtx.latestBlockHeader.hashTreeRoot());
return {state: cachedStateCtx, executionOptimistic: block != null && isOptimisticBlock(block)};
Expand Down Expand Up @@ -679,7 +673,7 @@ export class BeaconChain implements IBeaconChain {
checkpoint: CheckpointWithHex,
blockState: CachedBeaconStateAllForks
): {state: CachedBeaconStateAllForks; stateId: string; shouldWarn: boolean} {
const state = this.checkpointStateCache.get(checkpoint);
const state = this.regen.getCheckpointStateSync(checkpoint);
if (state) {
return {state, stateId: "checkpoint_state", shouldWarn: false};
}
Expand All @@ -692,7 +686,7 @@ export class BeaconChain implements IBeaconChain {
// Find a state in the same branch of checkpoint at same epoch. Balances should exactly the same
for (const descendantBlock of this.forkChoice.forwardIterateDescendants(checkpoint.rootHex)) {
if (computeEpochAtSlot(descendantBlock.slot) === checkpoint.epoch) {
const descendantBlockState = this.stateCache.get(descendantBlock.stateRoot);
const descendantBlockState = this.regen.getStateSync(descendantBlock.stateRoot);
if (descendantBlockState) {
return {state: descendantBlockState, stateId: "descendant_state_same_epoch", shouldWarn: true};
}
Expand All @@ -708,7 +702,7 @@ export class BeaconChain implements IBeaconChain {
// Note: must call .forwardIterateDescendants() again since nodes are not sorted
for (const descendantBlock of this.forkChoice.forwardIterateDescendants(checkpoint.rootHex)) {
if (computeEpochAtSlot(descendantBlock.slot) > checkpoint.epoch) {
const descendantBlockState = this.stateCache.get(descendantBlock.stateRoot);
const descendantBlockState = this.regen.getStateSync(descendantBlock.stateRoot);
if (descendantBlockState) {
return {state: blockState, stateId: "descendant_state_latter_epoch", shouldWarn: true};
}
Expand Down Expand Up @@ -847,8 +841,8 @@ export class BeaconChain implements IBeaconChain {
this.seenBlockProposers.prune(computeStartSlotAtEpoch(cp.epoch));

// TODO: Improve using regen here
const headState = this.stateCache.get(this.forkChoice.getHead().stateRoot);
const finalizedState = this.checkpointStateCache.get(cp);
const headState = this.regen.getStateSync(this.forkChoice.getHead().stateRoot);
const finalizedState = this.regen.getCheckpointStateSync(cp);
if (headState) {
this.opPool.pruneAll(headState, finalizedState);
}
Expand Down
3 changes: 0 additions & 3 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import {Metrics} from "../metrics/metrics.js";
import {IClock} from "../util/clock.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator, RegenCaller} from "./regen/index.js";
import {StateContextCache, CheckpointStateCache} from "./stateCache/index.js";
import {IBlsVerifier} from "./bls/index.js";
import {
SeenAttesters,
Expand Down Expand Up @@ -80,8 +79,6 @@ export interface IBeaconChain {
readonly forkChoice: IForkChoice;
readonly clock: IClock;
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
readonly regen: IStateRegenerator;
readonly lightClientServer: LightClientServer;
readonly reprocessController: ReprocessController;
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class PrepareNextSlotScheduler {
// + if next slot is a skipped slot, it'd help getting target checkpoint state faster to validate attestations
if (isEpochTransition) {
this.metrics?.precomputeNextEpochTransition.count.inc({result: "success"}, 1);
const previousHits = this.chain.checkpointStateCache.updatePreComputedCheckpoint(headRoot, nextEpoch);
const previousHits = this.chain.regen.updatePreComputedCheckpoint(headRoot, nextEpoch);
if (previousHits === 0) {
this.metrics?.precomputeNextEpochTransition.waste.inc();
}
Expand Down
21 changes: 19 additions & 2 deletions packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import {allForks, phase0, Slot, RootHex} from "@lodestar/types";
import {allForks, phase0, Slot, RootHex, Epoch} from "@lodestar/types";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {routes} from "@lodestar/api";
import {ProtoBlock} from "@lodestar/fork-choice";
import {CheckpointHex} from "../stateCache/index.js";

export enum RegenCaller {
getDuties = "getDuties",
Expand Down Expand Up @@ -27,10 +30,24 @@ export type StateCloneOpts = {
dontTransferCache: boolean;
};

export interface IStateRegenerator extends IStateRegeneratorInternal {
dropCache(): void;
dumpCacheSummary(): routes.lodestar.StateCacheItem[];
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null;
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null;
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
pruneOnFinalized(finalizedEpoch: Epoch): void;
addPostState(postState: CachedBeaconStateAllForks): void;
addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void;
updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
}

/**
* Regenerates states that have already been processed by the fork choice
*/
export interface IStateRegenerator {
export interface IStateRegeneratorInternal {
/**
* Return a valid pre-state for a beacon block
* This will always return a state in the latest viable epoch
Expand Down
Loading