Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve sync speed #3989

Merged
merged 2 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {JobItemQueue} from "../../util/queue/index.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import {IBeaconChain} from "../interface.js";
import {VerifyBlockModules, verifyBlockStateTransition} from "./verifyBlock.js";
import {VerifyBlockModules, verifyBlocksInEpoch} from "./verifyBlock.js";
import {importBlock, ImportBlockModules} from "./importBlock.js";
import {assertLinearChainSegment} from "./utils/chainSegment.js";
import {FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
Expand Down Expand Up @@ -70,21 +70,27 @@ export async function processBlocks(
return;
}

for (const [i, block] of relevantBlocks.entries()) {
// Fully verify a block to be imported immediately after. Does not produce any side-effects besides adding intermediate
// states in the state cache through regen.
const {postState, executionStatus, proposerBalanceDelta} = await verifyBlockStateTransition(chain, block, opts);
// Fully verify a block to be imported immediately after. Does not produce any side-effects besides adding intermediate
// states in the state cache through regen.
const {postStates, executionStatuses, proposerBalanceDeltas} = await verifyBlocksInEpoch(
chain,
relevantBlocks,
opts
);

const fullyVerifiedBlock: FullyVerifiedBlock = {
const fullyVerifiedBlocks = relevantBlocks.map(
(block, i): FullyVerifiedBlock => ({
block,
postState,
postState: postStates[i],
parentBlockSlot: parentSlots[i],
executionStatus,
proposerBalanceDelta,
executionStatus: executionStatuses[i],
proposerBalanceDelta: proposerBalanceDeltas[i],
// TODO: Make this param mandatory and capture in gossip
seenTimestampSec: opts.seenTimestampSec ?? Math.floor(Date.now() / 1000),
};
})
);

for (const fullyVerifiedBlock of fullyVerifiedBlocks) {
// No need to sleep(0) here since `importBlock` includes a disk write
// TODO: Consider batching importBlock too if it takes significant time
await importBlock(chain, fullyVerifiedBlock, opts);
Expand Down
295 changes: 59 additions & 236 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
import {
CachedBeaconStateAllForks,
isBellatrixStateType,
isBellatrixBlockBodyType,
isMergeTransitionBlock as isMergeTransitionBlockFn,
isExecutionEnabled,
} from "@lodestar/state-transition";
import {bellatrix, allForks} from "@lodestar/types";
import {toHexString} from "@chainsafe/ssz";
import {IForkChoice, ExecutionStatus, assertValidTerminalPowBlock} from "@lodestar/fork-choice";
import {IChainForkConfig} from "@lodestar/config";
import {ErrorAborted, ILogger} from "@lodestar/utils";
import {IExecutionEngine} from "../../execution/engine/index.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {IBeaconClock} from "../clock/index.js";
import {BlockProcessOpts} from "../options.js";
import {ExecutePayloadStatus} from "../../execution/engine/interface.js";
import {IEth1ForBlockProduction} from "../../eth1/index.js";

type VerifyBlockModules = {
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
clock: IBeaconClock;
logger: ILogger;
forkChoice: IForkChoice;
config: IChainForkConfig;
};

/**
* Verifies 1 or more execution payloads from a linear sequence of blocks.
*
* Since the EL client must be aware of each parent, all payloads must be submited in sequence.
*/
export async function verifyBlocksExecutionPayload(
chain: VerifyBlockModules,
blocks: allForks.SignedBeaconBlock[],
preState0: CachedBeaconStateAllForks,
signal: AbortSignal,
opts: BlockProcessOpts
): Promise<{executionStatuses: ExecutionStatus[]; mergeBlockFound: bellatrix.BeaconBlock | null}> {
const executionStatuses: ExecutionStatus[] = [];
let mergeBlockFound: bellatrix.BeaconBlock | null = null;

for (const block of blocks) {
// If blocks are invalid in consensus the main promise could resolve before this loop ends.
// In that case stop sending blocks to execution engine
if (signal.aborted) {
throw new ErrorAborted("verifyBlockExecutionPayloads");
}

const {executionStatus} = await verifyBlockExecutionPayload(chain, block, preState0, opts);
executionStatuses.push(executionStatus);

const isMergeTransitionBlock =
isBellatrixStateType(preState0) &&
isBellatrixBlockBodyType(block.message.body) &&
isMergeTransitionBlockFn(preState0, block.message.body);

// If this is a merge transition block, check to ensure if it references
// a valid terminal PoW block.
//
// However specs define this check to be run inside forkChoice's onBlock
// (https://github.com/ethereum/consensus-specs/blob/dev/specs/bellatrix/fork-choice.md#on_block)
// but we perform the check here (as inspired from the lighthouse impl)
//
// Reasons:
// 1. If the block is not valid, we should fail early and not wait till
// forkChoice import.
// 2. It makes logical sense to pair it with the block validations and
// deal it with the external services like eth1 tracker here than
// in import block
if (isMergeTransitionBlock) {
const mergeBlock = block.message as bellatrix.BeaconBlock;
const mergeBlockHash = toHexString(
chain.config.getForkTypes(mergeBlock.slot).BeaconBlock.hashTreeRoot(mergeBlock)
);
const powBlockRootHex = toHexString(mergeBlock.body.executionPayload.parentHash);
const powBlock = await chain.eth1.getPowBlock(powBlockRootHex).catch((error) => {
// Lets just warn the user here, errors if any will be reported on
// `assertValidTerminalPowBlock` checks
chain.logger.warn(
"Error fetching terminal PoW block referred in the merge transition block",
{powBlockHash: powBlockRootHex, mergeBlockHash},
error
);
return null;
});

const powBlockParent =
powBlock &&
(await chain.eth1.getPowBlock(powBlock.parentHash).catch((error) => {
// Lets just warn the user here, errors if any will be reported on
// `assertValidTerminalPowBlock` checks
chain.logger.warn(
"Error fetching parent of the terminal PoW block referred in the merge transition block",
{powBlockParentHash: powBlock.parentHash, powBlock: powBlockRootHex, mergeBlockHash},
error
);
return null;
}));

// executionStatus will never == ExecutionStatus.PreMerge if it's the mergeBlock. But gotta make TS happy =D
if (executionStatus === ExecutionStatus.PreMerge) {
throw Error("Merge block must not have executionStatus == PreMerge");
}

assertValidTerminalPowBlock(chain.config, mergeBlock, {executionStatus, powBlock, powBlockParent});

// Valid execution payload, but may not be in a valid beacon chain block. Delay printing the POS ACTIVATED banner
// to the end of the verify block routine, which confirms that this block is fully valid.
mergeBlockFound = mergeBlock;
}
}

return {executionStatuses, mergeBlockFound};
}

/**
* Verifies a single block execution payload by sending it to the EL client (via HTTP).
*/
export async function verifyBlockExecutionPayload(
chain: VerifyBlockModules,
block: allForks.SignedBeaconBlock,
preState0: CachedBeaconStateAllForks,
opts: BlockProcessOpts
): Promise<{executionStatus: ExecutionStatus}> {
/** Not null if execution is enabled */
const executionPayloadEnabled =
isBellatrixStateType(preState0) &&
isBellatrixBlockBodyType(block.message.body) &&
// Safe to use with a state previous to block's preState. isMergeComplete can only transition from false to true.
// - If preState0 is after merge block: condition is true, and will always be true
// - If preState0 is before merge block: the block could lie but then state transition function will throw above
// It is kinda safe to send non-trusted payloads to the execution client because at most it can trigger sync.
// TODO: If this becomes a problem, do some basic verification beforehand, like checking the proposer signature.
isExecutionEnabled(preState0, block.message)
? block.message.body.executionPayload
: null;

if (!executionPayloadEnabled) {
// isExecutionEnabled() -> false
return {executionStatus: ExecutionStatus.PreMerge};
}

// TODO: Handle better notifyNewPayload() returning error is syncing
const execResult = await chain.executionEngine.notifyNewPayload(executionPayloadEnabled);

switch (execResult.status) {
case ExecutePayloadStatus.VALID:
chain.forkChoice.validateLatestHash(execResult.latestValidHash, null);
return {executionStatus: ExecutionStatus.Valid};

case ExecutePayloadStatus.INVALID: {
// If the parentRoot is not same as latestValidHash, then the branch from latestValidHash
// to parentRoot needs to be invalidated
const parentHashHex = toHexString(block.message.parentRoot);
chain.forkChoice.validateLatestHash(
execResult.latestValidHash,
parentHashHex !== execResult.latestValidHash ? parentHashHex : null
);
throw new BlockError(block, {
code: BlockErrorCode.EXECUTION_ENGINE_ERROR,
execStatus: execResult.status,
errorMessage: execResult.validationError ?? "",
});
}

// Accepted and Syncing have the same treatment, as final validation of block is pending
case ExecutePayloadStatus.ACCEPTED:
case ExecutePayloadStatus.SYNCING: {
// It's okay to ignore SYNCING status as EL could switch into syncing
// 1. On intial startup/restart
// 2. When some reorg might have occured and EL doesn't has a parent root
// (observed on devnets)
// 3. Because of some unavailable (and potentially invalid) root but there is no way
// of knowing if this is invalid/unavailable. For unavailable block, some proposer
// will (sooner or later) build on the available parent head which will
// eventually win in fork-choice as other validators vote on VALID blocks.
// Once EL catches up again and respond VALID, the fork choice will be updated which
// will either validate or prune invalid blocks
//
// When to import such blocks:
// From: https://github.com/ethereum/consensus-specs/pull/2844
// A block MUST NOT be optimistically imported, unless either of the following
// conditions are met:
//
// 1. Parent of the block has execution
// 2. The justified checkpoint has execution enabled
// 3. The current slot (as per the system clock) is at least
// SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY ahead of the slot of the block being
// imported.

const parentRoot = toHexString(block.message.parentRoot);
const parentBlock = chain.forkChoice.getBlockHex(parentRoot);
const justifiedBlock = chain.forkChoice.getJustifiedBlock();

if (
!parentBlock ||
// Following condition is the !(Not) of the safe import condition
(parentBlock.executionStatus === ExecutionStatus.PreMerge &&
justifiedBlock.executionStatus === ExecutionStatus.PreMerge &&
block.message.slot + opts.safeSlotsToImportOptimistically > chain.clock.currentSlot)
) {
throw new BlockError(block, {
code: BlockErrorCode.EXECUTION_ENGINE_ERROR,
execStatus: ExecutePayloadStatus.UNSAFE_OPTIMISTIC_STATUS,
errorMessage: `not safe to import ${execResult.status} payload within ${opts.safeSlotsToImportOptimistically} of currentSlot, status=${execResult.status}`,
});
}

return {executionStatus: ExecutionStatus.Syncing};
}

// If the block has is not valid, or it referenced an invalid terminal block then the
// block is invalid, however it has no bearing on any forkChoice cleanup
//
// There can be other reasons for which EL failed some of the observed ones are
// 1. Connection refused / can't connect to EL port
// 2. EL Internal Error
// 3. Geth sometimes gives invalid merkel root error which means invalid
// but expects it to be handled in CL as of now. But we should log as warning
// and give it as optimistic treatment and expect any other non-geth CL<>EL
// combination to reject the invalid block and propose a block.
// On kintsugi devnet, this has been observed to cause contiguous proposal failures
// as the network is geth dominated, till a non geth node proposes and moves network
// forward
// For network/unreachable errors, an optimization can be added to replay these blocks
// back. But for now, lets assume other mechanisms like unknown parent block of a future
// child block will cause it to replay

case ExecutePayloadStatus.INVALID_BLOCK_HASH:
case ExecutePayloadStatus.ELERROR:
case ExecutePayloadStatus.UNAVAILABLE:
throw new BlockError(block, {
code: BlockErrorCode.EXECUTION_ENGINE_ERROR,
execStatus: execResult.status,
errorMessage: execResult.validationError,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ export function verifyBlocksSanityChecks(
const relevantBlocks: allForks.SignedBeaconBlock[] = [];
const parentSlots: Slot[] = [];

for (let i = 0; i < blocks.length; i++) {
const block = blocks[i];
for (const block of blocks) {
const blockSlot = block.message.slot;

// Not genesis block
Expand Down
80 changes: 80 additions & 0 deletions packages/beacon-node/src/chain/blocks/verifyBlocksSignatures.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import {CachedBeaconStateAllForks, getBlockSignatureSets} from "@lodestar/state-transition";
import {allForks} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {IBlsVerifier} from "../bls/index.js";
import {BlockError, BlockErrorCode} from "../errors/blockError.js";
import {ImportBlockOpts} from "./types.js";

/**
* Verifies 1 or more block's signatures from a group of blocks in the same epoch.
* getBlockSignatureSets() guarantees to return the correct signingRoots as long as all blocks belong in the same
* epoch as `preState0`. Otherwise the shufflings won't be correct.
*
* Since all data is known in advance all signatures are verified at once in parallel.
*/
export async function verifyBlocksSignatures(
chain: {bls: IBlsVerifier},
preState0: CachedBeaconStateAllForks,
blocks: allForks.SignedBeaconBlock[],
opts: ImportBlockOpts
): Promise<void> {
const isValidPromises: Promise<boolean>[] = [];

// Verifies signatures after running state transition, so all SyncCommittee signed roots are known at this point.
// We must ensure block.slot <= state.slot before running getAllBlockSignatureSets().
// NOTE: If in the future multiple blocks signatures are verified at once, all blocks must be in the same epoch
// so the attester and proposer shufflings are correct.
for (const [i, block] of blocks.entries()) {
// Use [i] to make clear that the index has to be correct to blame the right block below on BlockError()
isValidPromises[i] = opts.validSignatures
? // Skip all signature verification
Promise.resolve(true)
: //
// Verify signatures per block to track which block is invalid
chain.bls.verifySignatureSets(
getBlockSignatureSets(preState0, block, {skipProposerSignature: opts.validProposerSignature})
);

// getBlockSignatureSets() takes 45ms in benchmarks for 2022Q2 mainnet blocks (100 sigs). When syncing a 32 blocks
// segments it will block the event loop for 1400 ms, which is too much. This sleep will allow the event loop to
// yield, which will cause one block's state transition to run. However, the tradeoff is okay and doesn't slow sync
if ((i + 1) % 8 === 0) {
await sleep(0);
}
}

// `rejectFirstInvalidResolveAllValid()` returns on isValid result with its index
const res = await rejectFirstInvalidResolveAllValid(isValidPromises);
if (!res.allValid) {
throw new BlockError(blocks[res.index], {code: BlockErrorCode.INVALID_SIGNATURE, state: preState0});
}
}

type AllValidRes = {allValid: true} | {allValid: false; index: number};

/**
* From an array of promises that resolve a boolean isValid
* - if all valid, await all and return
* - if one invalid, abort immediately and return index of invalid
*/
export function rejectFirstInvalidResolveAllValid(isValidPromises: Promise<boolean>[]): Promise<AllValidRes> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you move this to utils module or any utils file? this could be reused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The counter argument is that if we stop using this function it will stay in utils forever un-used. It's very specific to this code, if it gets re-used in the future then we can move to utils

return new Promise<AllValidRes>((resolve, reject) => {
let validCount = 0;

for (let i = 0; i < isValidPromises.length; i++) {
isValidPromises[i]
.then((isValid) => {
if (isValid) {
if (++validCount >= isValidPromises.length) {
resolve({allValid: true});
}
} else {
resolve({allValid: false, index: i});
}
})
.catch((e) => {
reject(e);
});
}
});
}
Loading