Skip to content

Commit

Permalink
fix: avoid leaky event handler in waitForCheckpointState (#6096)
Browse files Browse the repository at this point in the history
* fix: do not consume checkpoint state in waitForCheckpointState

* chore: add more comments
  • Loading branch information
twoeths committed Nov 9, 2023
1 parent 55817cd commit 37cf9dd
Showing 1 changed file with 36 additions and 16 deletions.
52 changes: 36 additions & 16 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
BLSSignature,
isBlindedBeaconBlock,
isBlindedBlockContents,
phase0,
} from "@lodestar/types";
import {ExecutionStatus} from "@lodestar/fork-choice";
import {toHex, racePromisesWithCutoff, RaceEvent} from "@lodestar/utils";
Expand Down Expand Up @@ -172,12 +173,17 @@ export function getValidatorApi({

/**
* This function is called 1s before next epoch, usually at that time PrepareNextSlotScheduler finishes
* so we should have checkpoint state, otherwise wait for up to `timeoutMs`.
* so we should have checkpoint state, otherwise wait for up to the slot 1 of epoch.
* slot epoch 0 1
* |------------|------------|
* ^ ^
* | |
* | |
* | waitForCheckpointState (1s before slot 0 of epoch, wait until slot 1 of epoch)
* |
* prepareNextSlot (4s before next slot)
*/
async function waitForCheckpointState(
cpHex: CheckpointHex,
timeoutMs: number
): Promise<CachedBeaconStateAllForks | null> {
async function waitForCheckpointState(cpHex: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
const cpState = chain.regen.getCheckpointStateSync(cpHex);
if (cpState) {
return cpState;
Expand All @@ -186,16 +192,30 @@ export function getValidatorApi({
epoch: cpHex.epoch,
root: fromHexString(cpHex.rootHex),
};
// if not, wait for ChainEvent.checkpoint event until timeoutMs
return new Promise<CachedBeaconStateAllForks | null>((resolve) => {
const timer = setTimeout(() => resolve(null), timeoutMs);
chain.emitter.on(ChainEvent.checkpoint, (eventCp, cpState) => {
if (ssz.phase0.Checkpoint.equals(eventCp, cp)) {
clearTimeout(timer);
resolve(cpState);
}
});
});
const slot0 = computeStartSlotAtEpoch(cp.epoch);
// if not, wait for ChainEvent.checkpoint event until slot 1 of epoch
let listener: ((eventCp: phase0.Checkpoint) => void) | null = null;
const foundCPState = await Promise.race([
new Promise((resolve) => {
listener = (eventCp) => {
resolve(ssz.phase0.Checkpoint.equals(eventCp, cp));
};
chain.emitter.once(ChainEvent.checkpoint, listener);
}),
// in rare case, checkpoint state cache may happen up to 6s of slot 0 of epoch
// so we wait for it until the slot 1 of epoch
chain.clock.waitForSlot(slot0 + 1),
]);

if (listener != null) {
chain.emitter.off(ChainEvent.checkpoint, listener);
}

if (foundCPState === true) {
return chain.regen.getCheckpointStateSync(cpHex);
}

return null;
}

/**
Expand Down Expand Up @@ -721,7 +741,7 @@ export function getValidatorApi({
// this is to avoid missed block proposal due to 0 epoch look ahead
if (epoch === nextEpoch && toNextEpochMs < prepareNextSlotLookAheadMs) {
// wait for maximum 1 slot for cp state which is the timeout of validator api
const cpState = await waitForCheckpointState({rootHex: head.blockRoot, epoch}, slotMs);
const cpState = await waitForCheckpointState({rootHex: head.blockRoot, epoch});
if (cpState) {
state = cpState;
metrics?.duties.requestNextEpochProposalDutiesHit.inc();
Expand Down

0 comments on commit 37cf9dd

Please sign in to comment.