diff --git a/packages/api/src/beacon/routes/lodestar.ts b/packages/api/src/beacon/routes/lodestar.ts index 44771b43217..85c79d82593 100644 --- a/packages/api/src/beacon/routes/lodestar.ts +++ b/packages/api/src/beacon/routes/lodestar.ts @@ -81,7 +81,7 @@ export type Api = { /** TODO: description */ getSyncChainsDebugState(): Promise>; /** Dump all items in a gossip queue, by gossipType */ - getGossipQueueItems(gossipType: string): Promise>; + getGossipQueueItems(gossipType: string): Promise>; /** Dump all items in the regen queue */ getRegenQueueItems(): Promise>; /** Dump all items in the block processor queue */ diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index 86336b90d40..6900b12c169 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -9,7 +9,7 @@ import {validateBlsToExecutionChange} from "../../../../chain/validation/blsToEx import {validateSyncCommitteeSigOnly} from "../../../../chain/validation/syncCommittee.js"; import {ApiModules} from "../../types.js"; import {AttestationError, GossipAction, SyncCommitteeError} from "../../../../chain/errors/index.js"; -import {validateGossipFnRetryUnknownRoot} from "../../../../network/gossip/handlers/index.js"; +import {validateGossipFnRetryUnknownRoot} from "../../../../network/processor/gossipHandlers.js"; export function getBeaconPoolApi({ chain, diff --git a/packages/beacon-node/src/api/impl/lodestar/index.ts b/packages/beacon-node/src/api/impl/lodestar/index.ts index d779f7d18cd..7e69053a584 100644 --- a/packages/beacon-node/src/api/impl/lodestar/index.ts +++ b/packages/beacon-node/src/api/impl/lodestar/index.ts @@ -60,7 +60,7 @@ export function getLodestarApi({ async getGossipQueueItems(gossipType: GossipType | string) { return { - data: await network.dumpGossipQueueItems(gossipType), + data: await network.dumpGossipQueue(gossipType as GossipType), }; }, diff --git a/packages/beacon-node/src/api/impl/validator/index.ts b/packages/beacon-node/src/api/impl/validator/index.ts index 3a820a8b16f..31ada98a3bd 100644 --- a/packages/beacon-node/src/api/impl/validator/index.ts +++ b/packages/beacon-node/src/api/impl/validator/index.ts @@ -25,7 +25,7 @@ import {CommitteeSubscription} from "../../../network/subnets/index.js"; import {ApiModules} from "../types.js"; import {RegenCaller} from "../../../chain/regen/index.js"; import {getValidatorStatus} from "../beacon/state/utils.js"; -import {validateGossipFnRetryUnknownRoot} from "../../../network/gossip/handlers/index.js"; +import {validateGossipFnRetryUnknownRoot} from "../../../network/processor/gossipHandlers.js"; import {computeSubnetForCommitteesAtSlot, getPubkeysForIndices} from "./utils.js"; /** diff --git a/packages/beacon-node/src/chain/bls/interface.ts b/packages/beacon-node/src/chain/bls/interface.ts index 4ce95a675d0..2abeca62e10 100644 --- a/packages/beacon-node/src/chain/bls/interface.ts +++ b/packages/beacon-node/src/chain/bls/interface.ts @@ -43,4 +43,9 @@ export interface IBlsVerifier { /** For multithread pool awaits terminating all workers */ close(): Promise; + + /** + * Returns true if BLS worker pool is ready to accept more work jobs. + */ + canAcceptWork(): boolean; } diff --git a/packages/beacon-node/src/chain/bls/multithread/index.ts b/packages/beacon-node/src/chain/bls/multithread/index.ts index 484c29accf0..ac28cd2dd24 100644 --- a/packages/beacon-node/src/chain/bls/multithread/index.ts +++ b/packages/beacon-node/src/chain/bls/multithread/index.ts @@ -56,6 +56,11 @@ const MAX_BUFFERED_SIGS = 32; */ const MAX_BUFFER_WAIT_MS = 100; +/** + * Max concurrent jobs on `canAcceptWork` status + */ +const MAX_JOBS_CAN_ACCEPT_WORK = 512; + type WorkerApi = { verifyManySignatureSets(workReqArr: BlsWorkReq[]): Promise; }; @@ -110,6 +115,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { } | null = null; private blsVerifyAllMultiThread: boolean; private closed = false; + private workersBusy = 0; constructor(options: BlsMultiThreadWorkerPoolOptions, modules: BlsMultiThreadWorkerPoolModules) { const {logger, metrics} = modules; @@ -127,10 +133,21 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { this.workers = this.createWorkers(implementation, defaultPoolSize); if (metrics) { - metrics.blsThreadPool.queueLength.addCollect(() => metrics.blsThreadPool.queueLength.set(this.jobs.length)); + metrics.blsThreadPool.queueLength.addCollect(() => { + metrics.blsThreadPool.queueLength.set(this.jobs.length); + metrics.blsThreadPool.workersBusy.set(this.workersBusy); + }); } } + canAcceptWork(): boolean { + return ( + this.workersBusy < defaultPoolSize && + // TODO: Should also bound the jobs queue? + this.jobs.length < MAX_JOBS_CAN_ACCEPT_WORK + ); + } + async verifySignatureSets(sets: ISignatureSet[], opts: VerifySignatureOpts = {}): Promise { // Pubkeys are aggregated in the main thread regardless if verified in workers or in main thread this.metrics?.bls.aggregatedPubkeys.inc(getAggregatedPubkeysCount(sets)); @@ -310,6 +327,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { const workerApi = worker.status.workerApi; worker.status = {code: WorkerStatusCode.running, workerApi}; + this.workersBusy++; try { let startedSigSets = 0; @@ -375,6 +393,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { } worker.status = {code: WorkerStatusCode.idle, workerApi}; + this.workersBusy--; // Potentially run a new job setTimeout(this.runJob, 0); diff --git a/packages/beacon-node/src/chain/bls/singleThread.ts b/packages/beacon-node/src/chain/bls/singleThread.ts index 6895e222569..78f3f4bf520 100644 --- a/packages/beacon-node/src/chain/bls/singleThread.ts +++ b/packages/beacon-node/src/chain/bls/singleThread.ts @@ -37,4 +37,9 @@ export class BlsSingleThreadVerifier implements IBlsVerifier { async close(): Promise { // nothing to do } + + canAcceptWork(): boolean { + // Since sigs are verified blocking the main thread, there's no mechanism to throttle + return true; + } } diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index 99ca83af5e0..599f2d85bc9 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -34,7 +34,7 @@ import {BeaconClock, LocalClock} from "./clock/index.js"; import {ChainEventEmitter, ChainEvent} from "./emitter.js"; import {IBeaconChain, ProposerPreparationData} from "./interface.js"; import {IChainOptions} from "./options.js"; -import {IStateRegenerator, QueuedStateRegenerator, RegenCaller} from "./regen/index.js"; +import {QueuedStateRegenerator, RegenCaller} from "./regen/index.js"; import {initializeForkChoice} from "./forkChoice/index.js"; import {computeAnchorCheckpoint} from "./initState.js"; import {IBlsVerifier, BlsSingleThreadVerifier, BlsMultiThreadWorkerPool} from "./bls/index.js"; @@ -91,7 +91,7 @@ export class BeaconChain implements IBeaconChain { readonly emitter: ChainEventEmitter; readonly stateCache: StateContextCache; readonly checkpointStateCache: CheckpointStateCache; - readonly regen: IStateRegenerator; + readonly regen: QueuedStateRegenerator; readonly lightClientServer: LightClientServer; readonly reprocessController: ReprocessController; @@ -273,6 +273,14 @@ export class BeaconChain implements IBeaconChain { await this.bls.close(); } + regenCanAcceptWork(): boolean { + return this.regen.canAcceptWork(); + } + + blsThreadPoolCanAcceptWork(): boolean { + return this.bls.canAcceptWork(); + } + validatorSeenAtEpoch(index: ValidatorIndex, epoch: Epoch): boolean { // Caller must check that epoch is not older that current epoch - 1 // else the caches for that epoch may already be pruned. diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index d087d2b2a3c..aac24fe76ae 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -133,6 +133,9 @@ export interface IBeaconChain { /** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */ persistInvalidSszView(view: TreeView, suffix?: string): void; updateBuilderStatus(clockSlot: Slot): void; + + regenCanAcceptWork(): boolean; + blsThreadPoolCanAcceptWork(): boolean; } export type SSZObjectType = diff --git a/packages/beacon-node/src/chain/regen/queued.ts b/packages/beacon-node/src/chain/regen/queued.ts index 16e2afaa33e..b4ee806cfba 100644 --- a/packages/beacon-node/src/chain/regen/queued.ts +++ b/packages/beacon-node/src/chain/regen/queued.ts @@ -10,6 +10,8 @@ import {StateRegenerator, RegenModules} from "./regen.js"; import {RegenError, RegenErrorCode} from "./errors.js"; const REGEN_QUEUE_MAX_LEN = 256; +// TODO: Should this constant be lower than above? 256 feels high +const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16; type QueuedStateRegeneratorModules = RegenModules & { signal: AbortSignal; @@ -46,6 +48,10 @@ export class QueuedStateRegenerator implements IStateRegenerator { this.metrics = modules.metrics; } + canAcceptWork(): boolean { + return this.jobQueue.jobLen < REGEN_CAN_ACCEPT_WORK_THRESHOLD; + } + /** * Get the state to run with `block`. * - State after `block.parentRoot` dialed forward to block.slot diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 61243d9a6ac..7aa6ffd20b5 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -267,10 +267,26 @@ export function createLodestarMetrics( }), gossipValidationQueueConcurrency: register.gauge<"topic">({ name: "lodestar_gossip_validation_queue_concurrency", - help: "Current concurrency of gossip validation queue", + help: "Current count of jobs being run on network processor for topic", labelNames: ["topic"], }), + networkProcessor: { + executeWorkCalls: register.gauge({ + name: "lodestar_network_processor_execute_work_calls_total", + help: "Total calls to network processor execute work fn", + }), + jobsSubmitted: register.histogram({ + name: "lodestar_network_processor_execute_jobs_submitted_total", + help: "Total calls to network processor execute work fn", + buckets: [0, 1, 5, 128], + }), + canNotAcceptWork: register.gauge({ + name: "lodestar_network_processor_can_not_accept_work_total", + help: "Total times network processor can not accept work on executeWork", + }), + }, + discv5: { decodeEnrAttemptCount: register.counter({ name: "lodestar_discv5_decode_enr_attempt_count", @@ -538,6 +554,10 @@ export function createLodestarMetrics( name: "lodestar_bls_thread_pool_queue_length", help: "Count of total block processor queue length", }), + workersBusy: register.gauge({ + name: "lodestar_bls_thread_pool_workers_busy", + help: "Count of current busy workers", + }), totalJobsGroupsStarted: register.gauge({ name: "lodestar_bls_thread_pool_job_groups_started_total", help: "Count of total jobs groups started in bls thread pool, job groups include +1 jobs", diff --git a/packages/beacon-node/src/network/events.ts b/packages/beacon-node/src/network/events.ts index 2973ca60a4e..2111afaa4c0 100644 --- a/packages/beacon-node/src/network/events.ts +++ b/packages/beacon-node/src/network/events.ts @@ -1,9 +1,11 @@ import {EventEmitter} from "events"; import {PeerId} from "@libp2p/interface-peer-id"; import StrictEventEmitter from "strict-event-emitter-types"; +import {TopicValidatorResult} from "@libp2p/interface-pubsub"; import {phase0} from "@lodestar/types"; import {BlockInput} from "../chain/blocks/types.js"; import {RequestTypedContainer} from "./reqresp/ReqRespBeaconNode.js"; +import {PendingGossipsubMessage} from "./processor/types.js"; export enum NetworkEvent { /** A relevant peer has connected or has been re-STATUS'd */ @@ -14,6 +16,10 @@ export enum NetworkEvent { gossipHeartbeat = "gossipsub.heartbeat", reqRespRequest = "req-resp.request", unknownBlockParent = "unknownBlockParent", + + // Network processor events + pendingGossipsubMessage = "gossip.pendingGossipsubMessage", + gossipMessageValidationResult = "gossip.messageValidationResult", } export type NetworkEvents = { @@ -21,6 +27,12 @@ export type NetworkEvents = { [NetworkEvent.peerDisconnected]: (peer: PeerId) => void; [NetworkEvent.reqRespRequest]: (request: RequestTypedContainer, peer: PeerId) => void; [NetworkEvent.unknownBlockParent]: (blockInput: BlockInput, peerIdStr: string) => void; + [NetworkEvent.pendingGossipsubMessage]: (data: PendingGossipsubMessage) => void; + [NetworkEvent.gossipMessageValidationResult]: ( + msgId: string, + propagationSource: PeerId, + acceptance: TopicValidatorResult + ) => void; }; export type INetworkEventBus = StrictEventEmitter; diff --git a/packages/beacon-node/src/network/gossip/gossipsub.ts b/packages/beacon-node/src/network/gossip/gossipsub.ts index d2100b56a54..8625e45a919 100644 --- a/packages/beacon-node/src/network/gossip/gossipsub.ts +++ b/packages/beacon-node/src/network/gossip/gossipsub.ts @@ -1,3 +1,5 @@ +import {PeerId} from "@libp2p/interface-peer-id"; +import {TopicValidatorResult} from "@libp2p/interface-pubsub"; import {GossipSub, GossipsubEvents} from "@chainsafe/libp2p-gossipsub"; import {PublishOpts, SignaturePolicy, TopicStr} from "@chainsafe/libp2p-gossipsub/types"; import {PeerScore, PeerScoreParams} from "@chainsafe/libp2p-gossipsub/score"; @@ -14,19 +16,10 @@ import {PeersData} from "../peers/peersData.js"; import {ClientKind} from "../peers/client.js"; import {GOSSIP_MAX_SIZE, GOSSIP_MAX_SIZE_BELLATRIX} from "../../constants/network.js"; import {Libp2p} from "../interface.js"; -import { - GossipJobQueues, - GossipTopic, - GossipTopicMap, - GossipType, - GossipTypeMap, - ValidatorFnsByType, - GossipHandlers, - GossipBeaconNode, -} from "./interface.js"; +import {NetworkEvent, NetworkEventBus} from "../events.js"; +import {GossipBeaconNode, GossipTopic, GossipTopicMap, GossipType, GossipTypeMap} from "./interface.js"; import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic, getCoreTopicsAtFork} from "./topic.js"; import {DataTransformSnappy, fastMsgIdFn, msgIdFn, msgIdToStrFn} from "./encoding.js"; -import {createValidatorFnsByType} from "./validation/index.js"; import { computeGossipPeerScoreParams, @@ -47,10 +40,9 @@ export type Eth2GossipsubModules = { libp2p: Libp2p; logger: Logger; metrics: Metrics | null; - signal: AbortSignal; eth2Context: Eth2Context; - gossipHandlers: GossipHandlers; peersData: PeersData; + events: NetworkEventBus; }; export type Eth2GossipsubOpts = { @@ -59,6 +51,7 @@ export type Eth2GossipsubOpts = { gossipsubDLow?: number; gossipsubDHigh?: number; gossipsubAwaitHandler?: boolean; + skipParamsLog?: boolean; }; /** @@ -75,23 +68,21 @@ export type Eth2GossipsubOpts = { * See https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub */ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode { - readonly jobQueues: GossipJobQueues; readonly scoreParams: Partial; private readonly config: BeaconConfig; private readonly logger: Logger; private readonly peersData: PeersData; + private readonly events: NetworkEventBus; // Internal caches private readonly gossipTopicCache: GossipTopicCache; - private readonly validatorFnsByType: ValidatorFnsByType; - constructor(opts: Eth2GossipsubOpts, modules: Eth2GossipsubModules) { const {allowPublishToZeroPeers, gossipsubD, gossipsubDLow, gossipsubDHigh} = opts; const gossipTopicCache = new GossipTopicCache(modules.config); const scoreParams = computeGossipPeerScoreParams(modules); - const {config, logger, metrics, signal, gossipHandlers, peersData} = modules; + const {config, logger, metrics, peersData, events} = modules; // Gossipsub parameters defined here: // https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub @@ -136,29 +127,21 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode { this.config = config; this.logger = logger; this.peersData = peersData; + this.events = events; this.gossipTopicCache = gossipTopicCache; - // Note: We use the validator functions as handlers. No handler will be registered to gossipsub. - // libp2p-js layer will emit the message to an EventEmitter that won't be listened by anyone. - // TODO: Force to ensure there's a validatorFunction attached to every received topic. - const {validatorFnsByType, jobQueues} = createValidatorFnsByType(gossipHandlers, { - config, - logger, - metrics, - signal, - }); - this.validatorFnsByType = validatorFnsByType; - this.jobQueues = jobQueues; - if (metrics) { metrics.gossipMesh.peersByType.addCollect(() => this.onScrapeLodestarMetrics(metrics)); } this.addEventListener("gossipsub:message", this.onGossipsubMessage.bind(this)); + this.events.on(NetworkEvent.gossipMessageValidationResult, this.onValidationResult.bind(this)); // Having access to this data is CRUCIAL for debugging. While this is a massive log, it must not be deleted. // Scoring issues require this dump + current peer score stats to re-calculate scores. - this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)}); + if (!opts.skipParamsLog) { + this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)}); + } } /** @@ -413,14 +396,19 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode { // Get seenTimestamp before adding the message to the queue or add async delays const seenTimestampSec = Date.now() / 1000; - // Puts object in queue, validates, then processes - this.validatorFnsByType[topic.type](topic, msg, propagationSource.toString(), seenTimestampSec) - .then((acceptance) => { - this.reportMessageValidationResult(msgId, propagationSource, acceptance); - }) - .catch((e) => { - this.logger.error("Error onGossipsubMessage", {}, e); - }); + // Emit message to network processor + this.events.emit(NetworkEvent.pendingGossipsubMessage, { + topic, + msg, + msgId, + propagationSource, + seenTimestampSec, + startProcessUnixSec: null, + }); + } + + private onValidationResult(msgId: string, propagationSource: PeerId, acceptance: TopicValidatorResult): void { + this.reportMessageValidationResult(msgId, propagationSource, acceptance); } } diff --git a/packages/beacon-node/src/network/gossip/index.ts b/packages/beacon-node/src/network/gossip/index.ts index 07d3d1310a3..e76bc927944 100644 --- a/packages/beacon-node/src/network/gossip/index.ts +++ b/packages/beacon-node/src/network/gossip/index.ts @@ -1,4 +1,4 @@ export {Eth2Gossipsub} from "./gossipsub.js"; -export {getGossipHandlers} from "./handlers/index.js"; +export {getGossipHandlers} from "../processor/gossipHandlers.js"; export {getCoreTopicsAtFork} from "./topic.js"; export * from "./interface.js"; diff --git a/packages/beacon-node/src/network/gossip/validation/onAccept.ts b/packages/beacon-node/src/network/gossip/validation/onAccept.ts deleted file mode 100644 index 810e7ccfd64..00000000000 --- a/packages/beacon-node/src/network/gossip/validation/onAccept.ts +++ /dev/null @@ -1,15 +0,0 @@ -import {ChainForkConfig} from "@lodestar/config"; -import {GossipType, GossipTypeMap, GossipTopicTypeMap} from "../interface.js"; - -export type GetGossipAcceptMetadataFn = ( - config: ChainForkConfig, - object: GossipTypeMap[GossipType], - topic: GossipTopicTypeMap[GossipType] -) => Record; -export type GetGossipAcceptMetadataFns = { - [K in GossipType]: ( - config: ChainForkConfig, - object: GossipTypeMap[K], - topic: GossipTopicTypeMap[K] - ) => Record; -}; diff --git a/packages/beacon-node/src/network/interface.ts b/packages/beacon-node/src/network/interface.ts index 2711eef83d4..93ca0cec65c 100644 --- a/packages/beacon-node/src/network/interface.ts +++ b/packages/beacon-node/src/network/interface.ts @@ -10,10 +10,11 @@ import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/score"; import {routes} from "@lodestar/api"; import {BlockInput} from "../chain/blocks/types.js"; import {INetworkEventBus} from "./events.js"; -import {GossipBeaconNode} from "./gossip/index.js"; +import {GossipBeaconNode, GossipType} from "./gossip/index.js"; import {PeerAction, PeerScoreStats} from "./peers/index.js"; import {IReqRespBeaconNode} from "./reqresp/ReqRespBeaconNode.js"; import {CommitteeSubscription} from "./subnets/index.js"; +import {PendingGossipsubMessage} from "./processor/types.js"; export type PeerSearchOptions = { supportsProtocols?: string[]; @@ -61,7 +62,7 @@ export interface INetwork { dumpPeer(peerIdStr: string): Promise; dumpPeerScoreStats(): Promise; dumpGossipPeerScoreStats(): Promise; - dumpGossipQueueItems(gossipType: string): Promise; + dumpGossipQueue(gossipType: GossipType): Promise; dumpDiscv5KadValues(): Promise; } diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index f7d8ce534c7..cef0090ae66 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -20,7 +20,6 @@ import {ReqRespBeaconNode, ReqRespHandlers, beaconBlocksMaybeBlobsByRange} from import {beaconBlocksMaybeBlobsByRoot} from "./reqresp/beaconBlocksMaybeBlobsByRoot.js"; import { Eth2Gossipsub, - getGossipHandlers, GossipHandlers, GossipTopicTypeMap, GossipType, @@ -37,6 +36,8 @@ import {PeersData} from "./peers/peersData.js"; import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js"; import {Discv5Worker} from "./discv5/index.js"; import {createNodeJsLibp2p} from "./nodejs/util.js"; +import {NetworkProcessor} from "./processor/index.js"; +import {PendingGossipsubMessage} from "./processor/types.js"; // How many changes to batch cleanup const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10; @@ -50,6 +51,7 @@ type NetworkModules = { signal: AbortSignal; peersData: PeersData; networkEventBus: NetworkEventBus; + networkProcessor: NetworkProcessor; metadata: MetadataController; peerRpcScores: PeerRpcScoreStore; reqResp: ReqRespBeaconNode; @@ -84,6 +86,7 @@ export class Network implements INetwork { private readonly opts: NetworkOptions; private readonly peersData: PeersData; + private readonly networkProcessor: NetworkProcessor; private readonly peerManager: PeerManager; private readonly libp2p: Libp2p; private readonly logger: Logger; @@ -106,6 +109,7 @@ export class Network implements INetwork { signal, peersData, networkEventBus, + networkProcessor, metadata, peerRpcScores, reqResp, @@ -123,7 +127,7 @@ export class Network implements INetwork { this.signal = signal; this.peersData = peersData; this.events = networkEventBus; - this.metadata = metadata; + (this.networkProcessor = networkProcessor), (this.metadata = metadata); this.peerRpcScores = peerRpcScores; this.reqResp = reqResp; this.gossip = gossip; @@ -146,8 +150,8 @@ export class Network implements INetwork { peerStoreDir, chain, reqRespHandlers, - gossipHandlers, signal, + gossipHandlers, }: NetworkInitModules): Promise { const clock = chain.clock; const peersData = new PeersData(); @@ -196,16 +200,13 @@ export class Network implements INetwork { libp2p, logger, metrics, - signal, - gossipHandlers: - gossipHandlers ?? - getGossipHandlers({chain, config, logger, attnetsService, peerRpcScores, networkEventBus, metrics}, opts), eth2Context: { activeValidatorCount: chain.getHeadState().epochCtx.currentShuffling.activeIndices.length, currentSlot: clock.currentSlot, currentEpoch: clock.currentEpoch, }, peersData, + events: networkEventBus, }); const syncnetsService = new SyncnetsService(config, chain, gossip, metadata, logger, metrics, opts); @@ -228,6 +229,11 @@ export class Network implements INetwork { opts ); + const networkProcessor = new NetworkProcessor( + {attnetsService, chain, config, logger, metrics, peerRpcScores, events: networkEventBus, gossipHandlers}, + opts + ); + await libp2p.start(); // Network spec decides version changes based on clock fork, not head fork @@ -260,6 +266,7 @@ export class Network implements INetwork { signal, peersData, networkEventBus, + networkProcessor, metadata, peerRpcScores, reqResp, @@ -411,9 +418,7 @@ export class Network implements INetwork { } // Drop all the gossip validation queues - for (const jobQueue of Object.values(this.gossip.jobQueues)) { - jobQueue.dropAllJobs(); - } + this.networkProcessor.dropAllJobs(); } isSubscribedToGossipCoreTopics(): boolean { @@ -445,24 +450,6 @@ export class Network implements INetwork { })); } - async dumpGossipQueueItems(gossipType: string): Promise { - const jobQueue = this.gossip.jobQueues[gossipType as GossipType]; - if (jobQueue === undefined) { - throw Error(`Unknown gossipType ${gossipType}, known values: ${Object.keys(jobQueue).join(", ")}`); - } - - return jobQueue.getItems().map((item) => { - const [topic, message, propagationSource, seenTimestampSec] = item.args; - return { - topic: topic, - propagationSource, - data: message.data, - addedTimeMs: item.addedTimeMs, - seenTimestampSec, - }; - }); - } - async dumpPeerScoreStats(): Promise { return this.peerRpcScores.dumpPeerScoreStats(); } @@ -475,6 +462,10 @@ export class Network implements INetwork { return (await this.discv5?.kadValues())?.map((enr) => enr.encodeTxt()) ?? []; } + async dumpGossipQueue(gossipType: GossipType): Promise { + return this.networkProcessor.dumpGossipQueue(gossipType); + } + /** * Handle subscriptions through fork transitions, @see FORK_EPOCH_LOOKAHEAD */ diff --git a/packages/beacon-node/src/network/options.ts b/packages/beacon-node/src/network/options.ts index e004d671597..696bb65f387 100644 --- a/packages/beacon-node/src/network/options.ts +++ b/packages/beacon-node/src/network/options.ts @@ -1,15 +1,16 @@ import {generateKeypair, IDiscv5DiscoveryInputOptions, KeypairType, SignableENR} from "@chainsafe/discv5"; import {Eth2GossipsubOpts} from "./gossip/gossipsub.js"; -import {defaultGossipHandlerOpts, GossipHandlerOpts} from "./gossip/handlers/index.js"; +import {defaultGossipHandlerOpts} from "./processor/gossipHandlers.js"; import {PeerManagerOpts} from "./peers/index.js"; import {ReqRespBeaconNodeOpts} from "./reqresp/ReqRespBeaconNode.js"; +import {NetworkProcessorOpts} from "./processor/index.js"; // Since Network is eventually intended to be run in a separate thread, ensure that all options are cloneable using structuredClone export interface NetworkOptions extends PeerManagerOpts, // remove all Functions Omit, - GossipHandlerOpts, + NetworkProcessorOpts, Eth2GossipsubOpts { localMultiaddrs: string[]; bootMultiaddrs?: string[]; diff --git a/packages/beacon-node/src/network/gossip/handlers/index.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts similarity index 93% rename from packages/beacon-node/src/network/gossip/handlers/index.ts rename to packages/beacon-node/src/network/processor/gossipHandlers.ts index 52e70cc4802..24debc783fd 100644 --- a/packages/beacon-node/src/network/gossip/handlers/index.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -4,9 +4,9 @@ import {BeaconConfig} from "@lodestar/config"; import {Logger, prettyBytes} from "@lodestar/utils"; import {phase0, Root, Slot, ssz} from "@lodestar/types"; import {ForkName, ForkSeq} from "@lodestar/params"; -import {Metrics} from "../../../metrics/index.js"; -import {OpSource} from "../../../metrics/validatorMonitor.js"; -import {IBeaconChain} from "../../../chain/index.js"; +import {Metrics} from "../../metrics/index.js"; +import {OpSource} from "../../metrics/validatorMonitor.js"; +import {IBeaconChain} from "../../chain/index.js"; import { AttestationError, AttestationErrorCode, @@ -16,8 +16,8 @@ import { GossipAction, GossipActionError, SyncCommitteeError, -} from "../../../chain/errors/index.js"; -import {GossipHandlers, GossipType} from "../interface.js"; +} from "../../chain/errors/index.js"; +import {GossipHandlers, GossipType} from "../gossip/interface.js"; import { validateGossipAggregateAndProof, validateGossipAttestation, @@ -28,14 +28,14 @@ import { validateSyncCommitteeGossipContributionAndProof, validateGossipVoluntaryExit, validateBlsToExecutionChange, -} from "../../../chain/validation/index.js"; -import {NetworkEvent, NetworkEventBus} from "../../events.js"; -import {PeerAction, PeerRpcScoreStore} from "../../peers/index.js"; -import {validateLightClientFinalityUpdate} from "../../../chain/validation/lightClientFinalityUpdate.js"; -import {validateLightClientOptimisticUpdate} from "../../../chain/validation/lightClientOptimisticUpdate.js"; -import {validateGossipBlobsSidecar} from "../../../chain/validation/blobsSidecar.js"; -import {BlockInput, getBlockInput} from "../../../chain/blocks/types.js"; -import {AttnetsService} from "../../subnets/attnetsService.js"; +} from "../../chain/validation/index.js"; +import {NetworkEvent, NetworkEventBus} from "../events.js"; +import {PeerAction, PeerRpcScoreStore} from "../peers/index.js"; +import {validateLightClientFinalityUpdate} from "../../chain/validation/lightClientFinalityUpdate.js"; +import {validateLightClientOptimisticUpdate} from "../../chain/validation/lightClientOptimisticUpdate.js"; +import {validateGossipBlobsSidecar} from "../../chain/validation/blobsSidecar.js"; +import {BlockInput, getBlockInput} from "../../chain/blocks/types.js"; +import {AttnetsService} from "../subnets/attnetsService.js"; /** * Gossip handler options as part of network options @@ -52,13 +52,13 @@ export const defaultGossipHandlerOpts = { dontSendGossipAttestationsToForkchoice: false, }; -type ValidatorFnsModules = { +export type ValidatorFnsModules = { attnetsService: AttnetsService; chain: IBeaconChain; config: BeaconConfig; logger: Logger; metrics: Metrics | null; - networkEventBus: NetworkEventBus; + events: NetworkEventBus; peerRpcScores: PeerRpcScoreStore; }; @@ -79,7 +79,7 @@ const MAX_UNKNOWN_BLOCK_ROOT_RETRIES = 1; * - Ethereum Consensus gossipsub protocol strictly defined a single topic for message */ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): GossipHandlers { - const {attnetsService, chain, config, metrics, networkEventBus, peerRpcScores, logger} = modules; + const {attnetsService, chain, config, metrics, events, peerRpcScores, logger} = modules; async function validateBeaconBlock( blockInput: BlockInput, @@ -109,7 +109,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH if (e instanceof BlockGossipError) { if (e instanceof BlockGossipError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) { logger.debug("Gossip block has error", {slot, root: blockHex, code: e.type.code}); - networkEventBus.emit(NetworkEvent.unknownBlockParent, blockInput, peerIdStr); + events.emit(NetworkEvent.unknownBlockParent, blockInput, peerIdStr); } } diff --git a/packages/beacon-node/src/network/gossip/validation/queue.ts b/packages/beacon-node/src/network/processor/gossipQueues.ts similarity index 60% rename from packages/beacon-node/src/network/gossip/validation/queue.ts rename to packages/beacon-node/src/network/processor/gossipQueues.ts index 986a46a8450..6b7d826173f 100644 --- a/packages/beacon-node/src/network/gossip/validation/queue.ts +++ b/packages/beacon-node/src/network/processor/gossipQueues.ts @@ -1,33 +1,89 @@ import {mapValues} from "@lodestar/utils"; -import {Metrics} from "../../../metrics/index.js"; -import {JobItemQueue, JobQueueOpts, QueueType} from "../../../util/queue/index.js"; -import {GossipJobQueues, GossipType, GossipValidatorFn, ResolvedType, ValidatorFnsByType} from "../interface.js"; +import {LinkedList} from "../../util/array.js"; +import {GossipType} from "../gossip/interface.js"; + +enum QueueType { + FIFO = "FIFO", + LIFO = "LIFO", +} /** * Numbers from https://github.com/sigp/lighthouse/blob/b34a79dc0b02e04441ba01fd0f304d1e203d877d/beacon_node/network/src/beacon_processor/mod.rs#L69 */ const gossipQueueOpts: { - [K in GossipType]: Pick; + [K in GossipType]: GossipQueueOpts; } = { // validation gossip block asap - [GossipType.beacon_block]: {maxLength: 1024, type: QueueType.FIFO, noYieldIfOneItem: true}, + [GossipType.beacon_block]: {maxLength: 1024, type: QueueType.FIFO}, // TODO DENEB: What's a good queue max given that now blocks are much bigger? - [GossipType.beacon_block_and_blobs_sidecar]: {maxLength: 32, type: QueueType.FIFO, noYieldIfOneItem: true}, + [GossipType.beacon_block_and_blobs_sidecar]: {maxLength: 32, type: QueueType.FIFO}, // lighthoue has aggregate_queue 4096 and unknown_block_aggregate_queue 1024, we use single queue - [GossipType.beacon_aggregate_and_proof]: {maxLength: 5120, type: QueueType.LIFO, maxConcurrency: 16}, + [GossipType.beacon_aggregate_and_proof]: {maxLength: 5120, type: QueueType.LIFO}, // lighthouse has attestation_queue 16384 and unknown_block_attestation_queue 8192, we use single queue - [GossipType.beacon_attestation]: {maxLength: 24576, type: QueueType.LIFO, maxConcurrency: 64}, + [GossipType.beacon_attestation]: {maxLength: 24576, type: QueueType.LIFO}, [GossipType.voluntary_exit]: {maxLength: 4096, type: QueueType.FIFO}, [GossipType.proposer_slashing]: {maxLength: 4096, type: QueueType.FIFO}, [GossipType.attester_slashing]: {maxLength: 4096, type: QueueType.FIFO}, - [GossipType.sync_committee_contribution_and_proof]: {maxLength: 4096, type: QueueType.LIFO, maxConcurrency: 16}, - [GossipType.sync_committee]: {maxLength: 4096, type: QueueType.LIFO, maxConcurrency: 64}, + [GossipType.sync_committee_contribution_and_proof]: {maxLength: 4096, type: QueueType.LIFO}, + [GossipType.sync_committee]: {maxLength: 4096, type: QueueType.LIFO}, [GossipType.light_client_finality_update]: {maxLength: 1024, type: QueueType.FIFO}, [GossipType.light_client_optimistic_update]: {maxLength: 1024, type: QueueType.FIFO}, // check ?? MAX_BLS_TO_EXECUTION_CHANGES 2**4 (= 16) [GossipType.bls_to_execution_change]: {maxLength: 1024, type: QueueType.FIFO}, }; +type GossipQueueOpts = { + type: QueueType; + maxLength: number; +}; + +export class GossipQueue { + private readonly list = new LinkedList(); + + constructor(private readonly opts: GossipQueueOpts) {} + + get length(): number { + return this.list.length; + } + + clear(): void { + this.list.clear(); + } + + add(item: T): T | null { + let droppedItem: T | null = null; + + if (this.list.length + 1 > this.opts.maxLength) { + // LIFO -> keep latest job, drop oldest, FIFO -> drop latest job + switch (this.opts.type) { + case QueueType.LIFO: + droppedItem = this.list.shift(); + break; + case QueueType.FIFO: + return item; + } + } + + this.list.push(item); + + return droppedItem; + } + + next(): T | null { + // LIFO -> pop() remove last item, FIFO -> shift() remove first item + switch (this.opts.type) { + case QueueType.LIFO: + return this.list.pop(); + case QueueType.FIFO: + return this.list.shift(); + } + } + + getAll(): T[] { + return this.list.toArray(); + } +} + /** * Wraps a GossipValidatorFn with a queue, to limit the processing of gossip objects by type. * @@ -44,25 +100,8 @@ const gossipQueueOpts: { * By topic is too specific, so by type groups all similar objects in the same queue. All in the same won't allow * to customize different queue behaviours per object type (see `gossipQueueOpts`). */ -export function createValidationQueues( - gossipValidatorFns: ValidatorFnsByType, - signal: AbortSignal, - metrics: Metrics | null -): GossipJobQueues { - return mapValues(gossipQueueOpts, (opts, type) => { - const gossipValidatorFn = gossipValidatorFns[type]; - return new JobItemQueue, ResolvedType>( - gossipValidatorFn, - {signal, ...opts}, - metrics - ? { - length: metrics.gossipValidationQueueLength.child({topic: type}), - droppedJobs: metrics.gossipValidationQueueDroppedJobs.child({topic: type}), - jobTime: metrics.gossipValidationQueueJobTime.child({topic: type}), - jobWaitTime: metrics.gossipValidationQueueJobWaitTime.child({topic: type}), - concurrency: metrics.gossipValidationQueueConcurrency.child({topic: type}), - } - : undefined - ); +export function createGossipQueues(): {[K in GossipType]: GossipQueue} { + return mapValues(gossipQueueOpts, (opts) => { + return new GossipQueue(opts); }); } diff --git a/packages/beacon-node/src/network/gossip/validation/index.ts b/packages/beacon-node/src/network/processor/gossipValidatorFn.ts similarity index 60% rename from packages/beacon-node/src/network/gossip/validation/index.ts rename to packages/beacon-node/src/network/processor/gossipValidatorFn.ts index 4bc07d9405a..00b5b3f4a6e 100644 --- a/packages/beacon-node/src/network/gossip/validation/index.ts +++ b/packages/beacon-node/src/network/processor/gossipValidatorFn.ts @@ -1,52 +1,17 @@ import {TopicValidatorResult} from "@libp2p/interface-pubsub"; import {ChainForkConfig} from "@lodestar/config"; -import {Logger, mapValues} from "@lodestar/utils"; -import {Metrics} from "../../../metrics/index.js"; -import {getGossipSSZType} from "../topic.js"; -import { - GossipJobQueues, - GossipType, - GossipValidatorFn, - ValidatorFnsByType, - GossipHandlers, - GossipHandlerFn, -} from "../interface.js"; -import {GossipActionError, GossipAction} from "../../../chain/errors/index.js"; -import {createValidationQueues} from "./queue.js"; +import {Logger} from "@lodestar/utils"; +import {Metrics} from "../../metrics/index.js"; +import {getGossipSSZType} from "../gossip/topic.js"; +import {GossipValidatorFn, GossipHandlers, GossipHandlerFn} from "../gossip/interface.js"; +import {GossipActionError, GossipAction} from "../../chain/errors/index.js"; -type ValidatorFnModules = { +export type ValidatorFnModules = { config: ChainForkConfig; logger: Logger; metrics: Metrics | null; }; -/** - * Returns GossipValidatorFn for each GossipType, given GossipHandlerFn indexed by type. - * - * @see getGossipHandlers for reasoning on why GossipHandlerFn are used for gossip validation. - */ -export function createValidatorFnsByType( - gossipHandlers: GossipHandlers, - modules: ValidatorFnModules & {signal: AbortSignal} -): {validatorFnsByType: ValidatorFnsByType; jobQueues: GossipJobQueues} { - const gossipValidatorFns = mapValues(gossipHandlers, (gossipHandler, type) => { - return getGossipValidatorFn(gossipHandler, type, modules); - }); - - const jobQueues = createValidationQueues(gossipValidatorFns, modules.signal, modules.metrics); - - const validatorFnsByType = mapValues( - jobQueues, - (jobQueue): GossipValidatorFn => { - return async function gossipValidatorFnWithQueue(topic, gossipMsg, propagationSource, seenTimestampSec) { - return jobQueue.push(topic, gossipMsg, propagationSource, seenTimestampSec); - }; - } - ); - - return {jobQueues, validatorFnsByType}; -} - /** * Returns a GossipSub validator function from a GossipHandlerFn. GossipHandlerFn may throw GossipActionError if one * or more validation conditions from the consensus-specs#p2p-interface are not satisfied. @@ -61,14 +26,12 @@ export function createValidatorFnsByType( * * @see getGossipHandlers for reasoning on why GossipHandlerFn are used for gossip validation. */ -function getGossipValidatorFn( - gossipHandler: GossipHandlers[K], - type: K, - modules: ValidatorFnModules -): GossipValidatorFn { +export function getGossipValidatorFn(gossipHandlers: GossipHandlers, modules: ValidatorFnModules): GossipValidatorFn { const {logger, metrics} = modules; return async function gossipValidatorFn(topic, msg, propagationSource, seenTimestampSec) { + const type = topic.type; + // Define in scope above try {} to be used in catch {} if object was parsed let gossipObject; try { @@ -81,7 +44,7 @@ function getGossipValidatorFn( return TopicValidatorResult.Reject; } - await (gossipHandler as GossipHandlerFn)(gossipObject, topic, propagationSource, seenTimestampSec); + await (gossipHandlers[topic.type] as GossipHandlerFn)(gossipObject, topic, propagationSource, seenTimestampSec); metrics?.gossipValidationAccept.inc({topic: type}); diff --git a/packages/beacon-node/src/network/processor/index.ts b/packages/beacon-node/src/network/processor/index.ts new file mode 100644 index 00000000000..e1f37f8da53 --- /dev/null +++ b/packages/beacon-node/src/network/processor/index.ts @@ -0,0 +1,161 @@ +import {Logger, mapValues} from "@lodestar/utils"; +import {IBeaconChain} from "../../chain/interface.js"; +import {Metrics} from "../../metrics/metrics.js"; +import {NetworkEvent, NetworkEventBus} from "../events.js"; +import {GossipType} from "../gossip/interface.js"; +import {createGossipQueues} from "./gossipQueues.js"; +import {NetworkWorker, NetworkWorkerModules} from "./worker.js"; +import {PendingGossipsubMessage} from "./types.js"; +import {ValidatorFnsModules, GossipHandlerOpts} from "./gossipHandlers.js"; + +export type NetworkProcessorModules = NetworkWorkerModules & + ValidatorFnsModules & { + chain: IBeaconChain; + events: NetworkEventBus; + logger: Logger; + metrics: Metrics | null; + }; + +export type NetworkProcessorOpts = GossipHandlerOpts & { + maxGossipTopicConcurrency?: number; +}; + +const executeGossipWorkOrderObj: Record = { + [GossipType.beacon_block]: true, + [GossipType.beacon_block_and_blobs_sidecar]: true, + [GossipType.beacon_aggregate_and_proof]: true, + [GossipType.beacon_attestation]: true, + [GossipType.voluntary_exit]: true, + [GossipType.proposer_slashing]: true, + [GossipType.attester_slashing]: true, + [GossipType.sync_committee_contribution_and_proof]: true, + [GossipType.sync_committee]: true, + [GossipType.light_client_finality_update]: true, + [GossipType.light_client_optimistic_update]: true, + [GossipType.bls_to_execution_change]: true, +}; +const executeGossipWorkOrder = Object.keys(executeGossipWorkOrderObj) as (keyof typeof executeGossipWorkOrderObj)[]; + +// TODO: Arbitrary constant, check metrics +const MAX_JOBS_SUBMITTED_PER_TICK = 128; + +/** + * Network processor handles the gossip queues and throtles processing to not overload the main thread + * - Decides when to process work and what to process + * + * What triggers execute work? + * + * - When work is submitted + * - When downstream workers become available + * + * ### PendingGossipsubMessage beacon_attestation example + * + * For attestations, processing the message includes the steps: + * 1. Pre shuffling sync validation + * 2. Retrieve shuffling: async + goes into the regen queue and can be expensive + * 3. Pre sig validation sync validation + * 4. Validate BLS signature: async + goes into workers through another manager + * + * The gossip queues should receive "backpressue" from the regen and BLS workers queues. + * Such that enough work is processed to fill either one of the queue. + */ +export class NetworkProcessor { + private readonly worker: NetworkWorker; + private readonly chain: IBeaconChain; + private readonly logger: Logger; + private readonly metrics: Metrics | null; + private readonly gossipQueues = createGossipQueues(); + private readonly gossipTopicConcurrency = mapValues(this.gossipQueues, () => 0); + + constructor(modules: NetworkProcessorModules, private readonly opts: NetworkProcessorOpts) { + const {chain, events, logger, metrics} = modules; + this.chain = chain; + this.metrics = metrics; + this.logger = logger; + this.worker = new NetworkWorker(modules, opts); + + events.on(NetworkEvent.pendingGossipsubMessage, this.onPendingGossipsubMessage.bind(this)); + + if (metrics) { + metrics.gossipValidationQueueLength.addCollect(() => { + for (const topic of executeGossipWorkOrder) { + metrics.gossipValidationQueueLength.set({topic}, this.gossipQueues[topic].length); + metrics.gossipValidationQueueConcurrency.set({topic}, this.gossipTopicConcurrency[topic]); + } + }); + } + + // TODO: Pull new work when available + // this.bls.onAvailable(() => this.executeWork()); + // this.regen.onAvailable(() => this.executeWork()); + } + + dropAllJobs(): void { + for (const topic of executeGossipWorkOrder) { + this.gossipQueues[topic].clear(); + } + } + + dumpGossipQueue(topic: GossipType): PendingGossipsubMessage[] { + const queue = this.gossipQueues[topic]; + if (queue === undefined) { + throw Error(`Unknown gossipType ${topic}, known values: ${Object.keys(this.gossipQueues).join(", ")}`); + } + + return queue.getAll(); + } + + private onPendingGossipsubMessage(data: PendingGossipsubMessage): void { + const droppedJob = this.gossipQueues[data.topic.type].add(data); + if (droppedJob) { + // TODO: Should report the dropped job to gossip? It will be eventually pruned from the mcache + this.metrics?.gossipValidationQueueDroppedJobs.inc({topic: data.topic.type}); + } + + // Tentatively perform work + this.executeWork(); + } + + private executeWork(): void { + // TODO: Maybe de-bounce by timing the last time executeWork was run + + this.metrics?.networkProcessor.executeWorkCalls.inc(); + let jobsSubmitted = 0; + + job_loop: while (jobsSubmitted < MAX_JOBS_SUBMITTED_PER_TICK) { + // Check canAcceptWork before calling queue.next() since it consumes the items + if (!this.chain.blsThreadPoolCanAcceptWork() || !this.chain.regenCanAcceptWork()) { + this.metrics?.networkProcessor.canNotAcceptWork.inc(); + break; + } + + for (const topic of executeGossipWorkOrder) { + if ( + this.opts.maxGossipTopicConcurrency !== undefined && + this.gossipTopicConcurrency[topic] > this.opts.maxGossipTopicConcurrency + ) { + // Reached concurrency limit for topic, continue to next topic + continue; + } + + const item = this.gossipQueues[topic].next(); + if (item) { + this.gossipTopicConcurrency[topic]++; + this.worker + .processPendingGossipsubMessage(item) + .finally(() => this.gossipTopicConcurrency[topic]--) + .catch((e) => this.logger.error("processGossipAttestations must not throw", {}, e)); + + jobsSubmitted++; + // Attempt to find more work, but check canAcceptWork() again and run executeGossipWorkOrder priorization + continue job_loop; + } + } + + // No item of work available on all queues, break off job_loop + break; + } + + this.metrics?.networkProcessor.jobsSubmitted.observe(jobsSubmitted); + } +} diff --git a/packages/beacon-node/src/network/processor/types.ts b/packages/beacon-node/src/network/processor/types.ts new file mode 100644 index 00000000000..ebff7e68d43 --- /dev/null +++ b/packages/beacon-node/src/network/processor/types.ts @@ -0,0 +1,17 @@ +import {PeerId} from "@libp2p/interface-peer-id"; +import {Message} from "@libp2p/interface-pubsub"; +import {GossipTopic} from "../gossip/index.js"; + +export type GossipAttestationsWork = { + messages: PendingGossipsubMessage[]; +}; + +export type PendingGossipsubMessage = { + topic: GossipTopic; + msg: Message; + msgId: string; + // TODO: Refactor into accepting string (requires gossipsub changes) for easier multi-threading + propagationSource: PeerId; + seenTimestampSec: number; + startProcessUnixSec: number | null; +}; diff --git a/packages/beacon-node/src/network/processor/worker.ts b/packages/beacon-node/src/network/processor/worker.ts new file mode 100644 index 00000000000..64edfbf3707 --- /dev/null +++ b/packages/beacon-node/src/network/processor/worker.ts @@ -0,0 +1,52 @@ +import {IBeaconChain} from "../../chain/interface.js"; +import {Metrics} from "../../metrics/metrics.js"; +import {NetworkEvent, NetworkEventBus} from "../events.js"; +import {GossipHandlers, GossipValidatorFn} from "../gossip/interface.js"; +import {getGossipHandlers, GossipHandlerOpts, ValidatorFnsModules} from "./gossipHandlers.js"; +import {getGossipValidatorFn, ValidatorFnModules} from "./gossipValidatorFn.js"; +import {PendingGossipsubMessage} from "./types.js"; + +export type NetworkWorkerModules = ValidatorFnsModules & + ValidatorFnModules & { + chain: IBeaconChain; + events: NetworkEventBus; + metrics: Metrics | null; + // Optionally pass custom GossipHandlers, for testing + gossipHandlers?: GossipHandlers; + }; + +export class NetworkWorker { + private readonly events: NetworkEventBus; + private readonly metrics: Metrics | null; + private readonly gossipValidatorFn: GossipValidatorFn; + + constructor(modules: NetworkWorkerModules, opts: GossipHandlerOpts) { + this.events = modules.events; + this.metrics = modules.metrics; + this.gossipValidatorFn = getGossipValidatorFn(modules.gossipHandlers ?? getGossipHandlers(modules, opts), modules); + } + + async processPendingGossipsubMessage(message: PendingGossipsubMessage): Promise { + message.startProcessUnixSec = Date.now() / 1000; + + const acceptance = await this.gossipValidatorFn( + message.topic, + message.msg, + message.propagationSource.toString(), + message.seenTimestampSec + ); + + if (message.startProcessUnixSec !== null) { + this.metrics?.gossipValidationQueueJobWaitTime.observe( + {topic: message.topic.type}, + message.startProcessUnixSec - message.seenTimestampSec + ); + this.metrics?.gossipValidationQueueJobTime.observe( + {topic: message.topic.type}, + Date.now() / 1000 - message.startProcessUnixSec + ); + } + + this.events.emit(NetworkEvent.gossipMessageValidationResult, message.msgId, message.propagationSource, acceptance); + } +} diff --git a/packages/beacon-node/src/util/queue/itemQueue.ts b/packages/beacon-node/src/util/queue/itemQueue.ts index 802a9b0e84e..46bb2b62f55 100644 --- a/packages/beacon-node/src/util/queue/itemQueue.ts +++ b/packages/beacon-node/src/util/queue/itemQueue.ts @@ -41,6 +41,10 @@ export class JobItemQueue { } } + get jobLen(): number { + return this.jobs.length; + } + push(...args: Args): Promise { if (this.opts.signal.aborted) { throw new QueueError({code: QueueErrorCode.QUEUE_ABORTED}); diff --git a/packages/beacon-node/test/e2e/network/gossipsub.test.ts b/packages/beacon-node/test/e2e/network/gossipsub.test.ts index ae80c66d9a2..ee1bb71b2a7 100644 --- a/packages/beacon-node/test/e2e/network/gossipsub.test.ts +++ b/packages/beacon-node/test/e2e/network/gossipsub.test.ts @@ -5,7 +5,7 @@ import {capella, phase0, ssz, allForks} from "@lodestar/types"; import {sleep} from "@lodestar/utils"; import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; -import {getReqRespHandlers, Network} from "../../../src/network/index.js"; +import {getReqRespHandlers, Network, NetworkInitModules} from "../../../src/network/index.js"; import {defaultNetworkOptions, NetworkOptions} from "../../../src/network/options.js"; import {GossipType, GossipHandlers} from "../../../src/network/gossip/index.js"; @@ -25,6 +25,7 @@ const opts: NetworkOptions = { localMultiaddrs: [], discv5FirstQueryDelayMs: 0, discv5: null, + skipParamsLog: true, }; // Schedule all forks at ALTAIR_FORK_EPOCH to avoid generating the pubkeys cache @@ -86,10 +87,9 @@ describe("gossipsub", function () { const loggerA = testLogger("A"); const loggerB = testLogger("B"); - const modules = { + const modules: Omit = { config: beaconConfig, chain, - db, reqRespHandlers, gossipHandlers, signal: controller.signal, diff --git a/packages/beacon-node/test/unit/chain/validation/block.test.ts b/packages/beacon-node/test/unit/chain/validation/block.test.ts index edcd321a8e8..c3a4326c705 100644 --- a/packages/beacon-node/test/unit/chain/validation/block.test.ts +++ b/packages/beacon-node/test/unit/chain/validation/block.test.ts @@ -44,7 +44,7 @@ describe("gossip block validation", function () { verifySignature = sinon.stub(); verifySignature.resolves(true); - chain.bls = {verifySignatureSets: verifySignature, close: () => Promise.resolve()}; + chain.bls = {verifySignatureSets: verifySignature, close: () => Promise.resolve(), canAcceptWork: () => true}; forkChoice.getFinalizedCheckpoint.returns({epoch: 0, root: ZERO_HASH, rootHex: ""}); diff --git a/packages/beacon-node/test/unit/network/processorQueues.test.ts b/packages/beacon-node/test/unit/network/processorQueues.test.ts new file mode 100644 index 00000000000..0c272159e51 --- /dev/null +++ b/packages/beacon-node/test/unit/network/processorQueues.test.ts @@ -0,0 +1,108 @@ +import {expect} from "chai"; +import {sleep} from "@lodestar/utils"; + +type ValidateOpts = { + skipAsync1: boolean; + skipAsync2: boolean; +}; + +async function validateTest(job: string, tracker: string[], opts: ValidateOpts): Promise { + tracker.push(`job:${job} step:0`); + + await getStateFromCache(opts.skipAsync1); + tracker.push(`job:${job} step:1`); + + if (!opts.skipAsync2) { + await sleep(0); + } + tracker.push(`job:${job} step:2`); +} + +async function getStateFromCache(retrieveSync: boolean): Promise { + if (retrieveSync) { + return 1; + } else { + await sleep(0); + return 2; + } +} + +describe("event loop with branching async", () => { + const eachAwaitPointHoldsJobs = [ + "job:0 step:0", + "job:1 step:0", + "job:2 step:0", + "job:0 step:1", + "job:1 step:1", + "job:2 step:1", + "job:0 step:2", + "job:1 step:2", + "job:2 step:2", + ]; + + const onlyStartOfStep1HoldsJobs = [ + "job:0 step:0", + "job:1 step:0", + "job:2 step:0", + "job:0 step:1", + "job:0 step:2", + "job:1 step:1", + "job:1 step:2", + "job:2 step:1", + "job:2 step:2", + ]; + + const eachJobCompletesInSequence = [ + "job:0 step:0", + "job:0 step:1", + "job:0 step:2", + "job:1 step:0", + "job:1 step:1", + "job:1 step:2", + "job:2 step:0", + "job:2 step:1", + "job:2 step:2", + ]; + + const testCases: {opts: ValidateOpts; expectedTrackerVoid: string[]; expectedTrackerAwait: string[]}[] = [ + { + opts: {skipAsync1: false, skipAsync2: false}, + expectedTrackerVoid: eachAwaitPointHoldsJobs, + expectedTrackerAwait: eachJobCompletesInSequence, + }, + { + opts: {skipAsync1: true, skipAsync2: false}, + expectedTrackerVoid: eachAwaitPointHoldsJobs, + expectedTrackerAwait: eachJobCompletesInSequence, + }, + { + opts: {skipAsync1: false, skipAsync2: true}, + expectedTrackerVoid: onlyStartOfStep1HoldsJobs, + expectedTrackerAwait: eachJobCompletesInSequence, + }, + { + opts: {skipAsync1: true, skipAsync2: true}, + expectedTrackerVoid: onlyStartOfStep1HoldsJobs, + expectedTrackerAwait: eachJobCompletesInSequence, + }, + ]; + + for (const {opts, expectedTrackerVoid, expectedTrackerAwait} of testCases) { + const jobs: string[] = []; + for (let i = 0; i < 3; i++) jobs.push(String(i)); + + it(`${JSON.stringify(opts)} Promise.all`, async () => { + const tracker: string[] = []; + await Promise.all(jobs.map((job) => validateTest(job, tracker, opts))); + expect(tracker).deep.equals(expectedTrackerVoid); + }); + + it(`${JSON.stringify(opts)} await each`, async () => { + const tracker: string[] = []; + for (const job of jobs) { + await validateTest(job, tracker, opts); + } + expect(tracker).deep.equals(expectedTrackerAwait); + }); + } +}); diff --git a/packages/beacon-node/test/utils/mocks/bls.ts b/packages/beacon-node/test/utils/mocks/bls.ts index 0013a2d49ea..57e84d509fc 100644 --- a/packages/beacon-node/test/utils/mocks/bls.ts +++ b/packages/beacon-node/test/utils/mocks/bls.ts @@ -10,4 +10,8 @@ export class BlsVerifierMock implements IBlsVerifier { async close(): Promise { // } + + canAcceptWork(): boolean { + return true; + } } diff --git a/packages/beacon-node/test/utils/mocks/chain/chain.ts b/packages/beacon-node/test/utils/mocks/chain/chain.ts index eb569387108..9446a8eba51 100644 --- a/packages/beacon-node/test/utils/mocks/chain/chain.ts +++ b/packages/beacon-node/test/utils/mocks/chain/chain.ts @@ -225,6 +225,14 @@ export class MockBeaconChain implements IBeaconChain { async updateBeaconProposerData(): Promise {} updateBuilderStatus(): void {} + + regenCanAcceptWork(): boolean { + return true; + } + + blsThreadPoolCanAcceptWork(): boolean { + return true; + } } const root = ssz.Root.defaultValue() as Uint8Array; diff --git a/packages/cli/src/options/beaconNodeOptions/network.ts b/packages/cli/src/options/beaconNodeOptions/network.ts index 6e9b54e73c1..7ed4d916194 100644 --- a/packages/cli/src/options/beaconNodeOptions/network.ts +++ b/packages/cli/src/options/beaconNodeOptions/network.ts @@ -23,6 +23,7 @@ export type NetworkArgs = { "network.gossipsubDHigh": number; "network.gossipsubAwaitHandler": boolean; "network.rateLimitMultiplier": number; + "network.maxGossipTopicConcurrency"?: number; /** @deprecated This option is deprecated and should be removed in next major release. */ "network.requestCountPeerLimit": number; @@ -67,6 +68,7 @@ export function parseArgs(args: NetworkArgs): IBeaconNodeOptions["network"] { gossipsubAwaitHandler: args["network.gossipsubAwaitHandler"], mdns: args["mdns"], rateLimitMultiplier: args["network.rateLimitMultiplier"], + maxGossipTopicConcurrency: args["network.maxGossipTopicConcurrency"], }; } @@ -237,4 +239,10 @@ export const options: CliCommandOptions = { defaultDescription: String(defaultOptions.network.rateLimitMultiplier), group: "network", }, + + "network.maxGossipTopicConcurrency": { + type: "number", + hidden: true, + group: "network", + }, }; diff --git a/packages/cli/test/unit/options/beaconNodeOptions.test.ts b/packages/cli/test/unit/options/beaconNodeOptions.test.ts index c13f04eb069..d8a9bb43830 100644 --- a/packages/cli/test/unit/options/beaconNodeOptions.test.ts +++ b/packages/cli/test/unit/options/beaconNodeOptions.test.ts @@ -83,6 +83,7 @@ describe("options / beaconNodeOptions", () => { "network.gossipsubDHigh": 6, "network.gossipsubAwaitHandler": true, "network.rateLimitMultiplier": 1, + "network.maxGossipTopicConcurrency": 64, "sync.isSingleNode": true, "sync.disableProcessAsChainSegment": true, @@ -171,6 +172,7 @@ describe("options / beaconNodeOptions", () => { gossipsubAwaitHandler: true, mdns: false, rateLimitMultiplier: 1, + maxGossipTopicConcurrency: 64, }, sync: { isSingleNode: true,