Skip to content

Commit

Permalink
Merge 205fc94 into e2e5417
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Jun 1, 2023
2 parents e2e5417 + 205fc94 commit 8cc2785
Show file tree
Hide file tree
Showing 19 changed files with 171 additions and 124 deletions.
7 changes: 1 addition & 6 deletions packages/api/src/beacon/routes/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export type StateCacheItem = {
reads: number;
/** Unix timestamp (ms) of the last read */
lastRead: number;
checkpointState: boolean;
};

export type LodestarNodePeer = NodePeer & {
Expand All @@ -88,8 +89,6 @@ export type Api = {
getBlockProcessorQueueItems(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: BlockProcessorQueueItem[]}}>>;
/** Dump a summary of the states in the StateContextCache */
getStateCacheItems(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: StateCacheItem[]}}>>;
/** Dump a summary of the states in the CheckpointStateCache */
getCheckpointStateCacheItems(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: StateCacheItem[]}}>>;
/** Dump peer gossip stats by peer */
getGossipPeerScoreStats(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: GossipPeerScoreStat[]}}>>;
/** Dump lodestar score stats by peer */
Expand Down Expand Up @@ -132,7 +131,6 @@ export const routesData: RoutesData<Api> = {
getRegenQueueItems: {url: "/eth/v1/lodestar/regen-queue-items", method: "GET"},
getBlockProcessorQueueItems: {url: "/eth/v1/lodestar/block-processor-queue-items", method: "GET"},
getStateCacheItems: {url: "/eth/v1/lodestar/state-cache-items", method: "GET"},
getCheckpointStateCacheItems: {url: "/eth/v1/lodestar/checkpoint-state-cache-items", method: "GET"},
getGossipPeerScoreStats: {url: "/eth/v1/lodestar/gossip-peer-score-stats", method: "GET"},
getLodestarPeerScoreStats: {url: "/eth/v1/lodestar/lodestar-peer-score-stats", method: "GET"},
runGC: {url: "/eth/v1/lodestar/gc", method: "POST"},
Expand All @@ -153,7 +151,6 @@ export type ReqTypes = {
getRegenQueueItems: ReqEmpty;
getBlockProcessorQueueItems: ReqEmpty;
getStateCacheItems: ReqEmpty;
getCheckpointStateCacheItems: ReqEmpty;
getGossipPeerScoreStats: ReqEmpty;
getLodestarPeerScoreStats: ReqEmpty;
runGC: ReqEmpty;
Expand Down Expand Up @@ -183,7 +180,6 @@ export function getReqSerializers(): ReqSerializers<Api, ReqTypes> {
getRegenQueueItems: reqEmpty,
getBlockProcessorQueueItems: reqEmpty,
getStateCacheItems: reqEmpty,
getCheckpointStateCacheItems: reqEmpty,
getGossipPeerScoreStats: reqEmpty,
getLodestarPeerScoreStats: reqEmpty,
runGC: reqEmpty,
Expand Down Expand Up @@ -222,7 +218,6 @@ export function getReturnTypes(): ReturnTypes<Api> {
getRegenQueueItems: jsonType("snake"),
getBlockProcessorQueueItems: jsonType("snake"),
getStateCacheItems: jsonType("snake"),
getCheckpointStateCacheItems: jsonType("snake"),
getGossipPeerScoreStats: jsonType("snake"),
getLodestarPeerScoreStats: jsonType("snake"),
getPeers: jsonType("snake"),
Expand Down
9 changes: 2 additions & 7 deletions packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ export function getLodestarApi({
},

async getStateCacheItems() {
return {data: (chain as BeaconChain)["stateCache"].dumpSummary()};
},

async getCheckpointStateCacheItems() {
return {data: (chain as BeaconChain)["checkpointStateCache"].dumpSummary()};
return {data: chain.regen.dumpCacheSummary()};
},

async getGossipPeerScoreStats() {
Expand All @@ -109,8 +105,7 @@ export function getLodestarApi({
},

async dropStateCache() {
chain.stateCache.clear();
chain.checkpointStateCache.clear();
chain.regen.dropCache();
},

async connectPeer(peerIdStr, multiaddrStrs) {
Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {Slot, Epoch} from "@lodestar/types";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {CheckpointStateCache} from "../stateCache/index.js";
import {IStateRegenerator} from "../regen/interface.js";

/**
* Minimum number of epochs between single temp archived states
Expand All @@ -26,7 +26,7 @@ export interface StatesArchiverOpts {
*/
export class StatesArchiver {
constructor(
private readonly checkpointStateCache: CheckpointStateCache,
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts
Expand Down Expand Up @@ -83,7 +83,7 @@ export class StatesArchiver {
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
const finalizedState = this.checkpointStateCache.get(finalized);
const finalizedState = this.regen.getCheckpointStateSync(finalized);
if (!finalizedState) {
throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch);
}
Expand Down
20 changes: 10 additions & 10 deletions packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import {IBeaconDb} from "../../db/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {IBeaconChain} from "../interface.js";
import {ChainEvent} from "../emitter.js";
import {CheckpointStateCache} from "../stateCache/index.js";
import {Metrics} from "../../metrics/metrics.js";
import {IStateRegenerator} from "../regen/interface.js";
import {StatesArchiver, StatesArchiverOpts} from "./archiveStates.js";
import {archiveBlocks, FinalizedData} from "./archiveBlocks.js";

Expand Down Expand Up @@ -51,7 +51,7 @@ export class Archiver {
opts: ArchiverOpts,
private readonly metrics: Metrics | null
) {
this.statesArchiver = new StatesArchiver(chain.checkpointStateCache, db, logger, opts);
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts);
this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint();
this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, {
maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN,
Expand Down Expand Up @@ -84,11 +84,11 @@ export class Archiver {

private onCheckpoint = (): void => {
const headStateRoot = this.chain.forkChoice.getHead().stateRoot;
this.chain.checkpointStateCache.prune(
this.chain.regen.pruneOnCheckpoint(
this.chain.forkChoice.getFinalizedCheckpoint().epoch,
this.chain.forkChoice.getJustifiedCheckpoint().epoch
this.chain.forkChoice.getJustifiedCheckpoint().epoch,
headStateRoot
);
this.chain.stateCache.prune(headStateRoot);
};

private processFinalizedCheckpoint = async (finalized: CheckpointWithHex): Promise<void> => {
Expand All @@ -105,7 +105,7 @@ export class Archiver {
this.chain.clock.currentEpoch
);
this.collectFinalizedProposalStats(
this.chain.checkpointStateCache,
this.chain.regen,
this.chain.forkChoice,
this.chain.beaconProposerCache,
finalizedData,
Expand All @@ -117,8 +117,8 @@ export class Archiver {
// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiver.maybeArchiveState(finalized);

this.chain.checkpointStateCache.pruneFinalized(finalizedEpoch);
this.chain.stateCache.deleteAllBeforeEpoch(finalizedEpoch);
this.chain.regen.pruneOnFinalized(finalizedEpoch);

// tasks rely on extended fork choice
this.chain.forkChoice.prune(finalized.rootHex);
await this.updateBackfillRange(finalized);
Expand Down Expand Up @@ -176,7 +176,7 @@ export class Archiver {
};

private collectFinalizedProposalStats(
checkpointStateCache: CheckpointStateCache,
regen: IStateRegenerator,
forkChoice: IForkChoice,
beaconProposerCache: IBeaconChain["beaconProposerCache"],
finalizedData: FinalizedData,
Expand Down Expand Up @@ -241,7 +241,7 @@ export class Archiver {
let finalizedAttachedValidatorsMissedCount = 0;

for (const checkpointHex of finalizedProposersCheckpoints) {
const checkpointState = checkpointStateCache.get(checkpointHex);
const checkpointState = regen.getCheckpointStateSync(checkpointHex);

// Generate stats for attached validators if we have state info
if (checkpointState !== null) {
Expand Down
21 changes: 3 additions & 18 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {isOptimisticBlock} from "../../util/forkChoice.js";
import {isQueueErrorAborted} from "../../util/queue/index.js";
import {ChainEvent, ReorgEventData} from "../emitter.js";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {RegenCaller} from "../regen/interface.js";
import type {BeaconChain} from "../chain.js";
import {FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt} from "./types.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";
Expand Down Expand Up @@ -82,7 +81,7 @@ export async function importBlock(

// This adds the state necessary to process the next block
// Some block event handlers require state being in state cache so need to do this before emitting EventType.block
this.stateCache.add(postState);
this.regen.addPostState(postState);

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex});
Expand Down Expand Up @@ -198,21 +197,7 @@ export async function importBlock(

if (newHead.blockRoot !== oldHead.blockRoot) {
// Set head state as strong reference
const headState =
newHead.stateRoot === toHexString(postState.hashTreeRoot()) ? postState : this.stateCache.get(newHead.stateRoot);
if (headState) {
this.stateCache.setHeadState(headState);
} else {
// Trigger regen on head change if necessary
this.logger.warn("Head state not available, triggering regen", {stateRoot: newHead.stateRoot});
// head has changed, so the existing cached head state is no longer useful. Set strong reference to null to free
// up memory for regen step below. During regen, node won't be functional but eventually head will be available
this.stateCache.setHeadState(null);
this.regen.getState(newHead.stateRoot, RegenCaller.processBlock).then(
(headStateRegen) => this.stateCache.setHeadState(headStateRegen),
(e) => this.logger.error("Error on head state regen", {}, e)
);
}
this.regen.updateHeadState(newHead.stateRoot, postState);

this.emitter.emit(routes.events.EventType.head, {
block: newHead.blockRoot,
Expand Down Expand Up @@ -337,7 +322,7 @@ export async function importBlock(
// Cache state to preserve epoch transition work
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.checkpointStateCache.add(cp, checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState);

// Note: in-lined code from previos handler of ChainEvent.checkpoint
Expand Down
24 changes: 9 additions & 15 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ export class BeaconChain implements IBeaconChain {
readonly forkChoice: IForkChoice;
readonly clock: IClock;
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
readonly regen: QueuedStateRegenerator;
readonly lightClientServer: LightClientServer;
readonly reprocessController: ReprocessController;
Expand Down Expand Up @@ -260,6 +258,7 @@ export class BeaconChain implements IBeaconChain {
checkpointStateCache,
db,
metrics,
logger,
emitter,
signal,
});
Expand All @@ -274,8 +273,6 @@ export class BeaconChain implements IBeaconChain {
this.clock = clock;
this.regen = regen;
this.bls = bls;
this.checkpointStateCache = checkpointStateCache;
this.stateCache = stateCache;
this.emitter = emitter;
this.lightClientServer = lightClientServer;

Expand All @@ -298,8 +295,6 @@ export class BeaconChain implements IBeaconChain {

async close(): Promise<void> {
this.abortController.abort();
this.stateCache.clear();
this.checkpointStateCache.clear();
await this.bls.close();
}

Expand Down Expand Up @@ -344,8 +339,7 @@ export class BeaconChain implements IBeaconChain {
getHeadState(): CachedBeaconStateAllForks {
// head state should always exist
const head = this.forkChoice.getHead();
const headState =
this.checkpointStateCache.getLatest(head.blockRoot, Infinity) || this.stateCache.get(head.stateRoot);
const headState = this.regen.getClosestHeadState(head);
if (!headState) {
throw Error(`headState does not exist for head root=${head.blockRoot} slot=${head.slot}`);
}
Expand Down Expand Up @@ -397,7 +391,7 @@ export class BeaconChain implements IBeaconChain {
return null;
}

const state = this.stateCache.get(block.stateRoot);
const state = this.regen.getStateSync(block.stateRoot);
return state && {state, executionOptimistic: isOptimisticBlock(block)};
}
} else {
Expand All @@ -424,7 +418,7 @@ export class BeaconChain implements IBeaconChain {
// - 1 every 100s of states that are persisted in the archive state

// TODO: This is very inneficient for debug requests of serialized content, since it deserializes to serialize again
const cachedStateCtx = this.stateCache.get(stateRoot);
const cachedStateCtx = this.regen.getStateSync(stateRoot);
if (cachedStateCtx) {
const block = this.forkChoice.getBlock(cachedStateCtx.latestBlockHeader.hashTreeRoot());
return {state: cachedStateCtx, executionOptimistic: block != null && isOptimisticBlock(block)};
Expand Down Expand Up @@ -679,7 +673,7 @@ export class BeaconChain implements IBeaconChain {
checkpoint: CheckpointWithHex,
blockState: CachedBeaconStateAllForks
): {state: CachedBeaconStateAllForks; stateId: string; shouldWarn: boolean} {
const state = this.checkpointStateCache.get(checkpoint);
const state = this.regen.getCheckpointStateSync(checkpoint);
if (state) {
return {state, stateId: "checkpoint_state", shouldWarn: false};
}
Expand All @@ -692,7 +686,7 @@ export class BeaconChain implements IBeaconChain {
// Find a state in the same branch of checkpoint at same epoch. Balances should exactly the same
for (const descendantBlock of this.forkChoice.forwardIterateDescendants(checkpoint.rootHex)) {
if (computeEpochAtSlot(descendantBlock.slot) === checkpoint.epoch) {
const descendantBlockState = this.stateCache.get(descendantBlock.stateRoot);
const descendantBlockState = this.regen.getStateSync(descendantBlock.stateRoot);
if (descendantBlockState) {
return {state: descendantBlockState, stateId: "descendant_state_same_epoch", shouldWarn: true};
}
Expand All @@ -708,7 +702,7 @@ export class BeaconChain implements IBeaconChain {
// Note: must call .forwardIterateDescendants() again since nodes are not sorted
for (const descendantBlock of this.forkChoice.forwardIterateDescendants(checkpoint.rootHex)) {
if (computeEpochAtSlot(descendantBlock.slot) > checkpoint.epoch) {
const descendantBlockState = this.stateCache.get(descendantBlock.stateRoot);
const descendantBlockState = this.regen.getStateSync(descendantBlock.stateRoot);
if (descendantBlockState) {
return {state: blockState, stateId: "descendant_state_latter_epoch", shouldWarn: true};
}
Expand Down Expand Up @@ -847,8 +841,8 @@ export class BeaconChain implements IBeaconChain {
this.seenBlockProposers.prune(computeStartSlotAtEpoch(cp.epoch));

// TODO: Improve using regen here
const headState = this.stateCache.get(this.forkChoice.getHead().stateRoot);
const finalizedState = this.checkpointStateCache.get(cp);
const headState = this.regen.getStateSync(this.forkChoice.getHead().stateRoot);
const finalizedState = this.regen.getCheckpointStateSync(cp);
if (headState) {
this.opPool.pruneAll(headState, finalizedState);
}
Expand Down
3 changes: 0 additions & 3 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import {Metrics} from "../metrics/metrics.js";
import {IClock} from "../util/clock.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator, RegenCaller} from "./regen/index.js";
import {StateContextCache, CheckpointStateCache} from "./stateCache/index.js";
import {IBlsVerifier} from "./bls/index.js";
import {
SeenAttesters,
Expand Down Expand Up @@ -80,8 +79,6 @@ export interface IBeaconChain {
readonly forkChoice: IForkChoice;
readonly clock: IClock;
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
readonly regen: IStateRegenerator;
readonly lightClientServer: LightClientServer;
readonly reprocessController: ReprocessController;
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class PrepareNextSlotScheduler {
// + if next slot is a skipped slot, it'd help getting target checkpoint state faster to validate attestations
if (isEpochTransition) {
this.metrics?.precomputeNextEpochTransition.count.inc({result: "success"}, 1);
const previousHits = this.chain.checkpointStateCache.updatePreComputedCheckpoint(headRoot, nextEpoch);
const previousHits = this.chain.regen.updatePreComputedCheckpoint(headRoot, nextEpoch);
if (previousHits === 0) {
this.metrics?.precomputeNextEpochTransition.waste.inc();
}
Expand Down
21 changes: 19 additions & 2 deletions packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import {allForks, phase0, Slot, RootHex} from "@lodestar/types";
import {allForks, phase0, Slot, RootHex, Epoch} from "@lodestar/types";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {routes} from "@lodestar/api";
import {ProtoBlock} from "@lodestar/fork-choice";
import {CheckpointHex} from "../stateCache/index.js";

export enum RegenCaller {
getDuties = "getDuties",
Expand Down Expand Up @@ -27,10 +30,24 @@ export type StateCloneOpts = {
dontTransferCache: boolean;
};

export interface IStateRegenerator extends IStateRegeneratorInternal {
dropCache(): void;
dumpCacheSummary(): routes.lodestar.StateCacheItem[];
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null;
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null;
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
pruneOnFinalized(finalizedEpoch: Epoch): void;
addPostState(postState: CachedBeaconStateAllForks): void;
addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void;
updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
}

/**
* Regenerates states that have already been processed by the fork choice
*/
export interface IStateRegenerator {
export interface IStateRegeneratorInternal {
/**
* Return a valid pre-state for a beacon block
* This will always return a state in the latest viable epoch
Expand Down
Loading

0 comments on commit 8cc2785

Please sign in to comment.