Skip to content

Commit

Permalink
Merge d6ce1a5 into f04d270
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Jul 17, 2022
2 parents f04d270 + d6ce1a5 commit de2ec33
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 182 deletions.
2 changes: 1 addition & 1 deletion packages/api/src/beacon/routes/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export type RegenQueueItem = {

export type BlockProcessorQueueItem = {
blockSlots: Slot[];
jobOpts: Record<string, boolean | undefined>;
jobOpts: Record<string, string | number | boolean | undefined>;
addedTimeMs: number;
};

Expand Down
11 changes: 3 additions & 8 deletions packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,10 @@ export function getLodestarApi({

async getBlockProcessorQueueItems() {
return (chain as BeaconChain)["blockProcessor"].jobQueue.getItems().map((item) => {
const [job] = item.args;
const jobs = Array.isArray(job) ? job : [job];
const [blocks, opts] = item.args;
return {
blockSlots: jobs.map((j) => j.block.message.slot),
jobOpts: {
skipImportingAttestations: jobs[0].skipImportingAttestations,
validProposerSignature: jobs[0].validProposerSignature,
validSignatures: jobs[0].validSignatures,
},
blockSlots: blocks.map((block) => block.message.slot),
jobOpts: opts,
addedTimeMs: item.addedTimeMs,
};
});
Expand Down
12 changes: 8 additions & 4 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {BeaconProposerCache} from "../beaconProposerCache.js";
import {IBeaconClock} from "../clock/index.js";
import {ReprocessController, REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {CheckpointBalancesCache} from "../balancesCache.js";
import {FullyVerifiedBlock} from "./types.js";
import {FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
import {PendingEvents} from "./utils/pendingEvents.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";

Expand Down Expand Up @@ -68,8 +68,12 @@ export type ImportBlockModules = {
* - head_tracker.register_block(block_root, parent_root, slot)
* - Send events after everything is done
*/
export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock: FullyVerifiedBlock): Promise<void> {
const {block, postState, parentBlockSlot, skipImportingAttestations, executionStatus} = fullyVerifiedBlock;
export async function importBlock(
chain: ImportBlockModules,
fullyVerifiedBlock: FullyVerifiedBlock,
opts: ImportBlockOpts
): Promise<void> {
const {block, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const pendingEvents = new PendingEvents(chain.emitter);

// - Observe attestations
Expand Down Expand Up @@ -108,7 +112,7 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
// Only process attestations of blocks with relevant attestations for the fork-choice:
// If current epoch is N, and block is epoch X, block may include attestations for epoch X or X - 1.
// The latest block that is useful is at epoch N - 1 which may include attestations for epoch N - 1 or N - 2.
if (!skipImportingAttestations && blockEpoch >= currentEpoch - FORK_CHOICE_ATT_EPOCH_LIMIT) {
if (!opts.skipImportingAttestations && blockEpoch >= currentEpoch - FORK_CHOICE_ATT_EPOCH_LIMIT) {
const attestations = block.message.body.attestations;
const rootCache = new RootCache(postState);
const parentSlot = chain.forkChoice.getBlock(block.message.parentRoot)?.slot;
Expand Down
119 changes: 30 additions & 89 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment */
import {allForks} from "@lodestar/types";
import {sleep, toHex} from "@lodestar/utils";
import {toHex} from "@lodestar/utils";
import {JobItemQueue} from "../../util/queue/index.js";
import {BlockError, BlockErrorCode, ChainSegmentError} from "../errors/index.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import {IBeaconChain} from "../interface.js";
import {verifyBlock, VerifyBlockModules} from "./verifyBlock.js";
import {importBlock, ImportBlockModules} from "./importBlock.js";
import {assertLinearChainSegment} from "./utils/chainSegment.js";
import {PartiallyVerifiedBlock} from "./types.js";
export {PartiallyVerifiedBlockFlags} from "./types.js";
import {ImportBlockOpts} from "./types.js";
export {ImportBlockOpts} from "./types.js";

const QUEUE_MAX_LENGHT = 256;

Expand All @@ -23,28 +23,20 @@ export type ProcessBlockModules = VerifyBlockModules &
* BlockProcessor processes block jobs in a queued fashion, one after the other.
*/
export class BlockProcessor {
readonly jobQueue: JobItemQueue<[PartiallyVerifiedBlock[] | PartiallyVerifiedBlock], void>;
readonly jobQueue: JobItemQueue<[allForks.SignedBeaconBlock[], ImportBlockOpts], void>;

constructor(modules: ProcessBlockModules, opts: BlockProcessOpts, signal: AbortSignal) {
this.jobQueue = new JobItemQueue(
(job) => {
if (!Array.isArray(job)) {
return processBlock(modules, job, opts);
} else {
return processChainSegment(modules, job, opts);
}
this.jobQueue = new JobItemQueue<[allForks.SignedBeaconBlock[], ImportBlockOpts], void>(
(job, importOpts) => {
return processBlocks(modules, job, {...opts, ...importOpts});
},
{maxLength: QUEUE_MAX_LENGHT, signal},
modules.metrics ? modules.metrics.blockProcessorQueue : undefined
);
}

async processBlockJob(job: PartiallyVerifiedBlock): Promise<void> {
await this.jobQueue.push(job);
}

async processChainSegment(job: PartiallyVerifiedBlock[]): Promise<void> {
await this.jobQueue.push(job);
async processBlocksJob(job: allForks.SignedBeaconBlock[], opts: ImportBlockOpts = {}): Promise<void> {
await this.jobQueue.push(job, opts);
}
}

Expand All @@ -58,92 +50,41 @@ export class BlockProcessor {
*
* All other effects are provided by downstream event handlers
*/
export async function processBlock(
export async function processBlocks(
chain: ProcessBlockModules,
partiallyVerifiedBlock: PartiallyVerifiedBlock,
opts: BlockProcessOpts
blocks: allForks.SignedBeaconBlock[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<void> {
if (blocks.length === 0) {
return; // TODO: or throw?
} else if (blocks.length > 1) {
assertLinearChainSegment(chain.config, blocks);
}

try {
const fullyVerifiedBlock = await verifyBlock(chain, partiallyVerifiedBlock, opts);
await importBlock(chain, fullyVerifiedBlock);
} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, partiallyVerifiedBlock.block);
for (const block of blocks) {
const fullyVerifiedBlock = await verifyBlock(chain, block, opts);

if (
partiallyVerifiedBlock.ignoreIfKnown &&
(err.type.code === BlockErrorCode.ALREADY_KNOWN || err.type.code === BlockErrorCode.GENESIS_BLOCK)
) {
// Flag ignoreIfKnown causes BlockErrorCodes ALREADY_KNOWN, GENESIS_BLOCK to resolve.
// Return before emitting to not cause loud logging.
return;
// No need to sleep(0) here since `importBlock` includes a disk write
// TODO: Consider batching importBlock too if it takes significant time
await importBlock(chain, fullyVerifiedBlock, opts);
}
} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, blocks[0]);

// TODO: De-duplicate with logic above
// ChainEvent.errorBlock
if (!(err instanceof BlockError)) {
chain.logger.error("Non BlockError received", {}, err);
return;
}

if (!opts.disableOnBlockError) {
} else if (!opts.disableOnBlockError) {
onBlockError(chain, err);
}

throw err;
}
}

/**
* Similar to processBlockJob but this process a chain segment
*/
export async function processChainSegment(
chain: ProcessBlockModules,
partiallyVerifiedBlocks: PartiallyVerifiedBlock[],
opts: BlockProcessOpts
): Promise<void> {
const blocks = partiallyVerifiedBlocks.map((b) => b.block);
assertLinearChainSegment(chain.config, blocks);

let importedBlocks = 0;

for (const partiallyVerifiedBlock of partiallyVerifiedBlocks) {
try {
// TODO: Re-use preState
const fullyVerifiedBlock = await verifyBlock(chain, partiallyVerifiedBlock, opts);
await importBlock(chain, fullyVerifiedBlock);
importedBlocks++;

// this avoids keeping our node busy processing blocks
await sleep(0);
} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, partiallyVerifiedBlock.block);

if (
partiallyVerifiedBlock.ignoreIfKnown &&
(err.type.code === BlockErrorCode.ALREADY_KNOWN || err.type.code === BlockErrorCode.GENESIS_BLOCK)
) {
continue;
}
if (partiallyVerifiedBlock.ignoreIfFinalized && err.type.code == BlockErrorCode.WOULD_REVERT_FINALIZED_SLOT) {
continue;
}

// TODO: De-duplicate with logic above
// ChainEvent.errorBlock
if (!(err instanceof BlockError)) {
chain.logger.error("Non BlockError received", {}, err);
} else if (!opts.disableOnBlockError) {
onBlockError(chain, err);
}

// Convert to ChainSegmentError to append `importedBlocks` data
const chainSegmentError = new ChainSegmentError(partiallyVerifiedBlock.block, err.type, importedBlocks);
chainSegmentError.stack = err.stack;
throw chainSegmentError;
}
}
}

function getBlockError(e: unknown, block: allForks.SignedBeaconBlock): BlockError {
if (e instanceof BlockError) {
return e;
Expand Down
30 changes: 10 additions & 20 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {ExecutionStatus} from "@lodestar/fork-choice";
import {allForks, Slot} from "@lodestar/types";

export type FullyVerifiedBlockFlags = {
export type ImportBlockOpts = {
/**
* TEMP: Review if this is safe, Lighthouse always imports attestations even in finalized sync.
*/
Expand All @@ -19,17 +19,6 @@ export type FullyVerifiedBlockFlags = {
* Used by range sync.
*/
ignoreIfFinalized?: boolean;
};

export type PartiallyVerifiedBlockFlags = FullyVerifiedBlockFlags & {
/**
* Metadata: `true` if only the block proposer signature has been verified
*/
validProposerSignature?: boolean;
/**
* Metadata: `true` if all the signatures including the proposer signature have been verified
*/
validSignatures?: boolean;
/**
* From RangeSync module, we won't attest to this block so it's okay to ignore a SYNCING message from execution layer
*/
Expand All @@ -38,14 +27,22 @@ export type PartiallyVerifiedBlockFlags = FullyVerifiedBlockFlags & {
* Verify signatures on main thread or not.
*/
blsVerifyOnMainThread?: boolean;
/**
* Metadata: `true` if only the block proposer signature has been verified
*/
validProposerSignature?: boolean;
/**
* Metadata: `true` if all the signatures including the proposer signature have been verified
*/
validSignatures?: boolean;
/** Seen timestamp seconds */
seenTimestampSec?: number;
};

/**
* A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and ready to import
*/
export type FullyVerifiedBlock = FullyVerifiedBlockFlags & {
export type FullyVerifiedBlock = {
block: allForks.SignedBeaconBlock;
postState: CachedBeaconStateAllForks;
parentBlockSlot: Slot;
Expand All @@ -57,10 +54,3 @@ export type FullyVerifiedBlock = FullyVerifiedBlockFlags & {
/** Seen timestamp seconds */
seenTimestampSec: number;
};

/**
* A wrapper around a block that's partially verified: after gossip validation `validProposerSignature = true`
*/
export type PartiallyVerifiedBlock = PartiallyVerifiedBlockFlags & {
block: allForks.SignedBeaconBlock;
};
35 changes: 13 additions & 22 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
getBlockSignatureSets,
stateTransition,
} from "@lodestar/state-transition";
import {bellatrix} from "@lodestar/types";
import {allForks, bellatrix} from "@lodestar/types";
import {toHexString} from "@chainsafe/ssz";
import {IForkChoice, ProtoBlock, ExecutionStatus, assertValidTerminalPowBlock} from "@lodestar/fork-choice";
import {IChainForkConfig} from "@lodestar/config";
Expand All @@ -23,7 +23,7 @@ import {IBlsVerifier} from "../bls/index.js";
import {ExecutePayloadStatus} from "../../execution/engine/interface.js";
import {byteArrayEquals} from "../../util/bytes.js";
import {IEth1ForBlockProduction} from "../../eth1/index.js";
import {FullyVerifiedBlock, PartiallyVerifiedBlock} from "./types.js";
import {FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
import {POS_PANDA_MERGE_TRANSITION_BANNER} from "./utils/pandaMergeTransitionBanner.js";

export type VerifyBlockModules = {
Expand All @@ -44,26 +44,21 @@ export type VerifyBlockModules = {
*/
export async function verifyBlock(
chain: VerifyBlockModules,
partiallyVerifiedBlock: PartiallyVerifiedBlock,
opts: BlockProcessOpts
block: allForks.SignedBeaconBlock,
opts: ImportBlockOpts & BlockProcessOpts
): Promise<FullyVerifiedBlock> {
const parentBlock = verifyBlockSanityChecks(chain, partiallyVerifiedBlock);
const parentBlock = verifyBlockSanityChecks(chain, block);

const {postState, executionStatus, proposerBalanceDiff} = await verifyBlockStateTransition(
chain,
partiallyVerifiedBlock,
opts
);
const {postState, executionStatus, proposerBalanceDiff} = await verifyBlockStateTransition(chain, block, opts);

return {
block: partiallyVerifiedBlock.block,
block,
postState,
parentBlockSlot: parentBlock.slot,
skipImportingAttestations: partiallyVerifiedBlock.skipImportingAttestations,
executionStatus,
proposerBalanceDiff,
// TODO: Make this param mandatory and capture in gossip
seenTimestampSec: partiallyVerifiedBlock.seenTimestampSec ?? Math.floor(Date.now() / 1000),
seenTimestampSec: opts.seenTimestampSec ?? Math.floor(Date.now() / 1000),
};
}

Expand All @@ -79,11 +74,7 @@ export async function verifyBlock(
* - Not finalized slot
* - Not already known
*/
export function verifyBlockSanityChecks(
chain: VerifyBlockModules,
partiallyVerifiedBlock: PartiallyVerifiedBlock
): ProtoBlock {
const {block} = partiallyVerifiedBlock;
export function verifyBlockSanityChecks(chain: VerifyBlockModules, block: allForks.SignedBeaconBlock): ProtoBlock {
const blockSlot = block.message.slot;

// Not genesis block
Expand Down Expand Up @@ -132,10 +123,10 @@ export function verifyBlockSanityChecks(
*/
export async function verifyBlockStateTransition(
chain: VerifyBlockModules,
partiallyVerifiedBlock: PartiallyVerifiedBlock,
opts: BlockProcessOpts
block: allForks.SignedBeaconBlock,
opts: ImportBlockOpts & BlockProcessOpts
): Promise<{postState: CachedBeaconStateAllForks; executionStatus: ExecutionStatus; proposerBalanceDiff: number}> {
const {block, validProposerSignature, validSignatures} = partiallyVerifiedBlock;
const {validProposerSignature, validSignatures} = opts;

// TODO: Skip in process chain segment
// Retrieve preState from cache (regen)
Expand Down Expand Up @@ -185,7 +176,7 @@ export async function verifyBlockStateTransition(
if (
signatureSets.length > 0 &&
!(await chain.bls.verifySignatureSets(signatureSets, {
verifyOnMainThread: partiallyVerifiedBlock?.blsVerifyOnMainThread,
verifyOnMainThread: opts?.blsVerifyOnMainThread,
}))
) {
throw new BlockError(block, {code: BlockErrorCode.INVALID_SIGNATURE, state: postState});
Expand Down
Loading

0 comments on commit de2ec33

Please sign in to comment.