Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull gossip queues for better throughput #5195

Merged
merged 11 commits into from
Mar 1, 2023
2 changes: 1 addition & 1 deletion packages/api/src/beacon/routes/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export type Api = {
/** TODO: description */
getSyncChainsDebugState(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: SyncChainDebugState[]}}>>;
/** Dump all items in a gossip queue, by gossipType */
getGossipQueueItems(gossipType: string): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: GossipQueueItem[]}}>>;
getGossipQueueItems(gossipType: string): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: unknown[]}}>>;
/** Dump all items in the regen queue */
getRegenQueueItems(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: RegenQueueItem[]}}>>;
/** Dump all items in the block processor queue */
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {validateBlsToExecutionChange} from "../../../../chain/validation/blsToEx
import {validateSyncCommitteeSigOnly} from "../../../../chain/validation/syncCommittee.js";
import {ApiModules} from "../../types.js";
import {AttestationError, GossipAction, SyncCommitteeError} from "../../../../chain/errors/index.js";
import {validateGossipFnRetryUnknownRoot} from "../../../../network/gossip/handlers/index.js";
import {validateGossipFnRetryUnknownRoot} from "../../../../network/processor/gossipHandlers.js";

export function getBeaconPoolApi({
chain,
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export function getLodestarApi({

async getGossipQueueItems(gossipType: GossipType | string) {
return {
data: await network.dumpGossipQueueItems(gossipType),
data: await network.dumpGossipQueue(gossipType as GossipType),
};
},

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {CommitteeSubscription} from "../../../network/subnets/index.js";
import {ApiModules} from "../types.js";
import {RegenCaller} from "../../../chain/regen/index.js";
import {getValidatorStatus} from "../beacon/state/utils.js";
import {validateGossipFnRetryUnknownRoot} from "../../../network/gossip/handlers/index.js";
import {validateGossipFnRetryUnknownRoot} from "../../../network/processor/gossipHandlers.js";
import {computeSubnetForCommitteesAtSlot, getPubkeysForIndices} from "./utils.js";

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/bls/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ export interface IBlsVerifier {

/** For multithread pool awaits terminating all workers */
close(): Promise<void>;

/**
* Returns true if BLS worker pool is ready to accept more work jobs.
*/
canAcceptWork(): boolean;
}
21 changes: 20 additions & 1 deletion packages/beacon-node/src/chain/bls/multithread/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ const MAX_BUFFERED_SIGS = 32;
*/
const MAX_BUFFER_WAIT_MS = 100;

/**
* Max concurrent jobs on `canAcceptWork` status
*/
const MAX_JOBS_CAN_ACCEPT_WORK = 512;

type WorkerApi = {
verifyManySignatureSets(workReqArr: BlsWorkReq[]): Promise<BlsWorkResult>;
};
Expand Down Expand Up @@ -110,6 +115,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
} | null = null;
private blsVerifyAllMultiThread: boolean;
private closed = false;
private workersBusy = 0;

constructor(options: BlsMultiThreadWorkerPoolOptions, modules: BlsMultiThreadWorkerPoolModules) {
const {logger, metrics} = modules;
Expand All @@ -127,10 +133,21 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
this.workers = this.createWorkers(implementation, defaultPoolSize);

if (metrics) {
metrics.blsThreadPool.queueLength.addCollect(() => metrics.blsThreadPool.queueLength.set(this.jobs.length));
metrics.blsThreadPool.queueLength.addCollect(() => {
metrics.blsThreadPool.queueLength.set(this.jobs.length);
metrics.blsThreadPool.workersBusy.set(this.workersBusy);
});
}
}

canAcceptWork(): boolean {
return (
this.workersBusy < defaultPoolSize &&
// TODO: Should also bound the jobs queue?
this.jobs.length < MAX_JOBS_CAN_ACCEPT_WORK
);
}

async verifySignatureSets(sets: ISignatureSet[], opts: VerifySignatureOpts = {}): Promise<boolean> {
// Pubkeys are aggregated in the main thread regardless if verified in workers or in main thread
this.metrics?.bls.aggregatedPubkeys.inc(getAggregatedPubkeysCount(sets));
Expand Down Expand Up @@ -310,6 +327,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {

const workerApi = worker.status.workerApi;
worker.status = {code: WorkerStatusCode.running, workerApi};
this.workersBusy++;

try {
let startedSigSets = 0;
Expand Down Expand Up @@ -375,6 +393,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}

worker.status = {code: WorkerStatusCode.idle, workerApi};
this.workersBusy--;

// Potentially run a new job
setTimeout(this.runJob, 0);
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/bls/singleThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ export class BlsSingleThreadVerifier implements IBlsVerifier {
async close(): Promise<void> {
// nothing to do
}

canAcceptWork(): boolean {
// Since sigs are verified blocking the main thread, there's no mechanism to throttle
return true;
}
}
12 changes: 10 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import {BeaconClock, LocalClock} from "./clock/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData} from "./interface.js";
import {IChainOptions} from "./options.js";
import {IStateRegenerator, QueuedStateRegenerator, RegenCaller} from "./regen/index.js";
import {QueuedStateRegenerator, RegenCaller} from "./regen/index.js";
import {initializeForkChoice} from "./forkChoice/index.js";
import {computeAnchorCheckpoint} from "./initState.js";
import {IBlsVerifier, BlsSingleThreadVerifier, BlsMultiThreadWorkerPool} from "./bls/index.js";
Expand Down Expand Up @@ -91,7 +91,7 @@ export class BeaconChain implements IBeaconChain {
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
readonly regen: IStateRegenerator;
readonly regen: QueuedStateRegenerator;
readonly lightClientServer: LightClientServer;
readonly reprocessController: ReprocessController;

Expand Down Expand Up @@ -273,6 +273,14 @@ export class BeaconChain implements IBeaconChain {
await this.bls.close();
}

regenCanAcceptWork(): boolean {
return this.regen.canAcceptWork();
}

blsThreadPoolCanAcceptWork(): boolean {
return this.bls.canAcceptWork();
}

validatorSeenAtEpoch(index: ValidatorIndex, epoch: Epoch): boolean {
// Caller must check that epoch is not older that current epoch - 1
// else the caches for that epoch may already be pruned.
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ export interface IBeaconChain {
/** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */
persistInvalidSszView(view: TreeView<CompositeTypeAny>, suffix?: string): void;
updateBuilderStatus(clockSlot: Slot): void;

regenCanAcceptWork(): boolean;
blsThreadPoolCanAcceptWork(): boolean;
}

export type SSZObjectType =
Expand Down
6 changes: 6 additions & 0 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {StateRegenerator, RegenModules} from "./regen.js";
import {RegenError, RegenErrorCode} from "./errors.js";

const REGEN_QUEUE_MAX_LEN = 256;
// TODO: Should this constant be lower than above? 256 feels high
const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16;

type QueuedStateRegeneratorModules = RegenModules & {
signal: AbortSignal;
Expand Down Expand Up @@ -46,6 +48,10 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.metrics = modules.metrics;
}

canAcceptWork(): boolean {
return this.jobQueue.jobLen < REGEN_CAN_ACCEPT_WORK_THRESHOLD;
}

/**
* Get the state to run with `block`.
* - State after `block.parentRoot` dialed forward to block.slot
Expand Down
22 changes: 21 additions & 1 deletion packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,26 @@ export function createLodestarMetrics(
}),
gossipValidationQueueConcurrency: register.gauge<"topic">({
name: "lodestar_gossip_validation_queue_concurrency",
help: "Current concurrency of gossip validation queue",
help: "Current count of jobs being run on network processor for topic",
labelNames: ["topic"],
}),

networkProcessor: {
executeWorkCalls: register.gauge({
name: "lodestar_network_processor_execute_work_calls_total",
help: "Total calls to network processor execute work fn",
}),
jobsSubmitted: register.histogram({
name: "lodestar_network_processor_execute_jobs_submitted_total",
help: "Total calls to network processor execute work fn",
buckets: [0, 1, 5, 128],
}),
canNotAcceptWork: register.gauge({
name: "lodestar_network_processor_can_not_accept_work_total",
help: "Total times network processor can not accept work on executeWork",
}),
},

discv5: {
decodeEnrAttemptCount: register.counter({
name: "lodestar_discv5_decode_enr_attempt_count",
Expand Down Expand Up @@ -538,6 +554,10 @@ export function createLodestarMetrics(
name: "lodestar_bls_thread_pool_queue_length",
help: "Count of total block processor queue length",
}),
workersBusy: register.gauge({
name: "lodestar_bls_thread_pool_workers_busy",
help: "Count of current busy workers",
}),
totalJobsGroupsStarted: register.gauge({
name: "lodestar_bls_thread_pool_job_groups_started_total",
help: "Count of total jobs groups started in bls thread pool, job groups include +1 jobs",
Expand Down
12 changes: 12 additions & 0 deletions packages/beacon-node/src/network/events.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {EventEmitter} from "events";
import {PeerId} from "@libp2p/interface-peer-id";
import StrictEventEmitter from "strict-event-emitter-types";
import {TopicValidatorResult} from "@libp2p/interface-pubsub";
import {phase0} from "@lodestar/types";
import {BlockInput} from "../chain/blocks/types.js";
import {RequestTypedContainer} from "./reqresp/ReqRespBeaconNode.js";
import {PendingGossipsubMessage} from "./processor/types.js";

export enum NetworkEvent {
/** A relevant peer has connected or has been re-STATUS'd */
Expand All @@ -14,13 +16,23 @@ export enum NetworkEvent {
gossipHeartbeat = "gossipsub.heartbeat",
reqRespRequest = "req-resp.request",
unknownBlockParent = "unknownBlockParent",

// Network processor events
pendingGossipsubMessage = "gossip.pendingGossipsubMessage",
gossipMessageValidationResult = "gossip.messageValidationResult",
}

export type NetworkEvents = {
[NetworkEvent.peerConnected]: (peer: PeerId, status: phase0.Status) => void;
[NetworkEvent.peerDisconnected]: (peer: PeerId) => void;
[NetworkEvent.reqRespRequest]: (request: RequestTypedContainer, peer: PeerId) => void;
[NetworkEvent.unknownBlockParent]: (blockInput: BlockInput, peerIdStr: string) => void;
[NetworkEvent.pendingGossipsubMessage]: (data: PendingGossipsubMessage) => void;
[NetworkEvent.gossipMessageValidationResult]: (
msgId: string,
propagationSource: PeerId,
acceptance: TopicValidatorResult
) => void;
};

export type INetworkEventBus = StrictEventEmitter<EventEmitter, NetworkEvents>;
Expand Down
64 changes: 26 additions & 38 deletions packages/beacon-node/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import {PeerId} from "@libp2p/interface-peer-id";
import {TopicValidatorResult} from "@libp2p/interface-pubsub";
import {GossipSub, GossipsubEvents} from "@chainsafe/libp2p-gossipsub";
import {PublishOpts, SignaturePolicy, TopicStr} from "@chainsafe/libp2p-gossipsub/types";
import {PeerScore, PeerScoreParams} from "@chainsafe/libp2p-gossipsub/score";
Expand All @@ -14,19 +16,10 @@ import {PeersData} from "../peers/peersData.js";
import {ClientKind} from "../peers/client.js";
import {GOSSIP_MAX_SIZE, GOSSIP_MAX_SIZE_BELLATRIX} from "../../constants/network.js";
import {Libp2p} from "../interface.js";
import {
GossipJobQueues,
GossipTopic,
GossipTopicMap,
GossipType,
GossipTypeMap,
ValidatorFnsByType,
GossipHandlers,
GossipBeaconNode,
} from "./interface.js";
import {NetworkEvent, NetworkEventBus} from "../events.js";
import {GossipBeaconNode, GossipTopic, GossipTopicMap, GossipType, GossipTypeMap} from "./interface.js";
import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic, getCoreTopicsAtFork} from "./topic.js";
import {DataTransformSnappy, fastMsgIdFn, msgIdFn, msgIdToStrFn} from "./encoding.js";
import {createValidatorFnsByType} from "./validation/index.js";

import {
computeGossipPeerScoreParams,
Expand All @@ -47,10 +40,9 @@ export type Eth2GossipsubModules = {
libp2p: Libp2p;
logger: Logger;
metrics: Metrics | null;
signal: AbortSignal;
eth2Context: Eth2Context;
gossipHandlers: GossipHandlers;
peersData: PeersData;
events: NetworkEventBus;
};

export type Eth2GossipsubOpts = {
Expand All @@ -59,6 +51,7 @@ export type Eth2GossipsubOpts = {
gossipsubDLow?: number;
gossipsubDHigh?: number;
gossipsubAwaitHandler?: boolean;
skipParamsLog?: boolean;
};

/**
Expand All @@ -75,23 +68,21 @@ export type Eth2GossipsubOpts = {
* See https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
*/
export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
readonly jobQueues: GossipJobQueues;
readonly scoreParams: Partial<PeerScoreParams>;
private readonly config: BeaconConfig;
private readonly logger: Logger;
private readonly peersData: PeersData;
private readonly events: NetworkEventBus;

// Internal caches
private readonly gossipTopicCache: GossipTopicCache;

private readonly validatorFnsByType: ValidatorFnsByType;

constructor(opts: Eth2GossipsubOpts, modules: Eth2GossipsubModules) {
const {allowPublishToZeroPeers, gossipsubD, gossipsubDLow, gossipsubDHigh} = opts;
const gossipTopicCache = new GossipTopicCache(modules.config);

const scoreParams = computeGossipPeerScoreParams(modules);
const {config, logger, metrics, signal, gossipHandlers, peersData} = modules;
const {config, logger, metrics, peersData, events} = modules;

// Gossipsub parameters defined here:
// https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
Expand Down Expand Up @@ -136,29 +127,21 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
this.config = config;
this.logger = logger;
this.peersData = peersData;
this.events = events;
this.gossipTopicCache = gossipTopicCache;

// Note: We use the validator functions as handlers. No handler will be registered to gossipsub.
// libp2p-js layer will emit the message to an EventEmitter that won't be listened by anyone.
// TODO: Force to ensure there's a validatorFunction attached to every received topic.
const {validatorFnsByType, jobQueues} = createValidatorFnsByType(gossipHandlers, {
config,
logger,
metrics,
signal,
});
this.validatorFnsByType = validatorFnsByType;
this.jobQueues = jobQueues;

if (metrics) {
metrics.gossipMesh.peersByType.addCollect(() => this.onScrapeLodestarMetrics(metrics));
}

this.addEventListener("gossipsub:message", this.onGossipsubMessage.bind(this));
this.events.on(NetworkEvent.gossipMessageValidationResult, this.onValidationResult.bind(this));

// Having access to this data is CRUCIAL for debugging. While this is a massive log, it must not be deleted.
// Scoring issues require this dump + current peer score stats to re-calculate scores.
this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)});
if (!opts.skipParamsLog) {
this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)});
}
}

/**
Expand Down Expand Up @@ -413,14 +396,19 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
// Get seenTimestamp before adding the message to the queue or add async delays
const seenTimestampSec = Date.now() / 1000;

// Puts object in queue, validates, then processes
this.validatorFnsByType[topic.type](topic, msg, propagationSource.toString(), seenTimestampSec)
.then((acceptance) => {
this.reportMessageValidationResult(msgId, propagationSource, acceptance);
})
.catch((e) => {
this.logger.error("Error onGossipsubMessage", {}, e);
});
// Emit message to network processor
this.events.emit(NetworkEvent.pendingGossipsubMessage, {
topic,
msg,
msgId,
propagationSource,
seenTimestampSec,
startProcessUnixSec: null,
});
}

private onValidationResult(msgId: string, propagationSource: PeerId, acceptance: TopicValidatorResult): void {
this.reportMessageValidationResult(msgId, propagationSource, acceptance);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/gossip/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export {Eth2Gossipsub} from "./gossipsub.js";
export {getGossipHandlers} from "./handlers/index.js";
export {getGossipHandlers} from "../processor/gossipHandlers.js";
export {getCoreTopicsAtFork} from "./topic.js";
export * from "./interface.js";
Loading