Skip to content

Commit

Permalink
Merge 4f96320 into 08dbb21
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion authored Nov 8, 2021
2 parents 08dbb21 + 4f96320 commit d8f525f
Show file tree
Hide file tree
Showing 16 changed files with 233 additions and 44 deletions.
11 changes: 10 additions & 1 deletion packages/lodestar/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@ import {IMetrics} from "../../metrics";
import {IEth1ForBlockProduction} from "../../eth1";
import {IExecutionEngine} from "../../executionEngine";
import {IBeaconDb} from "../../db";
import {ZERO_HASH_HEX} from "../../constants";
import {CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache";
import {IStateRegenerator} from "../regen";
import {ChainEvent} from "../emitter";
import {ChainEventEmitter} from "../emitter";
import {getCheckpointFromState} from "./utils/checkpoint";
import {PendingEvents} from "./utils/pendingEvents";
import {FullyVerifiedBlock} from "./types";
import {ZERO_HASH_HEX} from "../../constants";

export type ImportBlockModules = {
db: IBeaconDb;
eth1: IEth1ForBlockProduction;
forkChoice: IForkChoice;
regen: IStateRegenerator;
stateCache: StateContextCache;
checkpointStateCache: CheckpointStateCache;
executionEngine: IExecutionEngine;
Expand Down Expand Up @@ -149,6 +151,13 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
pendingEvents.push(ChainEvent.forkChoiceReorg, newHead, oldHead, distance);
chain.metrics?.forkChoiceReorg.inc();
}

// MUST BE CALLED IF HEAD CHANGES !!! Otherwise the node will use the wrong state as head.
// Currently the cannonical head information is split between `forkChoice.getHead()` to get just a summary, and
// regen.getHeadState() to get the state of that head.
//
// Set head state in regen. May trigger async regen if the state is not in a memory cache
chain.regen.setHead(newHead, postState);
}

// NOTE: forkChoice.fsStore.finalizedCheckpoint MUST only change is response to an onBlock event
Expand Down
11 changes: 7 additions & 4 deletions packages/lodestar/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ export class BeaconChain implements IBeaconChain {
metrics,
signal,
});

// On start, the initial anchor state is added to the state cache + the forkchoice.
// Since this state and its block is the only one in the forkchoice, it becomes the head.
regen.setHead(forkChoice.getHead(), cachedState);

