Skip to content

Commit

Permalink
Concurrent block verification steps
Browse files Browse the repository at this point in the history
Basic range sync perf test

Re-use existing CachedBeaconState from anchorState

Verify block segment in parallel

Fix perf test range

Default to pool of size 4

Yield on  getBlockSignatureSets

Fix test type

Ensure db is clean-up after each tmpdb usage

Add INFURA_ETH2_CREDENTIALS to benchmark GA
  • Loading branch information
dapplion committed Jun 4, 2022
1 parent b5e24f7 commit 332bfc0
Show file tree
Hide file tree
Showing 23 changed files with 531 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,13 @@ export * from "./voluntaryExits.js";
* Includes all signatures on the block (except the deposit signatures) for verification.
* Deposits are not included because they can legally have invalid signatures.
*/
export function getAllBlockSignatureSets(
export function getBlockSignatureSets(
state: CachedBeaconStateAllForks,
signedBlock: allForks.SignedBeaconBlock
): ISignatureSet[] {
return [getProposerSignatureSet(state, signedBlock), ...getAllBlockSignatureSetsExceptProposer(state, signedBlock)];
}

/**
* Includes all signatures on the block (except the deposit signatures) for verification.
* Useful since block proposer signature is verified beforehand on gossip validation
*/
export function getAllBlockSignatureSetsExceptProposer(
state: CachedBeaconStateAllForks,
signedBlock: allForks.SignedBeaconBlock
signedBlock: allForks.SignedBeaconBlock,
opts?: {
/** Useful since block proposer signature is verified beforehand on gossip validation */
skipProposerSignature?: boolean;
}
): ISignatureSet[] {
const signatureSets = [
getRandaoRevealSignatureSet(state, signedBlock.message),
Expand All @@ -43,6 +36,10 @@ export function getAllBlockSignatureSetsExceptProposer(
...getVoluntaryExitsSignatureSets(state, signedBlock),
];

if (!opts?.skipProposerSignature) {
signatureSets.push(getProposerSignatureSet(state, signedBlock));
}

// Only after altair fork, validate tSyncCommitteeSignature
if (computeEpochAtSlot(signedBlock.message.slot) >= state.config.ALTAIR_FORK_EPOCH) {
const syncCommitteeSignatureSet = getSyncCommitteeSignatureSet(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import {altair, ssz} from "@chainsafe/lodestar-types";
import {DOMAIN_SYNC_COMMITTEE} from "@chainsafe/lodestar-params";
import {byteArrayEquals} from "@chainsafe/ssz";

import {
computeSigningRoot,
getBlockRootAtSlot,
ISignatureSet,
SignatureSetType,
verifySignatureSet,
} from "../../util/index.js";
import {computeSigningRoot, ISignatureSet, SignatureSetType, verifySignatureSet} from "../../util/index.js";
import {CachedBeaconStateAllForks} from "../../types.js";
import {G2_POINT_AT_INFINITY} from "../../constants/index.js";
import {getUnparticipantValues} from "../../util/array.js";
Expand Down Expand Up @@ -66,13 +59,18 @@ export function getSyncCommitteeSignatureSet(
// ```
// However we need to run the function getSyncCommitteeSignatureSet() for all the blocks in a epoch
// with the same state when verifying blocks in batch on RangeSync. Therefore we use the block.slot.
//
// This function expects that block.slot <= state.slot, otherwise we can't get the root sign by the sync committee.
// process_sync_committee() is run at the end of process_block(). process_block() is run after process_slots()
// which in the spec forces state.slot to equal block.slot.
const previousSlot = Math.max(block.slot, 1) - 1;

const rootSigned = getBlockRootAtSlot(state, previousSlot);
// The spec uses the state to get the root at previousSlot
// ```python
// get_block_root_at_slot(state, previous_slot)
// ```
// However we need to run the function getSyncCommitteeSignatureSet() for all the blocks in a epoch
// with the same state when verifying blocks in batch on RangeSync.
//
// On skipped slots state block roots just copy the latest block, so using the parentRoot here is equivalent.
// So getSyncCommitteeSignatureSet() can be called with a state in any slot (with the correct shuffling)
const rootSigned = block.parentRoot;

if (!participantIndices) {
const committeeIndices = state.epochCtx.currentSyncCommitteeIndexed.validatorIndices;
Expand Down
9 changes: 9 additions & 0 deletions packages/beacon-state-transition/src/cache/stateCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,12 @@ export function getCachedBeaconState<T extends BeaconStateAllForks>(

return cachedState;
}

/**
* Typeguard to check if a state contains a BeaconStateCache
*/
export function isCachedBeaconState<T extends BeaconStateAllForks>(
state: T | (T & BeaconStateCache)
): state is T & BeaconStateCache {
return (state as T & BeaconStateCache).epochCtx !== undefined;
}
2 changes: 1 addition & 1 deletion packages/beacon-state-transition/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export {
} from "./types.js";

// Main state caches
export {createCachedBeaconState, BeaconStateCache} from "./cache/stateCache.js";
export {createCachedBeaconState, BeaconStateCache, isCachedBeaconState} from "./cache/stateCache.js";
export {EpochContext, EpochContextImmutableData, createEmptyEpochContextImmutableData} from "./cache/epochContext.js";
export {EpochProcess, beforeProcessEpoch} from "./cache/epochProcess.js";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ describe("signatureSets", () => {

const state = generateCachedState(config, {validators});

const signatureSets = allForks.getAllBlockSignatureSets(state, signedBlock);
const signatureSets = allForks.getBlockSignatureSets(state, signedBlock);
expect(signatureSets.length).to.equal(
// block signature
1 +
Expand Down
4 changes: 2 additions & 2 deletions packages/lodestar/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export type ImportBlockModules = {
* - Send events after everything is done
*/
export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock: FullyVerifiedBlock): Promise<void> {
const {block, postState, parentBlock, skipImportingAttestations, executionStatus} = fullyVerifiedBlock;
const {block, postState, parentBlockSlot, skipImportingAttestations, executionStatus} = fullyVerifiedBlock;
const pendingEvents = new PendingEvents(chain.emitter);

// - Observe attestations
Expand Down Expand Up @@ -210,7 +210,7 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
chain.lightClientServer.onImportBlockHead(
block.message as altair.BeaconBlock,
postState as CachedBeaconStateAltair,
parentBlock
parentBlockSlot
);
} catch (e) {
chain.logger.error("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
Expand Down
84 changes: 32 additions & 52 deletions packages/lodestar/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment */
import {allForks} from "@chainsafe/lodestar-types";
import {sleep} from "@chainsafe/lodestar-utils";
import {ChainEvent} from "../emitter.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {BlockError, BlockErrorCode, ChainSegmentError} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import {verifyBlock, VerifyBlockModules} from "./verifyBlock.js";
import {verifyBlocks, VerifyBlockModules} from "./verifyBlock.js";
import {importBlock, ImportBlockModules} from "./importBlock.js";
import {assertLinearChainSegment} from "./utils/chainSegment.js";
import {PartiallyVerifiedBlock} from "./types.js";
Expand Down Expand Up @@ -44,6 +43,10 @@ export class BlockProcessor {
}
}

///////////////////////////
// TODO: Run this functions with spec tests of many blocks
///////////////////////////

/**
* Validate and process a block
*
Expand All @@ -59,26 +62,7 @@ export async function processBlock(
partiallyVerifiedBlock: PartiallyVerifiedBlock,
opts: BlockProcessOpts
): Promise<void> {
try {
const fullyVerifiedBlock = await verifyBlock(modules, partiallyVerifiedBlock, opts);
await importBlock(modules, fullyVerifiedBlock);
} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, partiallyVerifiedBlock.block);

if (
partiallyVerifiedBlock.ignoreIfKnown &&
(err.type.code === BlockErrorCode.ALREADY_KNOWN || err.type.code === BlockErrorCode.GENESIS_BLOCK)
) {
// Flag ignoreIfKnown causes BlockErrorCodes ALREADY_KNOWN, GENESIS_BLOCK to resolve.
// Return before emitting to not cause loud logging.
return;
}

modules.emitter.emit(ChainEvent.errorBlock, err);

throw err;
}
await processChainSegment(modules, [partiallyVerifiedBlock], opts);
}

/**
Expand All @@ -89,41 +73,37 @@ export async function processChainSegment(
partiallyVerifiedBlocks: PartiallyVerifiedBlock[],
opts: BlockProcessOpts
): Promise<void> {
const blocks = partiallyVerifiedBlocks.map((b) => b.block);
assertLinearChainSegment(modules.config, blocks);

let importedBlocks = 0;
if (partiallyVerifiedBlocks.length === 0) {
return; // TODO: or throw?
} else if (partiallyVerifiedBlocks.length > 1) {
assertLinearChainSegment(
modules.config,
partiallyVerifiedBlocks.map((b) => b.block)
);
}

for (const partiallyVerifiedBlock of partiallyVerifiedBlocks) {
try {
// TODO: Re-use preState
const fullyVerifiedBlock = await verifyBlock(modules, partiallyVerifiedBlock, opts);
await importBlock(modules, fullyVerifiedBlock);
importedBlocks++;
// TODO: Does this makes sense with current batch verify approach?
// No block is imported until all blocks are verified
const importedBlocks = 0;

// this avoids keeping our node busy processing blocks
await sleep(0);
} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, partiallyVerifiedBlock.block);
try {
const fullyVerifiedBlocks = await verifyBlocks(modules, partiallyVerifiedBlocks, opts);

if (
partiallyVerifiedBlock.ignoreIfKnown &&
(err.type.code === BlockErrorCode.ALREADY_KNOWN || err.type.code === BlockErrorCode.GENESIS_BLOCK)
) {
continue;
}
if (partiallyVerifiedBlock.ignoreIfFinalized && err.type.code == BlockErrorCode.WOULD_REVERT_FINALIZED_SLOT) {
continue;
}
for (const fullyVerifiedBlock of fullyVerifiedBlocks) {
// No need to sleep(0) here since `importBlock` includes a disk write
// TODO: Consider batching importBlock too if it takes significant time
await importBlock(modules, fullyVerifiedBlock);
}
} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, partiallyVerifiedBlocks[0].block);

modules.emitter.emit(ChainEvent.errorBlock, err);
modules.emitter.emit(ChainEvent.errorBlock, err);

// Convert to ChainSegmentError to append `importedBlocks` data
const chainSegmentError = new ChainSegmentError(partiallyVerifiedBlock.block, err.type, importedBlocks);
chainSegmentError.stack = err.stack;
throw chainSegmentError;
}
// Convert to ChainSegmentError to append `importedBlocks` data
const chainSegmentError = new ChainSegmentError(partiallyVerifiedBlocks[0].block, err.type, importedBlocks);
chainSegmentError.stack = err.stack;
throw chainSegmentError;
}
}

Expand Down
6 changes: 3 additions & 3 deletions packages/lodestar/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {CachedBeaconStateAllForks} from "@chainsafe/lodestar-beacon-state-transition";
import {IProtoBlock, ExecutionStatus} from "@chainsafe/lodestar-fork-choice";
import {allForks} from "@chainsafe/lodestar-types";
import {ExecutionStatus} from "@chainsafe/lodestar-fork-choice";
import {allForks, Slot} from "@chainsafe/lodestar-types";

export type FullyVerifiedBlockFlags = {
/**
Expand Down Expand Up @@ -50,7 +50,7 @@ export type PartiallyVerifiedBlockFlags = FullyVerifiedBlockFlags & {
export type FullyVerifiedBlock = FullyVerifiedBlockFlags & {
block: allForks.SignedBeaconBlock;
postState: CachedBeaconStateAllForks;
parentBlock: IProtoBlock;
parentBlockSlot: Slot;
};

/**
Expand Down
Loading

0 comments on commit 332bfc0

Please sign in to comment.