Skip to content

Commit

Permalink
feat: optimistically verify blocks even before all blobs available
Browse files Browse the repository at this point in the history
cleanup pr and add metrics to track

simplify

improvements and type fixes

increase bucket precision

time fixes

improve metrics collection

improve metrics collection

some comments improv

fix the missing writing blobs for blobspromise

rebase fixes

rebase fixes

remove artifact

apply feedback

add more meta info to error

separate out the blockinput cache and attach to chain

rename the cache

apply more feedback

add unittest for seengossipblockinput

add comments
  • Loading branch information
g11tech committed Dec 19, 2023
1 parent d3005bf commit f43984e
Show file tree
Hide file tree
Showing 18 changed files with 635 additions and 232 deletions.
21 changes: 7 additions & 14 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
computeTimeAtSlot,
parseSignedBlindedBlockOrContents,
reconstructFullBlockOrContents,
DataAvailableStatus,
} from "@lodestar/state-transition";
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {sleep, toHex} from "@lodestar/utils";
Expand Down Expand Up @@ -121,19 +120,13 @@ export function getBeaconBlockApi({
}

try {
await verifyBlocksInEpoch.call(
chain as BeaconChain,
parentBlock,
[blockForImport],
[DataAvailableStatus.available],
{
...opts,
verifyOnly: true,
skipVerifyBlockSignatures: true,
skipVerifyExecutionPayload: true,
seenTimestampSec,
}
);
await verifyBlocksInEpoch.call(chain as BeaconChain, parentBlock, [blockForImport], {
...opts,
verifyOnly: true,
skipVerifyBlockSignatures: true,
skipVerifyExecutionPayload: true,
seenTimestampSec,
});
} catch (error) {
chain.logger.error("Consensus checks failed while publishing the block", valLogMeta, error as Error);
chain.persistInvalidSszValue(
Expand Down
15 changes: 3 additions & 12 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ export async function processBlocks(
}

try {
const {relevantBlocks, dataAvailabilityStatuses, parentSlots, parentBlock} = verifyBlocksSanityChecks(
this,
blocks,
opts
);
const {relevantBlocks, parentSlots, parentBlock} = verifyBlocksSanityChecks(this, blocks, opts);

// No relevant blocks, skip verifyBlocksInEpoch()
if (relevantBlocks.length === 0 || parentBlock === null) {
Expand All @@ -72,13 +68,8 @@ export async function processBlocks(

// Fully verify a block to be imported immediately after. Does not produce any side-effects besides adding intermediate
// states in the state cache through regen.
const {postStates, proposerBalanceDeltas, segmentExecStatus} = await verifyBlocksInEpoch.call(
this,
parentBlock,
relevantBlocks,
dataAvailabilityStatuses,
opts
);
const {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus} =
await verifyBlocksInEpoch.call(this, parentBlock, relevantBlocks, opts);

// If segmentExecStatus has lvhForkchoice then, the entire segment should be invalid
// and we need to further propagate
Expand Down
155 changes: 33 additions & 122 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import {toHexString} from "@chainsafe/ssz";
import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot, RootHex} from "@lodestar/types";
import {allForks, deneb, Slot} from "@lodestar/types";
import {ForkSeq, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {pruneSetToMax} from "@lodestar/utils";

export enum BlockInputType {
preDeneb = "preDeneb",
postDeneb = "postDeneb",
blobsPromise = "blobsPromise",
}

/** Enum to represent where blocks come from */
Expand All @@ -19,9 +18,18 @@ export enum BlockSource {
byRoot = "req_resp_by_root",
}

export enum GossipedInputType {
block = "block",
blob = "blob",
}

export type BlobsCache = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]};

export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| {type: BlockInputType.postDeneb; blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]}
| ({type: BlockInputType.postDeneb} & BlockInputBlobs)
| {type: BlockInputType.blobsPromise; blobsCache: BlobsCache; availabilityPromise: Promise<BlockInputBlobs>}
);

export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean {
Expand All @@ -32,125 +40,7 @@ export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clo
);
}

export enum GossipedInputType {
block = "block",
blob = "blob",
}
type GossipedBlockInput =
| {type: GossipedInputType.block; signedBlock: allForks.SignedBeaconBlock; blockBytes: Uint8Array | null}
| {type: GossipedInputType.blob; signedBlob: deneb.SignedBlobSidecar; blobBytes: Uint8Array | null};
type BlockInputCacheType = {
block?: allForks.SignedBeaconBlock;
blockBytes?: Uint8Array | null;
blobs: Map<number, deneb.BlobSidecar>;
blobsBytes: Map<number, Uint8Array | null>;
};

const MAX_GOSSIPINPUT_CACHE = 5;
// ssz.deneb.BlobSidecars.elementType.fixedSize;
const BLOBSIDECAR_FIXED_SIZE = 131256;

export const getBlockInput = {
blockInputCache: new Map<RootHex, BlockInputCacheType>(),

getGossipBlockInput(
config: ChainForkConfig,
gossipedInput: GossipedBlockInput
):
| {blockInput: BlockInput; blockInputMeta: {pending: null; haveBlobs: number; expectedBlobs: number}}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.blob; haveBlobs: number; expectedBlobs: number}} {
let blockHex;
let blockCache;

if (gossipedInput.type === GossipedInputType.block) {
const {signedBlock, blockBytes} = gossipedInput;

blockHex = toHexString(
config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message)
);
blockCache = this.blockInputCache.get(blockHex) ?? {
blobs: new Map<number, deneb.BlobSidecar>(),
blobsBytes: new Map<number, Uint8Array | null>(),
};

blockCache.block = signedBlock;
blockCache.blockBytes = blockBytes;
} else {
const {signedBlob, blobBytes} = gossipedInput;
blockHex = toHexString(signedBlob.message.blockRoot);
blockCache = this.blockInputCache.get(blockHex);

// If a new entry is going to be inserted, prune out old ones
if (blockCache === undefined) {
pruneSetToMax(this.blockInputCache, MAX_GOSSIPINPUT_CACHE);
blockCache = {blobs: new Map<number, deneb.BlobSidecar>(), blobsBytes: new Map<number, Uint8Array | null>()};
}

// TODO: freetheblobs check if its the same blob or a duplicate and throw/take actions
blockCache.blobs.set(signedBlob.message.index, signedBlob.message);
// easily splice out the unsigned message as blob is a fixed length type
blockCache.blobsBytes.set(signedBlob.message.index, blobBytes?.slice(0, BLOBSIDECAR_FIXED_SIZE) ?? null);
}