this.blockProcessor = new BlockProcessor(
{
clock,
Expand Down Expand Up @@ -194,10 +199,8 @@ export class BeaconChain implements IBeaconChain {

getHeadState(): CachedBeaconState<allForks.BeaconState> {
// head state should always exist
const head = this.forkChoice.getHead();
const headState =
this.checkpointStateCache.getLatest(head.blockRoot, Infinity) || this.stateCache.get(head.stateRoot);
if (!headState) throw Error("headState does not exist");
const headState = this.regen.getHeadState();
if (!headState) throw Error("headState not available");
return headState;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/lodestar/src/chain/regen/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export type RegenErrorType =
| {code: RegenErrorCode.BLOCK_NOT_IN_FORKCHOICE; blockRoot: RootHex | Root}
| {code: RegenErrorCode.STATE_NOT_IN_FORKCHOICE; stateRoot: RootHex | Root}
| {code: RegenErrorCode.SLOT_BEFORE_BLOCK_SLOT; slot: Slot; blockSlot: Slot}
| {code: RegenErrorCode.NO_SEED_STATE}
| {code: RegenErrorCode.NO_SEED_STATE; slot: Slot; blockRoot: RootHex}
| {code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED; stateRoot: RootHex | Root}
| {code: RegenErrorCode.BLOCK_NOT_IN_DB; blockRoot: RootHex | Root}
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error};
Expand Down
9 changes: 8 additions & 1 deletion packages/lodestar/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {allForks, phase0, Slot, RootHex} from "@chainsafe/lodestar-types";
import {CachedBeaconState} from "@chainsafe/lodestar-beacon-state-transition";
import {IProtoBlock} from "@chainsafe/lodestar-fork-choice";

export enum RegenCaller {
getDuties = "getDuties",
Expand All @@ -11,6 +12,7 @@ export enum RegenCaller {
validateGossipAggregateAndProof = "validateGossipAggregateAndProof",
validateGossipAttestation = "validateGossipAttestation",
onForkChoiceFinalized = "onForkChoiceFinalized",
regenHeadState = "regenHeadState",
}

export enum RegenFnName {
Expand All @@ -20,10 +22,15 @@ export enum RegenFnName {
getCheckpointState = "getCheckpointState",
}

export interface IStateRegenerator extends IStateRegeneratorInternal {
getHeadState(): CachedBeaconState<allForks.BeaconState> | null;
setHead(head: IProtoBlock, potentialHeadState?: CachedBeaconState<allForks.BeaconState>): void;
}

/**
* Regenerates states that have already been processed by the fork choice
*/
export interface IStateRegenerator {
export interface IStateRegeneratorInternal {
/**
* Return a valid pre-state for a beacon block
* This will always return a state in the latest viable epoch
Expand Down
50 changes: 46 additions & 4 deletions packages/lodestar/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {AbortSignal} from "@chainsafe/abort-controller";
import {phase0, Slot, allForks, RootHex} from "@chainsafe/lodestar-types";
import {IForkChoice} from "@chainsafe/lodestar-fork-choice";
import {IForkChoice, IProtoBlock} from "@chainsafe/lodestar-fork-choice";
import {CachedBeaconState, computeEpochAtSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache";
import {IMetrics} from "../../metrics";
import {JobItemQueue} from "../../util/queue";
import {IStateRegenerator, RegenCaller, RegenFnName} from "./interface";
import {IStateRegeneratorInternal, IStateRegenerator, RegenCaller, RegenFnName} from "./interface";
import {StateRegenerator, RegenModules} from "./regen";
import {RegenError, RegenErrorCode} from "./errors";
import {toHexString} from "@chainsafe/ssz";
Expand All @@ -16,8 +16,8 @@ type QueuedStateRegeneratorModules = RegenModules & {
signal: AbortSignal;
};

type RegenRequestKey = keyof IStateRegenerator;
type RegenRequestByKey = {[K in RegenRequestKey]: {key: K; args: Parameters<IStateRegenerator[K]>}};
type RegenRequestKey = keyof IStateRegeneratorInternal;
type RegenRequestByKey = {[K in RegenRequestKey]: {key: K; args: Parameters<IStateRegeneratorInternal[K]>}};
export type RegenRequest = RegenRequestByKey[RegenRequestKey];

/**
Expand All @@ -34,6 +34,9 @@ export class QueuedStateRegenerator implements IStateRegenerator {
private checkpointStateCache: CheckpointStateCache;
private metrics: IMetrics | null;

private headStateRootHex: string | null = null;
private headState: CachedBeaconState<allForks.BeaconState> | null = null;

constructor(modules: QueuedStateRegeneratorModules) {
this.regen = new StateRegenerator(modules);
this.jobQueue = new JobItemQueue<[RegenRequest], CachedBeaconState<allForks.BeaconState>>(
Expand All @@ -47,6 +50,45 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.metrics = modules.metrics;
}

getHeadState(): CachedBeaconState<allForks.BeaconState> | null {
return (
this.headState ||
// Fallback, check if head state is in cache
(this.headStateRootHex ? this.stateCache.get(this.headStateRootHex) : null)
);
}

setHead(head: IProtoBlock, potentialHeadState?: CachedBeaconState<allForks.BeaconState>): void {
this.headStateRootHex = head.stateRoot;

const headState =
potentialHeadState &&
// Compare the slot to prevent comparing stateRoot which should be more expensive
head.slot === potentialHeadState.slot &&
head.stateRoot === toHexString(potentialHeadState.hashTreeRoot())
? potentialHeadState
: this.checkpointStateCache.getLatest(head.blockRoot, Infinity) || this.stateCache.get(head.stateRoot);

// State is available syncronously =D
// Note: almost always the headState should be in the cache since it should be from a block recently processed
if (headState) {
this.headState = headState;
return;
}

// Make the state temporarily unavailable, while regen gets the state. While headState = null, the node may halt,
// but it will recover eventually once the headState is available.
this.headState = null;
this.getState(head.stateRoot, RegenCaller.regenHeadState)
.then((state) => {
this.headState = state;
})
.catch((e) => {
(e as Error).message = `Head state ${head.slot} ${head.stateRoot} not available: ${(e as Error).message}`;
throw e;
});
}

/**
* Get the state to run with `block`.
* - State after `block.parentRoot` dialed forward to block.slot
Expand Down
6 changes: 4 additions & 2 deletions packages/lodestar/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {IMetrics} from "../../metrics";
import {IBeaconDb} from "../../db";
import {CheckpointStateCache, StateContextCache} from "../stateCache";
import {IStateRegenerator, RegenCaller} from "./interface";
import {IStateRegeneratorInternal, RegenCaller} from "./interface";
import {RegenError, RegenErrorCode} from "./errors";
import {getCheckpointFromState} from "../blocks/utils/checkpoint";

Expand All @@ -29,7 +29,7 @@ export type RegenModules = {
/**
* Regenerates states that have already been processed by the fork choice
*/
export class StateRegenerator implements IStateRegenerator {
export class StateRegenerator implements IStateRegeneratorInternal {
constructor(private readonly modules: RegenModules) {}

/**
Expand Down Expand Up @@ -153,6 +153,8 @@ export class StateRegenerator implements IStateRegenerator {
if (state === null) {
throw new RegenError({
code: RegenErrorCode.NO_SEED_STATE,
slot: block.slot,
blockRoot: block.blockRoot,
});
}

Expand Down
51 changes: 43 additions & 8 deletions packages/lodestar/src/chain/stateCache/mapMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ type MapTrackerMetrics = {
secondsSinceLastRead: IAvgMinMax;
};

export class MapTracker<K, V> extends Map<K, V> {
// eslint-disable-next-line @typescript-eslint/ban-types
export class MapTrackerWeakRef<K, V extends object> {
readonly values = new Map<K, WeakRef<V>>();
/** Tracks the number of reads each entry in the cache gets for metrics */
readonly readCount = new Map<K, number>();
/** Tracks the last time a state was read from the cache */
readonly lastRead = new Map<K, number>();

constructor(metrics?: MapTrackerMetrics) {
super();
if (metrics) {
metrics.reads.addGetValuesFn(() => Array.from(this.readCount.values()));
metrics.secondsSinceLastRead.addGetValuesFn(() => {
Expand All @@ -26,26 +27,60 @@ export class MapTracker<K, V> extends Map<K, V> {
}
}

get size(): number {
return this.values.size;
}

get(key: K): V | undefined {
const value = super.get(key);
if (value !== undefined) {
this.readCount.set(key, 1 + (this.readCount.get(key) ?? 0));
this.lastRead.set(key, Date.now());
const valueWeakRef = this.values.get(key);
if (valueWeakRef === undefined) {
return undefined;
}

const value = valueWeakRef.deref();
// Clean GC'ed references
if (value === undefined) {
this.delete(key);
return undefined;
}

this.readCount.set(key, 1 + (this.readCount.get(key) ?? 0));
this.lastRead.set(key, Date.now());
return value;
}

set(key: K, value: V): void {
this.values.set(key, new WeakRef(value));
}

delete(key: K): boolean {
const deleted = super.delete(key);
const deleted = this.values.delete(key);
if (deleted) {
this.readCount.delete(key);
this.lastRead.delete(key);
}
return deleted;
}

has(key: K): boolean {
return this.values.has(key);
}

keys(): IterableIterator<K> {
return this.values.keys();
}

*entries(): IterableIterator<[K, V]> {
for (const [key, weakRef] of this.values.entries()) {
const value = weakRef.deref();
if (value) {
yield [key, value];
}
}
}

clear(): void {
super.clear();
this.values.clear();
this.readCount.clear();
this.lastRead.clear();
}
Expand Down
6 changes: 3 additions & 3 deletions packages/lodestar/src/chain/stateCache/stateContextCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {Epoch, allForks, RootHex} from "@chainsafe/lodestar-types";
import {CachedBeaconState} from "@chainsafe/lodestar-beacon-state-transition";
import {routes} from "@chainsafe/lodestar-api";
import {IMetrics} from "../../metrics";
import {MapTracker} from "./mapMetrics";
import {MapTrackerWeakRef} from "./mapMetrics";

const MAX_STATES = 3 * 32;

Expand All @@ -18,14 +18,14 @@ export class StateContextCache {
*/
readonly maxStates: number;

private readonly cache: MapTracker<string, CachedBeaconState<allForks.BeaconState>>;
private readonly cache: MapTrackerWeakRef<string, CachedBeaconState<allForks.BeaconState>>;
/** Epoch -> Set<blockRoot> */
private readonly epochIndex = new Map<Epoch, Set<string>>();
private readonly metrics: IMetrics["stateCache"] | null | undefined;

constructor({maxStates = MAX_STATES, metrics}: {maxStates?: number; metrics?: IMetrics | null}) {
this.maxStates = maxStates;
this.cache = new MapTracker(metrics?.stateCache);
this.cache = new MapTrackerWeakRef(metrics?.stateCache);
if (metrics) {
this.metrics = metrics.stateCache;
metrics.stateCache.size.addCollect(() => metrics.stateCache.size.set(this.cache.size));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {phase0, Epoch, allForks, RootHex} from "@chainsafe/lodestar-types";
import {CachedBeaconState} from "@chainsafe/lodestar-beacon-state-transition";
import {routes} from "@chainsafe/lodestar-api";
import {IMetrics} from "../../metrics";
import {MapTracker} from "./mapMetrics";
import {MapTrackerWeakRef} from "./mapMetrics";
import {MapDef} from "../../util/map";

type CheckpointHex = {epoch: Epoch; rootHex: RootHex};
Expand All @@ -16,15 +16,15 @@ const MAX_EPOCHS = 10;
* Similar API to Repository
*/
export class CheckpointStateCache {
private readonly cache: MapTracker<string, CachedBeaconState<allForks.BeaconState>>;
private readonly cache: MapTrackerWeakRef<string, CachedBeaconState<allForks.BeaconState>>;
/** Epoch -> Set<blockRoot> */
private readonly epochIndex = new MapDef<Epoch, Set<string>>(() => new Set<string>());
private readonly metrics: IMetrics["cpStateCache"] | null | undefined;
private preComputedCheckpoint: string | null = null;
private preComputedCheckpointHits: number | null = null;

constructor({metrics}: {metrics?: IMetrics | null}) {
this.cache = new MapTracker(metrics?.cpStateCache);
this.cache = new MapTrackerWeakRef(metrics?.cpStateCache);
if (metrics) {
this.metrics = metrics.cpStateCache;
metrics.cpStateCache.size.addCollect(() => metrics.cpStateCache.size.set(this.cache.size));
Expand Down
Loading

0 comments on commit d8f525f

Please sign in to comment.