diff --git a/packages/beacon-node/src/sync/constants.ts b/packages/beacon-node/src/sync/constants.ts index 52259eb31f4..c7672194ccb 100644 --- a/packages/beacon-node/src/sync/constants.ts +++ b/packages/beacon-node/src/sync/constants.ts @@ -10,8 +10,19 @@ export const MAX_BATCH_DOWNLOAD_ATTEMPTS = 5; /** Consider batch faulty after downloading and processing this number of times */ export const MAX_BATCH_PROCESSING_ATTEMPTS = 3; -/** Batch range excludes the first block of the epoch. @see Batch */ -export const BATCH_SLOT_OFFSET = 1; +/** + * Number of slots to offset batches. + * + * Before Jul2022 an offset of 1 was required to download the checkpoint block during finalized sync. Otherwise + * the block necessary so switch from Finalized sync to Head sync won't be in the fork-choice and range sync would + * be stuck in a loop downloading the previous epoch to finalized epoch, until we get rate-limited. + * + * After Jul2022 during finalized sync the entire epoch of finalized epoch will be downloaded fullfilling the goal + * to switch to Head sync latter. This does not affect performance nor sync speed and just downloads a few extra + * blocks that would be required by Head sync anyway. However, having an offset of 0 allows to send to the processor + * blocks that belong to the same epoch, which enables batch verification optimizations. + */ +export const BATCH_SLOT_OFFSET = 0; /** First epoch to allow to start gossip */ export const MIN_EPOCH_TO_START_GOSSIP = -1; @@ -23,14 +34,17 @@ export const MIN_EPOCH_TO_START_GOSSIP = -1; * we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which * case the responder will fill the response up to the max request size, assuming they have the * bandwidth to do so. + * + * Jul2022: Current batch block processor wants only blocks in the same epoch. So we'll process only + * one batch at a time. Metrics can confirm preliminary tests that speed is as good. */ -export const EPOCHS_PER_BATCH = 2; +export const EPOCHS_PER_BATCH = 1; /** * The maximum number of batches to queue before requesting more. * In good network conditions downloading batches is much faster than processing them - * A number > 5 results in wasted progress when the chain completes syncing + *A number > 10 epochs worth results in wasted progress when the chain completes syncing * * TODO: When switching branches usually all batches in AwaitingProcessing are dropped, could it be optimized? */ -export const BATCH_BUFFER_SIZE = 5; +export const BATCH_BUFFER_SIZE = Math.ceil(10 / EPOCHS_PER_BATCH); diff --git a/packages/beacon-node/src/sync/range/batch.ts b/packages/beacon-node/src/sync/range/batch.ts index e615d62f996..f819cf09e7e 100644 --- a/packages/beacon-node/src/sync/range/batch.ts +++ b/packages/beacon-node/src/sync/range/batch.ts @@ -1,16 +1,10 @@ import PeerId from "peer-id"; import {allForks, Epoch, phase0} from "@lodestar/types"; -import {SLOTS_PER_EPOCH} from "@lodestar/params"; import {IChainForkConfig} from "@lodestar/config"; import {LodestarError} from "@lodestar/utils"; -import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; -import {BATCH_SLOT_OFFSET, MAX_BATCH_DOWNLOAD_ATTEMPTS, MAX_BATCH_PROCESSING_ATTEMPTS} from "../constants.js"; +import {MAX_BATCH_DOWNLOAD_ATTEMPTS, MAX_BATCH_PROCESSING_ATTEMPTS} from "../constants.js"; import {BlockError, BlockErrorCode} from "../../chain/errors/index.js"; -import {hashBlocks} from "./utils/index.js"; - -export type BatchOpts = { - epochsPerBatch: Epoch; -}; +import {getBatchSlotRange, hashBlocks} from "./utils/index.js"; /** * Current state of a batch @@ -54,14 +48,15 @@ export type BatchMetadata = { }; /** - * Batches are downloaded excluding the first block of the epoch assuming it has already been - * downloaded. + * Batches are downloaded at the first block of the epoch. * * For example: * * Epoch boundary | | * ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 | - * Batch 1 | Batch 2 | Batch 3 + * Batch 1 | Batch 2 | Batch 3 + * + * Jul2022: Offset changed from 1 to 0, see rationale in {@link BATCH_SLOT_OFFSET} */ export class Batch { readonly startEpoch: Epoch; @@ -77,15 +72,14 @@ export class Batch { private readonly failedDownloadAttempts: PeerId[] = []; private readonly config: IChainForkConfig; - constructor(startEpoch: Epoch, config: IChainForkConfig, opts: BatchOpts) { - const startSlot = computeStartSlotAtEpoch(startEpoch) + BATCH_SLOT_OFFSET; - const endSlot = startSlot + opts.epochsPerBatch * SLOTS_PER_EPOCH; + constructor(startEpoch: Epoch, config: IChainForkConfig) { + const {startSlot, count} = getBatchSlotRange(startEpoch); this.config = config; this.startEpoch = startEpoch; this.request = { - startSlot: startSlot, - count: endSlot - startSlot, + startSlot, + count, step: 1, }; } diff --git a/packages/beacon-node/src/sync/range/chain.ts b/packages/beacon-node/src/sync/range/chain.ts index ee4a59350b4..3147b7a98a5 100644 --- a/packages/beacon-node/src/sync/range/chain.ts +++ b/packages/beacon-node/src/sync/range/chain.ts @@ -1,6 +1,5 @@ import PeerId from "peer-id"; import {Epoch, Root, Slot, phase0, allForks} from "@lodestar/types"; -import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; import {ErrorAborted, ILogger} from "@lodestar/utils"; import {IChainForkConfig} from "@lodestar/config"; import {toHexString} from "@chainsafe/ssz"; @@ -10,20 +9,20 @@ import {byteArrayEquals} from "../../util/bytes.js"; import {PeerMap} from "../../util/peerMap.js"; import {wrapError} from "../../util/wrapError.js"; import {RangeSyncType} from "../utils/remoteSyncType.js"; -import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH, BATCH_SLOT_OFFSET} from "../constants.js"; -import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchOpts, BatchStatus} from "./batch.js"; +import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH} from "../constants.js"; +import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchStatus} from "./batch.js"; import { validateBatchesStatus, getNextBatchToProcess, - toBeProcessedStartEpoch, toBeDownloadedStartEpoch, toArr, ChainPeersBalancer, computeMostCommonTarget, + batchStartEpochIsAfterSlot, + isSyncChainDone, + getBatchSlotRange, } from "./utils/index.js"; -export type SyncChainOpts = Partial; - export type SyncChainModules = { config: IChainForkConfig; logger: ILogger; @@ -70,7 +69,7 @@ export type SyncChainDebugState = { export enum SyncChainStatus { Stopped = "Stopped", Syncing = "Syncing", - Synced = "Synced", + Done = "Done", Error = "Error", } @@ -94,8 +93,12 @@ export class SyncChain { /** Number of validated epochs. For the SyncRange to prevent switching chains too fast */ validatedEpochs = 0; - /** The start of the chain segment. Any epoch previous to this one has been validated. */ - private startEpoch: Epoch; + readonly firstBatchEpoch: Epoch; + /** + * The start of the chain segment. Any epoch previous to this one has been validated. + * But the `lastEpochWithProcessBlocks` may not be valid entirely. The + */ + private lastEpochWithProcessBlocks: Epoch; private status = SyncChainStatus.Stopped; private readonly processChainSegment: SyncChainFns["processChainSegment"]; @@ -109,17 +112,16 @@ export class SyncChain { private readonly logger: ILogger; private readonly config: IChainForkConfig; - private readonly opts: BatchOpts; constructor( - startEpoch: Epoch, + initialBatchEpoch: Epoch, initialTarget: ChainTarget, syncType: RangeSyncType, fns: SyncChainFns, - modules: SyncChainModules, - opts?: SyncChainOpts + modules: SyncChainModules ) { - this.startEpoch = startEpoch; + this.firstBatchEpoch = initialBatchEpoch; + this.lastEpochWithProcessBlocks = initialBatchEpoch; this.target = initialTarget; this.syncType = syncType; this.processChainSegment = fns.processChainSegment; @@ -127,7 +129,6 @@ export class SyncChain { this.reportPeer = fns.reportPeer; this.config = modules.config; this.logger = modules.logger; - this.opts = {epochsPerBatch: opts?.epochsPerBatch ?? EPOCHS_PER_BATCH}; this.logId = `${syncType}`; // Trigger event on parent class @@ -148,17 +149,24 @@ export class SyncChain { case SyncChainStatus.Syncing: return; // Skip, already started case SyncChainStatus.Error: - case SyncChainStatus.Synced: + case SyncChainStatus.Done: throw new SyncChainStartError(`Attempted to start an ended SyncChain ${this.status}`); } this.status = SyncChainStatus.Syncing; + this.logger.debug("SyncChain startSyncing", { + localFinalizedEpoch, + lastEpochWithProcessBlocks: this.lastEpochWithProcessBlocks, + targetSlot: this.target.slot, + }); + // to avoid dropping local progress, we advance the chain with its batch boundaries. // get the aligned epoch that produces a batch containing the `localFinalizedEpoch` - const localFinalizedEpochAligned = - this.startEpoch + Math.floor((localFinalizedEpoch - this.startEpoch) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH; - this.advanceChain(localFinalizedEpochAligned); + const lastEpochWithProcessBlocksAligned = + this.lastEpochWithProcessBlocks + + Math.floor((localFinalizedEpoch - this.lastEpochWithProcessBlocks) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH; + this.advanceChain(lastEpochWithProcessBlocksAligned); // Potentially download new batches and process pending this.triggerBatchDownloader(); @@ -205,8 +213,9 @@ export class SyncChain { return toArr(this.batches).map((batch) => batch.getMetadata()); } - get startEpochValue(): Epoch { - return this.startEpoch; + get lastValidatedSlot(): Slot { + // Last epoch of the batch after the last one validated + return getBatchSlotRange(this.lastEpochWithProcessBlocks + EPOCHS_PER_BATCH).startSlot - 1; } get isSyncing(): boolean { @@ -214,7 +223,7 @@ export class SyncChain { } get isRemovable(): boolean { - return this.status === SyncChainStatus.Error || this.status === SyncChainStatus.Synced; + return this.status === SyncChainStatus.Error || this.status === SyncChainStatus.Done; } get peers(): number { @@ -232,7 +241,7 @@ export class SyncChain { targetSlot: this.target.slot, syncType: this.syncType, status: this.status, - startEpoch: this.startEpoch, + startEpoch: this.lastEpochWithProcessBlocks, peers: this.peers, batches: this.getBatchesState(), }; @@ -260,9 +269,8 @@ export class SyncChain { // TODO: Consider running this check less often after the sync is well tested validateBatchesStatus(toArr(this.batches)); - // If startEpoch of the next batch to be processed > targetEpoch -> Done - const toBeProcessedEpoch = toBeProcessedStartEpoch(toArr(this.batches), this.startEpoch, this.opts); - if (computeStartSlotAtEpoch(toBeProcessedEpoch) >= this.target.slot) { + // Returns true if SyncChain has processed all possible blocks with slot <= target.slot + if (isSyncChainDone(toArr(this.batches), this.lastEpochWithProcessBlocks, this.target.slot)) { break; } @@ -271,8 +279,8 @@ export class SyncChain { if (batch) await this.processBatch(batch); } - this.status = SyncChainStatus.Synced; - this.logger.verbose("SyncChain Synced", {id: this.logId}); + this.status = SyncChainStatus.Done; + this.logger.verbose("SyncChain Done", {id: this.logId}); } catch (e) { if (e instanceof ErrorAborted) { return; // Ignore @@ -366,11 +374,10 @@ export class SyncChain { } // This line decides the starting epoch of the next batch. MUST ensure no duplicate batch for the same startEpoch - const startEpoch = toBeDownloadedStartEpoch(batches, this.startEpoch, this.opts); - const toBeDownloadedSlot = computeStartSlotAtEpoch(startEpoch) + BATCH_SLOT_OFFSET; + const startEpoch = toBeDownloadedStartEpoch(batches, this.lastEpochWithProcessBlocks); - // Don't request batches beyond the target head slot - if (toBeDownloadedSlot > this.target.slot) { + // Don't request batches beyond the target head slot. The to-be-downloaded batch must be strictly after target.slot + if (batchStartEpochIsAfterSlot(startEpoch, this.target.slot)) { return null; } @@ -379,7 +386,7 @@ export class SyncChain { return null; } - const batch = new Batch(startEpoch, this.config, this.opts); + const batch = new Batch(startEpoch, this.config); this.batches.set(startEpoch, batch); return batch; } @@ -459,16 +466,16 @@ export class SyncChain { } /** - * Drops any batches previous to `newStartEpoch` and updates the chain boundaries + * Drops any batches previous to `newLatestValidatedEpoch` and updates the chain boundaries */ - private advanceChain(newStartEpoch: Epoch): void { + private advanceChain(newLastEpochWithProcessBlocks: Epoch): void { // make sure this epoch produces an advancement - if (newStartEpoch <= this.startEpoch) { + if (newLastEpochWithProcessBlocks <= this.lastEpochWithProcessBlocks) { return; } for (const [batchKey, batch] of this.batches.entries()) { - if (batch.startEpoch < newStartEpoch) { + if (batch.startEpoch < newLastEpochWithProcessBlocks) { this.batches.delete(batchKey); this.validatedEpochs += EPOCHS_PER_BATCH; @@ -488,7 +495,7 @@ export class SyncChain { } } - this.startEpoch = newStartEpoch; + this.lastEpochWithProcessBlocks = newLastEpochWithProcessBlocks; } } diff --git a/packages/beacon-node/src/sync/range/range.ts b/packages/beacon-node/src/sync/range/range.ts index 272bd1918a2..73bb2f17625 100644 --- a/packages/beacon-node/src/sync/range/range.ts +++ b/packages/beacon-node/src/sync/range/range.ts @@ -1,17 +1,17 @@ import {EventEmitter} from "events"; import StrictEventEmitter from "strict-event-emitter-types"; import PeerId from "peer-id"; -import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition"; +import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; import {IBeaconConfig} from "@lodestar/config"; -import {Epoch, Slot, phase0} from "@lodestar/types"; -import {ILogger} from "@lodestar/utils"; +import {Epoch, phase0} from "@lodestar/types"; +import {ILogger, toHex} from "@lodestar/utils"; import {IBeaconChain} from "../../chain/index.js"; import {INetwork} from "../../network/index.js"; import {IMetrics} from "../../metrics/index.js"; -import {RangeSyncType, getRangeSyncType, rangeSyncTypes} from "../utils/remoteSyncType.js"; +import {RangeSyncType, rangeSyncTypes, getRangeSyncTarget} from "../utils/remoteSyncType.js"; import {ImportBlockOpts} from "../../chain/blocks/index.js"; import {updateChains} from "./utils/index.js"; -import {ChainTarget, SyncChainFns, SyncChain, SyncChainOpts, SyncChainDebugState} from "./chain.js"; +import {ChainTarget, SyncChainFns, SyncChain, SyncChainDebugState} from "./chain.js"; export enum RangeSyncEvent { completedChain = "RangeSync-completedChain", @@ -45,7 +45,7 @@ export type RangeSyncModules = { logger: ILogger; }; -export type RangeSyncOpts = SyncChainOpts & { +export type RangeSyncOpts = { disableProcessAsChainSegment?: boolean; }; @@ -112,32 +112,14 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) { */ addPeer(peerId: PeerId, localStatus: phase0.Status, peerStatus: phase0.Status): void { // Compute if we should do a Finalized or Head sync with this peer - const syncType = getRangeSyncType(localStatus, peerStatus, this.chain.forkChoice); - this.logger.debug("Sync peer joined", {peer: peerId.toB58String(), syncType}); - - let startEpoch: Slot; - let target: ChainTarget; - switch (syncType) { - case RangeSyncType.Finalized: { - startEpoch = localStatus.finalizedEpoch; - target = { - slot: computeStartSlotAtEpoch(peerStatus.finalizedEpoch), - root: peerStatus.finalizedRoot, - }; - break; - } - - case RangeSyncType.Head: { - // The new peer has the same finalized (earlier filters should prevent a peer with an - // earlier finalized chain from reaching here). - startEpoch = Math.min(computeEpochAtSlot(localStatus.headSlot), peerStatus.finalizedEpoch); - target = { - slot: peerStatus.headSlot, - root: peerStatus.headRoot, - }; - break; - } - } + const {syncType, startEpoch, target} = getRangeSyncTarget(localStatus, peerStatus, this.chain.forkChoice); + this.logger.debug("Sync peer joined", { + peer: peerId.toB58String(), + syncType, + startEpoch, + targetSlot: target.slot, + targetRoot: toHex(target.root), + }); // If the peer existed in any other chain, remove it. // re-status'd peers can exist in multiple finalized chains, only one sync at a time @@ -247,12 +229,17 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) { reportPeer: this.reportPeer, onEnd: this.onSyncChainEnd, }, - {config: this.config, logger: this.logger}, - this.opts + {config: this.config, logger: this.logger} ); this.chains.set(syncType, syncChain); - this.logger.verbose("Added syncChain", {syncType}); + this.metrics?.syncRange.syncChainsEvents.inc({syncType: syncChain.syncType, event: "add"}); + this.logger.debug("SyncChain added", { + syncType, + firstEpoch: syncChain.firstBatchEpoch, + targetSlot: syncChain.target.slot, + targetRoot: toHex(syncChain.target.root), + }); } syncChain.addPeer(peer, target); @@ -275,8 +262,17 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) { ) { syncChain.remove(); this.chains.delete(id); - this.logger.debug("Removed syncChain", {id: syncChain.logId}); + this.metrics?.syncRange.syncChainsEvents.inc({syncType: syncChain.syncType, event: "remove"}); + this.logger.debug("SyncChain removed", { + id: syncChain.logId, + localFinalizedSlot, + lastValidatedSlot: syncChain.lastValidatedSlot, + firstEpoch: syncChain.firstBatchEpoch, + targetSlot: syncChain.target.slot, + targetRoot: toHex(syncChain.target.root), + validatedEpochs: syncChain.validatedEpochs, + }); // Re-status peers from successful chain. Potentially trigger a Head sync this.network.reStatusPeers(syncChain.getPeers()); diff --git a/packages/beacon-node/src/sync/range/utils/batches.ts b/packages/beacon-node/src/sync/range/utils/batches.ts index ef73cc63b52..734c84800b6 100644 --- a/packages/beacon-node/src/sync/range/utils/batches.ts +++ b/packages/beacon-node/src/sync/range/utils/batches.ts @@ -1,5 +1,8 @@ -import {Epoch} from "@lodestar/types"; -import {Batch, BatchOpts, BatchStatus} from "../batch.js"; +import {SLOTS_PER_EPOCH} from "@lodestar/params"; +import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; +import {Epoch, Slot} from "@lodestar/types"; +import {BATCH_SLOT_OFFSET, EPOCHS_PER_BATCH} from "../../constants.js"; +import {Batch, BatchStatus} from "../batch.js"; /** * Validates that the status and ordering of batches is valid @@ -61,24 +64,56 @@ export function getNextBatchToProcess(batches: Batch[]): Batch | null { return null; } -/** - * Compute the startEpoch of the next batch to be processed - */ -export function toBeProcessedStartEpoch(batches: Batch[], startEpoch: Epoch, opts: BatchOpts): Epoch { - const lastAwaitingValidation = batches - .reverse() - .find((batch) => batch.state.status === BatchStatus.AwaitingValidation); - return lastAwaitingValidation ? lastAwaitingValidation.startEpoch + opts.epochsPerBatch : startEpoch; -} - /** * Compute the startEpoch of the next batch to be downloaded */ -export function toBeDownloadedStartEpoch(batches: Batch[], startEpoch: Epoch, opts: BatchOpts): Epoch { +export function toBeDownloadedStartEpoch(batches: Batch[], startEpoch: Epoch): Epoch { + // Note: batches are inserted in ascending `startEpoch` order const lastBatch = batches[batches.length - 1] as undefined | Batch; - return lastBatch ? lastBatch.startEpoch + opts.epochsPerBatch : startEpoch; + return lastBatch ? lastBatch.startEpoch + EPOCHS_PER_BATCH : startEpoch; } export function toArr(map: Map): V[] { return Array.from(map.values()); } + +export function getBatchSlotRange(startEpoch: Epoch): {startSlot: number; count: number} { + return { + startSlot: computeStartSlotAtEpoch(startEpoch) + BATCH_SLOT_OFFSET, + count: EPOCHS_PER_BATCH * SLOTS_PER_EPOCH, + }; +} + +/** + * Given a batch's startEpoch, return true if batch does not include slot and is strictly after + * ``` + * Batch1 Batch2 Batch3 + * ----|--------|-----X--|--------|--- + * ``` + * - Batch1 = not includes and before = false + * - Batch2 = includes = false + * - Batch3 = not includes and after = true + */ +export function batchStartEpochIsAfterSlot(startEpoch: Epoch, targetSlot: Slot): boolean { + // The range of slots (inclusive) downloaded by a batch + const {startSlot} = getBatchSlotRange(startEpoch); + + return startSlot > targetSlot; +} + +/** + * Returns true if SyncChain has processed all possible blocks with slot <= target.slot + */ +export function isSyncChainDone(batches: Batch[], lastEpochWithProcessBlocks: Epoch, targetSlot: Slot): boolean { + // In case of full epochs of skipped slots, lastEpochWithProcessBlocks won't be updated. + // In that case it is assumed that the batches are valid only to be able to mark this SyncChain as done + const lastAwaitingValidation = batches + .reverse() + .find((batch) => batch.state.status === BatchStatus.AwaitingValidation); + + if (lastAwaitingValidation) { + return batchStartEpochIsAfterSlot(lastAwaitingValidation.startEpoch + EPOCHS_PER_BATCH, targetSlot); + } else { + return batchStartEpochIsAfterSlot(lastEpochWithProcessBlocks, targetSlot); + } +} diff --git a/packages/beacon-node/src/sync/utils/remoteSyncType.ts b/packages/beacon-node/src/sync/utils/remoteSyncType.ts index a250d9bd835..87cca86375f 100644 --- a/packages/beacon-node/src/sync/utils/remoteSyncType.ts +++ b/packages/beacon-node/src/sync/utils/remoteSyncType.ts @@ -1,5 +1,7 @@ import {IForkChoice} from "@lodestar/fork-choice"; -import {phase0} from "@lodestar/types"; +import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition"; +import {phase0, Slot} from "@lodestar/types"; +import {ChainTarget} from "../range/utils/index.js"; /** The type of peer relative to our current state */ export enum PeerSyncType { @@ -14,6 +16,10 @@ export enum PeerSyncType { // Cache Object.keys iteration for faster loops in metrics export const peerSyncTypes = Object.keys(PeerSyncType) as PeerSyncType[]; +function withinRangeOf(value: number, target: number, range: number): boolean { + return value >= target - range && value <= target + range; +} + export function getPeerSyncType( local: phase0.Status, remote: phase0.Status, @@ -41,19 +47,25 @@ export function getPeerSyncType( // // We keep these peers to allow them to sync from us. return PeerSyncType.Behind; - } else if (remote.finalizedEpoch > local.finalizedEpoch) { + } + + // + else if (remote.finalizedEpoch > local.finalizedEpoch) { if ( + // Peer is in next epoch, and head is within range => SYNCED (local.finalizedEpoch + 1 == remote.finalizedEpoch && - nearRangeStart <= remote.headSlot && - remote.headSlot <= nearRangeEnd) || + withinRangeOf(remote.headSlot, local.headSlot, slotImportTolerance)) || + // Peer's head is known => SYNCED forkChoice.hasBlock(remote.headRoot) ) { - // This peer is near enough to be considered synced, or we have already synced up to its head return PeerSyncType.FullySynced; } else { return PeerSyncType.Advanced; } - } else { + } + + // remote.finalizedEpoch == local.finalizedEpoch + else { // NOTE: if a peer has our same `finalizedEpoch` with a different `finalized_root` // they are not considered relevant and won't be propagated to sync. // Check if the peer is the peer is inside the tolerance range to be considered synced. @@ -91,3 +103,47 @@ export function getRangeSyncType(local: phase0.Status, remote: phase0.Status, fo return RangeSyncType.Head; } } + +export function getRangeSyncTarget( + local: phase0.Status, + remote: phase0.Status, + forkChoice: IForkChoice +): {syncType: RangeSyncType; startEpoch: Slot; target: ChainTarget} { + if (remote.finalizedEpoch > local.finalizedEpoch && !forkChoice.hasBlock(remote.finalizedRoot)) { + return { + // If RangeSyncType.Finalized, the range of blocks fetchable from startEpoch and target must allow to switch + // to RangeSyncType.Head + // + // finalizedRoot is a block with slot <= computeStartSlotAtEpoch(finalizedEpoch). + // If finalizedEpoch does not start with a skipped slot, the SyncChain with this target MUST process the + // first block of the next epoch in order to trigger the condition above `forkChoice.hasBlock(remote.finalizedRoot)` + // and do a Head sync. + // + // When doing a finalized sync, we'll process blocks up to the finalized checkpoint, which does not allow to + // finalize that checkpoint. Instead, our head will be the finalized checkpoint and our finalized checkpoint will + // be some older checkpoint. After completing a finalized SyncChain: + // + // (== finalized, -- non-finalized) + // Remote ====================================================|----------------| + // Local =====================================|--------------| + + syncType: RangeSyncType.Finalized, + startEpoch: local.finalizedEpoch, + target: { + slot: computeStartSlotAtEpoch(remote.finalizedEpoch), + root: remote.finalizedRoot, + }, + }; + } else { + return { + syncType: RangeSyncType.Head, + // The new peer has the same finalized (earlier filters should prevent a peer with an + // earlier finalized chain from reaching here). + startEpoch: Math.min(computeEpochAtSlot(local.headSlot), remote.finalizedEpoch), + target: { + slot: remote.headSlot, + root: remote.headRoot, + }, + }; + } +} diff --git a/packages/beacon-node/test/unit/sync/range/batch.test.ts b/packages/beacon-node/test/unit/sync/range/batch.test.ts index e70932fd615..ef76ad31852 100644 --- a/packages/beacon-node/test/unit/sync/range/batch.test.ts +++ b/packages/beacon-node/test/unit/sync/range/batch.test.ts @@ -4,27 +4,26 @@ import {SLOTS_PER_EPOCH} from "@lodestar/params"; import {config} from "@lodestar/config/default"; import {generateEmptySignedBlock} from "../../../utils/block.js"; import {expectThrowsLodestarError} from "../../../utils/errors.js"; -import {Batch, BatchOpts, BatchStatus, BatchErrorCode, BatchError} from "../../../../src/sync/range/batch.js"; +import {Batch, BatchStatus, BatchErrorCode, BatchError} from "../../../../src/sync/range/batch.js"; +import {EPOCHS_PER_BATCH} from "../../../../src/sync/constants.js"; describe("sync / range / batch", () => { - const opts: BatchOpts = {epochsPerBatch: 2}; - // Common mock data const startEpoch = 0; const peer = new PeerId(Buffer.from("lodestar")); const blocksDownloaded = [generateEmptySignedBlock()]; it("Should return correct blockByRangeRequest", () => { - const batch = new Batch(startEpoch, config, opts); + const batch = new Batch(startEpoch, config); expect(batch.request).to.deep.equal({ - startSlot: 1, - count: SLOTS_PER_EPOCH * opts.epochsPerBatch, + startSlot: 0, + count: SLOTS_PER_EPOCH * EPOCHS_PER_BATCH, step: 1, }); }); it("Complete state flow", () => { - const batch = new Batch(startEpoch, config, opts); + const batch = new Batch(startEpoch, config); // Instantion: AwaitingDownload expect(batch.state.status).to.equal(BatchStatus.AwaitingDownload, "Wrong status on instantiation"); @@ -75,7 +74,7 @@ describe("sync / range / batch", () => { }); it("Should throw on inconsistent state - downloadingSuccess", () => { - const batch = new Batch(startEpoch, config, opts); + const batch = new Batch(startEpoch, config); expectThrowsLodestarError( () => batch.downloadingSuccess(blocksDownloaded), @@ -89,7 +88,7 @@ describe("sync / range / batch", () => { }); it("Should throw on inconsistent state - startProcessing", () => { - const batch = new Batch(startEpoch, config, opts); + const batch = new Batch(startEpoch, config); expectThrowsLodestarError( () => batch.startProcessing(), @@ -103,7 +102,7 @@ describe("sync / range / batch", () => { }); it("Should throw on inconsistent state - processingSuccess", () => { - const batch = new Batch(startEpoch, config, opts); + const batch = new Batch(startEpoch, config); expectThrowsLodestarError( () => batch.processingSuccess(), diff --git a/packages/beacon-node/test/unit/sync/range/chain.test.ts b/packages/beacon-node/test/unit/sync/range/chain.test.ts index e9e4b1b4dda..74d3f37bd5d 100644 --- a/packages/beacon-node/test/unit/sync/range/chain.test.ts +++ b/packages/beacon-node/test/unit/sync/range/chain.test.ts @@ -1,9 +1,9 @@ import {config} from "@lodestar/config/default"; +import {ILogger} from "@lodestar/utils"; import {SLOTS_PER_EPOCH} from "@lodestar/params"; -import {Epoch, phase0, Slot} from "@lodestar/types"; +import {Epoch, phase0, Slot, ssz} from "@lodestar/types"; import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; import {linspace} from "../../../../src/util/numpy.js"; -import {generateEmptyBlock, generateEmptySignedBlock} from "../../../utils/block.js"; import {SyncChain, SyncChainFns, ChainTarget} from "../../../../src/sync/range/chain.js"; import {RangeSyncType} from "../../../../src/sync/utils/remoteSyncType.js"; import {ZERO_HASH} from "../../../../src/constants/index.js"; @@ -52,6 +52,7 @@ describe("sync / range / chain", () => { const logger = testLogger(); const ACCEPT_BLOCK = Buffer.alloc(96, 0); const REJECT_BLOCK = Buffer.alloc(96, 1); + const zeroBlockBody = ssz.phase0.BeaconBlockBody.defaultValue(); const interval: NodeJS.Timeout | null = null; // eslint-disable-next-line @typescript-eslint/no-empty-function @@ -81,7 +82,7 @@ describe("sync / range / chain", () => { const shouldReject = badBlocks?.has(i); if (shouldReject) badBlocks?.delete(i); blocks.push({ - message: generateEmptyBlock(), + message: generateEmptyBlock(i), signature: shouldReject ? REJECT_BLOCK : ACCEPT_BLOCK, }); } @@ -97,7 +98,7 @@ describe("sync / range / chain", () => { startEpoch, target, syncType, - {processChainSegment, downloadBeaconBlocksByRange, reportPeer, onEnd}, + logSyncChainFns(logger, {processChainSegment, downloadBeaconBlocksByRange, reportPeer, onEnd}), {config, logger} ); @@ -116,9 +117,16 @@ describe("sync / range / chain", () => { // eslint-disable-next-line @typescript-eslint/no-empty-function const processChainSegment: SyncChainFns["processChainSegment"] = async () => {}; - const downloadBeaconBlocksByRange: SyncChainFns["downloadBeaconBlocksByRange"] = async () => [ - generateEmptySignedBlock(), - ]; + const downloadBeaconBlocksByRange: SyncChainFns["downloadBeaconBlocksByRange"] = async (peer, request) => { + const blocks: phase0.SignedBeaconBlock[] = []; + for (let i = request.startSlot; i < request.startSlot + request.count; i += request.step) { + blocks.push({ + message: generateEmptyBlock(i), + signature: ACCEPT_BLOCK, + }); + } + return blocks; + }; const target: ChainTarget = {slot: computeStartSlotAtEpoch(targetEpoch), root: ZERO_HASH}; const syncType = RangeSyncType.Finalized; @@ -129,7 +137,7 @@ describe("sync / range / chain", () => { startEpoch, target, syncType, - {processChainSegment, downloadBeaconBlocksByRange, reportPeer, onEnd}, + logSyncChainFns(logger, {processChainSegment, downloadBeaconBlocksByRange, reportPeer, onEnd}), {config, logger} ); @@ -141,4 +149,35 @@ describe("sync / range / chain", () => { initialSync.startSyncing(startEpoch); }); }); + + function generateEmptyBlock(slot: Slot): phase0.BeaconBlock { + return { + slot, + proposerIndex: 0, + parentRoot: Buffer.alloc(32), + stateRoot: ZERO_HASH, + body: zeroBlockBody, + }; + } }); + +function logSyncChainFns(logger: ILogger, fns: SyncChainFns): SyncChainFns { + return { + processChainSegment(blocks, syncType) { + logger.debug("mock processChainSegment", {blocks: blocks.map((b) => b.message.slot).join(",")}); + return fns.processChainSegment(blocks, syncType); + }, + downloadBeaconBlocksByRange(peer, request) { + logger.debug("mock downloadBeaconBlocksByRange", request); + return fns.downloadBeaconBlocksByRange(peer, request); + }, + reportPeer(peer, action, actionName) { + logger.debug("mock reportPeer", {peer: peer.toB58String(), action, actionName}); + return fns.reportPeer(peer, action, actionName); + }, + onEnd(err, target) { + logger.debug("mock onEnd", {target: target?.slot}, err ?? undefined); + return fns.onEnd(err, target); + }, + }; +} diff --git a/packages/beacon-node/test/unit/sync/range/utils/batches.test.ts b/packages/beacon-node/test/unit/sync/range/utils/batches.test.ts index a8f9e23b58f..2f2c2201030 100644 --- a/packages/beacon-node/test/unit/sync/range/utils/batches.test.ts +++ b/packages/beacon-node/test/unit/sync/range/utils/batches.test.ts @@ -1,17 +1,17 @@ import {expect} from "chai"; import {config} from "@lodestar/config/default"; -import {Epoch} from "@lodestar/types"; +import {Epoch, Slot} from "@lodestar/types"; +import {SLOTS_PER_EPOCH} from "@lodestar/params"; import {getValidPeerId} from "../../../../utils/peer.js"; -import {Batch, BatchOpts, BatchStatus} from "../../../../../src/sync/range/batch.js"; +import {Batch, BatchStatus} from "../../../../../src/sync/range/batch.js"; import { validateBatchesStatus, getNextBatchToProcess, - toBeProcessedStartEpoch, + isSyncChainDone, toBeDownloadedStartEpoch, } from "../../../../../src/sync/range/utils/batches.js"; describe("sync / range / batches", () => { - const opts: BatchOpts = {epochsPerBatch: 2}; const peer = getValidPeerId(); describe("validateBatchesStatus", () => { @@ -131,45 +131,56 @@ describe("sync / range / batches", () => { } }); - describe("toBeProcessedStartEpoch", () => { + describe("isSyncChainDone", () => { const testCases: { id: string; batches: [Epoch, BatchStatus][]; - startEpoch: Epoch; - result: Epoch; + latestValidatedEpoch: Epoch; + targetSlot: Slot; + isDone: boolean; }[] = [ { - id: "Return next batch after AwaitingValidation-s", + id: "Latest AwaitingValidation is beyond target", batches: [ [0, BatchStatus.AwaitingValidation], - [2, BatchStatus.AwaitingValidation], - [4, BatchStatus.Processing], - [6, BatchStatus.AwaitingDownload], + [1, BatchStatus.AwaitingValidation], + [2, BatchStatus.Processing], + [3, BatchStatus.AwaitingDownload], ], - startEpoch: 0, - result: 4, + latestValidatedEpoch: 0, + targetSlot: 1 * SLOTS_PER_EPOCH, // = Last AwaitingValidation + isDone: true, }, { - id: "No AwaitingValidation, next to process is first batch", + id: "latestValidatedEpoch is beyond target", batches: [ [4, BatchStatus.Processing], - [6, BatchStatus.AwaitingDownload], + [5, BatchStatus.AwaitingDownload], ], - startEpoch: 4, - result: 4, + latestValidatedEpoch: 3, + targetSlot: 2 * SLOTS_PER_EPOCH, // Previous to + isDone: true, }, { - id: "Empty, return startEpoch", + id: "No batches not done", batches: [], - startEpoch: 0, - result: 0, + latestValidatedEpoch: 0, + targetSlot: 1 * SLOTS_PER_EPOCH, + isDone: false, + }, + { + id: "Zero case, not done", + batches: [], + latestValidatedEpoch: 0, + targetSlot: 0 * SLOTS_PER_EPOCH, + isDone: false, }, ]; - for (const {id, batches, startEpoch, result} of testCases) { + for (const {id, batches, latestValidatedEpoch, targetSlot, isDone} of testCases) { it(id, () => { const _batches = batches.map(([batchStartEpoch, batchStatus]) => createBatch(batchStatus, batchStartEpoch)); - expect(toBeProcessedStartEpoch(_batches, startEpoch, opts)).to.equal(result); + expect(isSyncChainDone(_batches, latestValidatedEpoch, targetSlot)).to.equal(isDone); }); } }); @@ -185,12 +196,12 @@ describe("sync / range / batches", () => { id: "Regular case, pick the next available spot", batches: [ [0, BatchStatus.AwaitingValidation], - [2, BatchStatus.AwaitingValidation], - [4, BatchStatus.Processing], - [6, BatchStatus.AwaitingDownload], + [1, BatchStatus.AwaitingValidation], + [2, BatchStatus.Processing], + [3, BatchStatus.AwaitingDownload], ], startEpoch: 0, - result: 8, + result: 4, }, { id: "Empty, return startEpoch", @@ -203,13 +214,13 @@ describe("sync / range / batches", () => { for (const {id, batches, startEpoch, result} of testCases) { it(id, () => { const _batches = batches.map(([batchStartEpoch, batchStatus]) => createBatch(batchStatus, batchStartEpoch)); - expect(toBeDownloadedStartEpoch(_batches, startEpoch, opts)).to.equal(result); + expect(toBeDownloadedStartEpoch(_batches, startEpoch)).to.equal(result); }); } }); function createBatch(status: BatchStatus, startEpoch = 0): Batch { - const batch = new Batch(startEpoch, config, opts); + const batch = new Batch(startEpoch, config); if (status === BatchStatus.AwaitingDownload) return batch; diff --git a/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts b/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts index 916309aa664..b8a0a9921a0 100644 --- a/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts +++ b/packages/beacon-node/test/unit/sync/range/utils/peerBalancer.test.ts @@ -1,20 +1,18 @@ import {expect} from "chai"; import PeerId from "peer-id"; import {config} from "@lodestar/config/default"; -import {Batch, BatchOpts} from "../../../../../src/sync/range/batch.js"; +import {Batch} from "../../../../../src/sync/range/batch.js"; import {ChainPeersBalancer} from "../../../../../src/sync/range/utils/peerBalancer.js"; describe("sync / range / peerBalancer", () => { - const opts: BatchOpts = {epochsPerBatch: 1}; - it("bestPeerToRetryBatch", () => { // Run N times to make sure results are consistent with different shufflings for (let i = 0; i < 5; i++) { const peer1 = new PeerId(Buffer.from([0])); // Offset by one, PeerId encodes to B58String 0 as "1" const peer2 = new PeerId(Buffer.from([1])); const peer3 = new PeerId(Buffer.from([2])); - const batch0 = new Batch(0, config, opts); - const batch1 = new Batch(1, config, opts); + const batch0 = new Batch(0, config); + const batch1 = new Batch(1, config); // Batch zero has a failedDownloadAttempt with peer0 batch0.startDownloading(peer1); @@ -46,8 +44,8 @@ describe("sync / range / peerBalancer", () => { const peer2 = new PeerId(Buffer.from([1])); const peer3 = new PeerId(Buffer.from([2])); const peer4 = new PeerId(Buffer.from([3])); - const batch0 = new Batch(0, config, opts); - const batch1 = new Batch(1, config, opts); + const batch0 = new Batch(0, config); + const batch1 = new Batch(1, config); // peer1 and peer2 are busy downloading batch0.startDownloading(peer1);