this.blockInputCache.set(blockHex, blockCache);
const {block: signedBlock, blockBytes} = blockCache;

if (signedBlock !== undefined) {
// block is available, check if all blobs have shown up
const {slot, body} = signedBlock.message;
const {blobKzgCommitments} = body as deneb.BeaconBlockBody;
const blockInfo = `blockHex=${blockHex}, slot=${slot}`;

if (blobKzgCommitments.length < blockCache.blobs.size) {
throw Error(
`Received more blobs=${blockCache.blobs.size} than commitments=${blobKzgCommitments.length} for ${blockInfo}`
);
}
if (blobKzgCommitments.length === blockCache.blobs.size) {
const blobSidecars = [];
const blobsBytes = [];

for (let index = 0; index < blobKzgCommitments.length; index++) {
const blobSidecar = blockCache.blobs.get(index);
if (blobSidecar === undefined) {
throw Error(`Missing blobSidecar at index=${index} for ${blockInfo}`);
}
blobSidecars.push(blobSidecar);
blobsBytes.push(blockCache.blobsBytes.get(index) ?? null);
}

return {
// TODO freetheblobs: collate and add serialized data for the postDeneb blockinput
blockInput: getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobSidecars,
blockBytes ?? null,
blobsBytes
),
blockInputMeta: {pending: null, haveBlobs: blockCache.blobs.size, expectedBlobs: blobKzgCommitments.length},
};
} else {
return {
blockInput: null,
blockInputMeta: {
pending: GossipedInputType.blob,
haveBlobs: blockCache.blobs.size,
expectedBlobs: blobKzgCommitments.length,
},
};
}
} else {
// will need to wait for the block to showup
return {
blockInput: null,
blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blockCache.blobs.size, expectedBlobs: null},
};
}
},

preDeneb(
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
Expand Down Expand Up @@ -188,6 +78,27 @@ export const getBlockInput = {
blobsBytes,
};
},

blobsPromise(
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
source: BlockSource,
blobsCache: BlobsCache,
blockBytes: Uint8Array | null,
availabilityPromise: Promise<BlockInputBlobs>
): BlockInput {
if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) {
throw Error(`Pre Deneb block slot ${block.message.slot}`);
}
return {
type: BlockInputType.blobsPromise,
block,
source,
blobsCache,
blockBytes,
availabilityPromise,
};
},
};

