Skip to content

Commit

Permalink
Merge 3c02268 into 6dd408b
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion authored Jun 10, 2022
2 parents 6dd408b + 3c02268 commit ea38bf3
Show file tree
Hide file tree
Showing 32 changed files with 730 additions and 533 deletions.
24 changes: 11 additions & 13 deletions packages/beacon-state-transition/src/block/processSyncCommittee.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import {altair, ssz} from "@chainsafe/lodestar-types";
import {DOMAIN_SYNC_COMMITTEE} from "@chainsafe/lodestar-params";
import {byteArrayEquals} from "@chainsafe/ssz";

import {
computeSigningRoot,
getBlockRootAtSlot,
ISignatureSet,
SignatureSetType,
verifySignatureSet,
} from "../util/index.js";
import {computeSigningRoot, ISignatureSet, SignatureSetType, verifySignatureSet} from "../util/index.js";
import {CachedBeaconStateAllForks} from "../types.js";
import {G2_POINT_AT_INFINITY} from "../constants/index.js";
import {getUnparticipantValues} from "../util/array.js";
Expand Down Expand Up @@ -66,13 +59,18 @@ export function getSyncCommitteeSignatureSet(
// ```
// However we need to run the function getSyncCommitteeSignatureSet() for all the blocks in a epoch
// with the same state when verifying blocks in batch on RangeSync. Therefore we use the block.slot.
//
// This function expects that block.slot <= state.slot, otherwise we can't get the root sign by the sync committee.
// process_sync_committee() is run at the end of process_block(). process_block() is run after process_slots()
// which in the spec forces state.slot to equal block.slot.
const previousSlot = Math.max(block.slot, 1) - 1;

const rootSigned = getBlockRootAtSlot(state, previousSlot);
// The spec uses the state to get the root at previousSlot
// ```python
// get_block_root_at_slot(state, previous_slot)
// ```
// However we need to run the function getSyncCommitteeSignatureSet() for all the blocks in a epoch
// with the same state when verifying blocks in batch on RangeSync.
//
// On skipped slots state block roots just copy the latest block, so using the parentRoot here is equivalent.
// So getSyncCommitteeSignatureSet() can be called with a state in any slot (with the correct shuffling)
const rootSigned = block.parentRoot;

if (!participantIndices) {
const committeeIndices = state.epochCtx.currentSyncCommitteeIndexed.validatorIndices;
Expand Down
9 changes: 9 additions & 0 deletions packages/beacon-state-transition/src/cache/stateCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,12 @@ export function getCachedBeaconState<T extends BeaconStateAllForks>(

return cachedState;
}

/**
* Typeguard to check if a state contains a BeaconStateCache
*/
export function isCachedBeaconState<T extends BeaconStateAllForks>(
state: T | (T & BeaconStateCache)
): state is T & BeaconStateCache {
return (state as T & BeaconStateCache).epochCtx !== undefined;
}
2 changes: 1 addition & 1 deletion packages/beacon-state-transition/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export {
} from "./types.js";

// Main state caches
export {createCachedBeaconState, BeaconStateCache} from "./cache/stateCache.js";
export {createCachedBeaconState, BeaconStateCache, isCachedBeaconState} from "./cache/stateCache.js";
export {EpochContext, EpochContextImmutableData, createEmptyEpochContextImmutableData} from "./cache/epochContext.js";
export {EpochProcess, beforeProcessEpoch} from "./cache/epochProcess.js";

Expand Down
23 changes: 10 additions & 13 deletions packages/beacon-state-transition/src/signatureSets/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,13 @@ export * from "./voluntaryExits.js";
* Includes all signatures on the block (except the deposit signatures) for verification.
* Deposits are not included because they can legally have invalid signatures.
*/
export function getAllBlockSignatureSets(
export function getBlockSignatureSets(
state: CachedBeaconStateAllForks,
signedBlock: allForks.SignedBeaconBlock
): ISignatureSet[] {
return [getProposerSignatureSet(state, signedBlock), ...getAllBlockSignatureSetsExceptProposer(state, signedBlock)];
}

/**
* Includes all signatures on the block (except the deposit signatures) for verification.
* Useful since block proposer signature is verified beforehand on gossip validation
*/
export function getAllBlockSignatureSetsExceptProposer(
state: CachedBeaconStateAllForks,
signedBlock: allForks.SignedBeaconBlock
signedBlock: allForks.SignedBeaconBlock,
opts?: {
/** Useful since block proposer signature is verified beforehand on gossip validation */
skipProposerSignature?: boolean;
}
): ISignatureSet[] {
const signatureSets = [
getRandaoRevealSignatureSet(state, signedBlock.message),
Expand All @@ -43,6 +36,10 @@ export function getAllBlockSignatureSetsExceptProposer(
...getVoluntaryExitsSignatureSets(state, signedBlock),
];

if (!opts?.skipProposerSignature) {
signatureSets.push(getProposerSignatureSet(state, signedBlock));
}

// Only after altair fork, validate tSyncCommitteeSignature
if (computeEpochAtSlot(signedBlock.message.slot) >= state.config.ALTAIR_FORK_EPOCH) {
const syncCommitteeSignatureSet = getSyncCommitteeSignatureSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {phase0, ValidatorIndex, BLSSignature} from "@chainsafe/lodestar-types";
import {FAR_FUTURE_EPOCH, MAX_EFFECTIVE_BALANCE} from "@chainsafe/lodestar-params";
import {BitArray} from "@chainsafe/ssz";
import {ZERO_HASH} from "../../../src/constants/index.js";
import {getAllBlockSignatureSets} from "../../../src/signatureSets/index.js";
import {getBlockSignatureSets} from "../../../src/signatureSets/index.js";
import {generateCachedState} from "../../utils/state.js";
import {generateValidators} from "../../utils/validator.js";

Expand Down Expand Up @@ -63,7 +63,7 @@ describe("signatureSets", () => {

const state = generateCachedState(config, {validators});

const signatureSets = getAllBlockSignatureSets(state, signedBlock);
const signatureSets = getBlockSignatureSets(state, signedBlock);
expect(signatureSets.length).to.equal(
// block signature
1 +
Expand Down
11 changes: 5 additions & 6 deletions packages/lodestar/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,13 @@ export function getLodestarApi({

async getBlockProcessorQueueItems() {
return (chain as BeaconChain)["blockProcessor"].jobQueue.getItems().map((item) => {
const [job] = item.args;
const jobs = Array.isArray(job) ? job : [job];
const [blocks, opts] = item.args;
return {
blockSlots: jobs.map((j) => j.block.message.slot),
blockSlots: blocks.map((block) => block.message.slot),
jobOpts: {
skipImportingAttestations: jobs[0].skipImportingAttestations,
validProposerSignature: jobs[0].validProposerSignature,
validSignatures: jobs[0].validSignatures,
skipImportingAttestations: opts.skipImportingAttestations,
validProposerSignature: opts.validProposerSignature,
validSignatures: opts.validSignatures,
},
addedTimeMs: item.addedTimeMs,
};
Expand Down
16 changes: 10 additions & 6 deletions packages/lodestar/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {prepareExecutionPayload} from "../factory/block/body.js";
import {IEth1ForBlockProduction} from "../../eth1/index.js";
import {BeaconProposerCache} from "../beaconProposerCache.js";
import {IBeaconClock} from "../clock/index.js";
import {FullyVerifiedBlock} from "./types.js";
import {FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
import {PendingEvents} from "./utils/pendingEvents.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";

Expand Down Expand Up @@ -72,8 +72,12 @@ export type ImportBlockModules = {
* - head_tracker.register_block(block_root, parent_root, slot)
* - Send events after everything is done
*/
export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock: FullyVerifiedBlock): Promise<void> {
const {block, postState, parentBlock, skipImportingAttestations, executionStatus} = fullyVerifiedBlock;
export async function importBlock(
chain: ImportBlockModules,
fullyVerifiedBlock: FullyVerifiedBlock,
opts: ImportBlockOpts
): Promise<void> {
const {block, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const pendingEvents = new PendingEvents(chain.emitter);

// - Observe attestations
Expand Down Expand Up @@ -119,7 +123,7 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
// Only process attestations of blocks with relevant attestations for the fork-choice:
// If current epoch is N, and block is epoch X, block may include attestations for epoch X or X - 1.
// The latest block that is useful is at epoch N - 1 which may include attestations for epoch N - 1 or N - 2.
if (!skipImportingAttestations && blockEpoch >= currentEpoch - FORK_CHOICE_ATT_EPOCH_LIMIT) {
if (!opts.skipImportingAttestations && blockEpoch >= currentEpoch - FORK_CHOICE_ATT_EPOCH_LIMIT) {
const attestations = block.message.body.attestations;
const rootCache = new RootCache(postState);
const parentSlot = chain.forkChoice.getBlock(block.message.parentRoot)?.slot;
Expand Down Expand Up @@ -211,7 +215,7 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
chain.lightClientServer.onImportBlockHead(
block.message as altair.BeaconBlock,
postState as CachedBeaconStateAltair,
parentBlock
parentBlockSlot
);
} catch (e) {
chain.logger.error("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
Expand Down Expand Up @@ -265,7 +269,7 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
pendingEvents.emit();

// Register stat metrics about the block after importing it
chain.metrics?.parentBlockDistance.observe(block.message.slot - parentBlock.slot);
chain.metrics?.parentBlockDistance.observe(block.message.slot - parentBlockSlot);
}

async function maybeIssueNextProposerEngineFcU(
Expand Down
114 changes: 32 additions & 82 deletions packages/lodestar/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment */
import {allForks} from "@chainsafe/lodestar-types";
import {sleep} from "@chainsafe/lodestar-utils";
import {ChainEvent} from "../emitter.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {BlockError, BlockErrorCode, ChainSegmentError} from "../errors/index.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import {verifyBlock, VerifyBlockModules} from "./verifyBlock.js";
import {verifyBlocks, VerifyBlockModules} from "./verifyBlock.js";
import {importBlock, ImportBlockModules} from "./importBlock.js";
import {assertLinearChainSegment} from "./utils/chainSegment.js";
import {PartiallyVerifiedBlock} from "./types.js";
export {PartiallyVerifiedBlockFlags} from "./types.js";
import {ImportBlockOpts} from "./types.js";
export {ImportBlockOpts};

const QUEUE_MAX_LENGHT = 256;

Expand All @@ -19,31 +18,27 @@ export type ProcessBlockModules = VerifyBlockModules & ImportBlockModules;
* BlockProcessor processes block jobs in a queued fashion, one after the other.
*/
export class BlockProcessor {
readonly jobQueue: JobItemQueue<[PartiallyVerifiedBlock[] | PartiallyVerifiedBlock], void>;
readonly jobQueue: JobItemQueue<[allForks.SignedBeaconBlock[], ImportBlockOpts], void>;

constructor(modules: ProcessBlockModules, opts: BlockProcessOpts, signal: AbortSignal) {
this.jobQueue = new JobItemQueue(
(job) => {
if (!Array.isArray(job)) {
return processBlock(modules, job, opts);
} else {
return processChainSegment(modules, job, opts);
}
this.jobQueue = new JobItemQueue<[allForks.SignedBeaconBlock[], ImportBlockOpts], void>(
(job, importOpts) => {
return processChainSegment(modules, job, {...opts, ...importOpts});
},
{maxLength: QUEUE_MAX_LENGHT, signal},
modules.metrics ? modules.metrics.blockProcessorQueue : undefined
);
}

async processBlockJob(job: PartiallyVerifiedBlock): Promise<void> {
await this.jobQueue.push(job);
}

async processChainSegment(job: PartiallyVerifiedBlock[]): Promise<void> {
await this.jobQueue.push(job);
async processBlocksJob(job: allForks.SignedBeaconBlock[], opts: ImportBlockOpts = {}): Promise<void> {
await this.jobQueue.push(job, opts);
}
}

///////////////////////////
// TODO: Run this functions with spec tests of many blocks
///////////////////////////

/**
* Validate and process a block
*
Expand All @@ -54,76 +49,31 @@ export class BlockProcessor {
*
* All other effects are provided by downstream event handlers
*/
export async function processBlock(
export async function processChainSegment(
modules: ProcessBlockModules,
partiallyVerifiedBlock: PartiallyVerifiedBlock,
opts: BlockProcessOpts
blocks: allForks.SignedBeaconBlock[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<void> {
if (blocks.length === 0) {
return; // TODO: or throw?
} else if (blocks.length > 1) {
assertLinearChainSegment(modules.config, blocks);
}

try {
const fullyVerifiedBlock = await verifyBlock(modules, partiallyVerifiedBlock, opts);
await importBlock(modules, fullyVerifiedBlock);
} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, partiallyVerifiedBlock.block);
const fullyVerifiedBlocks = await verifyBlocks(modules, blocks, opts);

if (
partiallyVerifiedBlock.ignoreIfKnown &&
(err.type.code === BlockErrorCode.ALREADY_KNOWN || err.type.code === BlockErrorCode.GENESIS_BLOCK)
) {
// Flag ignoreIfKnown causes BlockErrorCodes ALREADY_KNOWN, GENESIS_BLOCK to resolve.
// Return before emitting to not cause loud logging.
return;
for (const fullyVerifiedBlock of fullyVerifiedBlocks) {
// No need to sleep(0) here since `importBlock` includes a disk write
// TODO: Consider batching importBlock too if it takes significant time
await importBlock(modules, fullyVerifiedBlock, opts);
}

} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, blocks[0]);
modules.emitter.emit(ChainEvent.errorBlock, err);

throw err;
}
}

/**
* Similar to processBlockJob but this process a chain segment
*/
export async function processChainSegment(
modules: ProcessBlockModules,
partiallyVerifiedBlocks: PartiallyVerifiedBlock[],
opts: BlockProcessOpts
): Promise<void> {
const blocks = partiallyVerifiedBlocks.map((b) => b.block);
assertLinearChainSegment(modules.config, blocks);

let importedBlocks = 0;

for (const partiallyVerifiedBlock of partiallyVerifiedBlocks) {
try {
// TODO: Re-use preState
const fullyVerifiedBlock = await verifyBlock(modules, partiallyVerifiedBlock, opts);
await importBlock(modules, fullyVerifiedBlock);
importedBlocks++;

// this avoids keeping our node busy processing blocks
await sleep(0);
} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, partiallyVerifiedBlock.block);

if (
partiallyVerifiedBlock.ignoreIfKnown &&
(err.type.code === BlockErrorCode.ALREADY_KNOWN || err.type.code === BlockErrorCode.GENESIS_BLOCK)
) {
continue;
}
if (partiallyVerifiedBlock.ignoreIfFinalized && err.type.code == BlockErrorCode.WOULD_REVERT_FINALIZED_SLOT) {
continue;
}

modules.emitter.emit(ChainEvent.errorBlock, err);

// Convert to ChainSegmentError to append `importedBlocks` data
const chainSegmentError = new ChainSegmentError(partiallyVerifiedBlock.block, err.type, importedBlocks);
chainSegmentError.stack = err.stack;
throw chainSegmentError;
}
throw e;
}
}

Expand Down
Loading

0 comments on commit ea38bf3

Please sign in to comment.