export enum AttestationImportOpt {
Expand Down
52 changes: 45 additions & 7 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
isStateValidatorsNodesPopulated,
DataAvailableStatus,
} from "@lodestar/state-transition";
import {bellatrix} from "@lodestar/types";
import {bellatrix, deneb} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {ProtoBlock, ExecutionStatus} from "@lodestar/fork-choice";
import {ChainForkConfig} from "@lodestar/config";
Expand All @@ -14,13 +14,14 @@ import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import {RegenCaller} from "../regen/index.js";
import type {BeaconChain} from "../chain.js";
import {BlockInput, ImportBlockOpts} from "./types.js";
import {BlockInput, ImportBlockOpts, BlockInputType} from "./types.js";
import {POS_PANDA_MERGE_TRANSITION_BANNER} from "./utils/pandaMergeTransitionBanner.js";
import {CAPELLA_OWL_BANNER} from "./utils/ownBanner.js";
import {DENEB_BLOWFISH_BANNER} from "./utils/blowfishBanner.js";
import {verifyBlocksStateTransitionOnly} from "./verifyBlocksStateTransitionOnly.js";
import {verifyBlocksSignatures} from "./verifyBlocksSignatures.js";
import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExecutionPayloads.js";
import {verifyBlocksDataAvailability} from "./verifyBlocksDataAvailability.js";
import {writeBlockInputToDb} from "./writeBlockInputToDb.js";

/**
Expand All @@ -38,12 +39,12 @@ export async function verifyBlocksInEpoch(
this: BeaconChain,
parentBlock: ProtoBlock,
blocksInput: BlockInput[],
dataAvailabilityStatuses: DataAvailableStatus[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<{
postStates: CachedBeaconStateAllForks[];
proposerBalanceDeltas: number[];
segmentExecStatus: SegmentExecStatus;
dataAvailabilityStatuses: DataAvailableStatus[];
}> {
const blocks = blocksInput.map(({block}) => block);
if (blocks.length === 0) {
Expand Down Expand Up @@ -88,7 +89,12 @@ export async function verifyBlocksInEpoch(

try {
// batch all I/O operations to reduce overhead
const [segmentExecStatus, {postStates, proposerBalanceDeltas}] = await Promise.all([
const [
segmentExecStatus,
{dataAvailabilityStatuses, availableTime},
{postStates, proposerBalanceDeltas, verifyStateTime},
{verifySignaturesTime},
] = await Promise.all([
// Execution payloads
opts.skipVerifyExecutionPayload !== true
? verifyBlocksExecutionPayload(this, parentBlock, blocks, preState0, abortController.signal, opts)
Expand All @@ -98,12 +104,16 @@ export async function verifyBlocksInEpoch(
mergeBlockFound: null,
} as SegmentExecStatus),

// data availability for the blobs
verifyBlocksDataAvailability(this, blocksInput, opts),

// Run state transition only
// TODO: Ensure it yields to allow flushing to workers and engine API
verifyBlocksStateTransitionOnly(
preState0,
blocksInput,
dataAvailabilityStatuses,
// hack availability for state transition eval as availability is separately determined
blocks.map(() => DataAvailableStatus.available),
this.logger,
this.metrics,
abortController.signal,
Expand All @@ -113,7 +123,7 @@ export async function verifyBlocksInEpoch(
// All signatures at once
opts.skipVerifyBlockSignatures !== true
? verifyBlocksSignatures(this.bls, this.logger, this.metrics, preState0, blocks, opts)
: Promise.resolve(),
: Promise.resolve({verifySignaturesTime: Date.now()}),

// ideally we want to only persist blocks after verifying them however the reality is there are
// rarely invalid blocks we'll batch all I/O operation here to reduce the overhead if there's
Expand Down Expand Up @@ -151,7 +161,35 @@ export async function verifyBlocksInEpoch(
}
}

return {postStates, proposerBalanceDeltas, segmentExecStatus};
if (segmentExecStatus.execAborted === null) {
const {executionStatuses, executionTime} = segmentExecStatus;
if (
blocksInput.length === 1 &&
// gossip blocks have seenTimestampSec
opts.seenTimestampSec !== undefined &&
blocksInput[0].type !== BlockInputType.preDeneb &&
executionStatuses[0] === ExecutionStatus.Valid
) {
// Find the max time when the block was actually verified
const fullyVerifiedTime = Math.max(executionTime, verifyStateTime, verifySignaturesTime);
const recvTofullyVerifedTime = fullyVerifiedTime / 1000 - opts.seenTimestampSec;
this.metrics?.gossipBlock.receivedToFullyVerifiedTime.observe(recvTofullyVerifedTime);

const verifiedToBlobsAvailabiltyTime = Math.max(availableTime - fullyVerifiedTime, 0) / 1000;
const numBlobs = (blocksInput[0].block as deneb.SignedBeaconBlock).message.body.blobKzgCommitments.length;

this.metrics?.gossipBlock.verifiedToBlobsAvailabiltyTime.observe({numBlobs}, verifiedToBlobsAvailabiltyTime);
this.logger.verbose("Verified blockInput fully with blobs availability", {
slot: blocksInput[0].block.message.slot,
recvTofullyVerifedTime,
verifiedToBlobsAvailabiltyTime,
type: blocksInput[0].type,
numBlobs,
});
}
}

return {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus};
} finally {
abortController.abort();
}
Expand Down
Loading

0 comments on commit f43984e

Please sign in to comment.