Skip to content

Commit

Permalink
Revert "Revert "Pull gossip queues for better throughput (#5195)" (#5305
Browse files Browse the repository at this point in the history
)"

This reverts commit b861ab8.
  • Loading branch information
twoeths authored and wemeetagain committed Apr 7, 2023
1 parent 83b29d7 commit 4e5f2fd
Show file tree
Hide file tree
Showing 32 changed files with 602 additions and 189 deletions.
2 changes: 1 addition & 1 deletion packages/api/src/beacon/routes/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export type Api = {
/** TODO: description */
getSyncChainsDebugState(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: SyncChainDebugState[]}}>>;
/** Dump all items in a gossip queue, by gossipType */
getGossipQueueItems(gossipType: string): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: GossipQueueItem[]}}>>;
getGossipQueueItems(gossipType: string): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: unknown[]}}>>;
/** Dump all items in the regen queue */
getRegenQueueItems(): Promise<ApiClientResponse<{[HttpStatusCode.OK]: {data: RegenQueueItem[]}}>>;
/** Dump all items in the block processor queue */
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {validateBlsToExecutionChange} from "../../../../chain/validation/blsToEx
import {validateSyncCommitteeSigOnly} from "../../../../chain/validation/syncCommittee.js";
import {ApiModules} from "../../types.js";
import {AttestationError, GossipAction, SyncCommitteeError} from "../../../../chain/errors/index.js";
import {validateGossipFnRetryUnknownRoot} from "../../../../network/gossip/handlers/index.js";
import {validateGossipFnRetryUnknownRoot} from "../../../../network/processor/gossipHandlers.js";

export function getBeaconPoolApi({
chain,
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export function getLodestarApi({

async getGossipQueueItems(gossipType: GossipType | string) {
return {
data: await network.dumpGossipQueueItems(gossipType),
data: await network.dumpGossipQueue(gossipType as GossipType),
};
},

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {CommitteeSubscription} from "../../../network/subnets/index.js";
import {ApiModules} from "../types.js";
import {RegenCaller} from "../../../chain/regen/index.js";
import {getValidatorStatus} from "../beacon/state/utils.js";
import {validateGossipFnRetryUnknownRoot} from "../../../network/gossip/handlers/index.js";
import {validateGossipFnRetryUnknownRoot} from "../../../network/processor/gossipHandlers.js";
import {computeSubnetForCommitteesAtSlot, getPubkeysForIndices} from "./utils.js";

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/bls/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ export interface IBlsVerifier {

/** For multithread pool awaits terminating all workers */
close(): Promise<void>;

/**
* Returns true if BLS worker pool is ready to accept more work jobs.
*/
canAcceptWork(): boolean;
}
21 changes: 20 additions & 1 deletion packages/beacon-node/src/chain/bls/multithread/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ const MAX_BUFFERED_SIGS = 32;
*/
const MAX_BUFFER_WAIT_MS = 100;

/**
* Max concurrent jobs on `canAcceptWork` status
*/
const MAX_JOBS_CAN_ACCEPT_WORK = 512;

type WorkerApi = {
verifyManySignatureSets(workReqArr: BlsWorkReq[]): Promise<BlsWorkResult>;
};
Expand Down Expand Up @@ -110,6 +115,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
} | null = null;
private blsVerifyAllMultiThread: boolean;
private closed = false;
private workersBusy = 0;

constructor(options: BlsMultiThreadWorkerPoolOptions, modules: BlsMultiThreadWorkerPoolModules) {
const {logger, metrics} = modules;
Expand All @@ -127,10 +133,21 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
this.workers = this.createWorkers(implementation, defaultPoolSize);

if (metrics) {
metrics.blsThreadPool.queueLength.addCollect(() => metrics.blsThreadPool.queueLength.set(this.jobs.length));
metrics.blsThreadPool.queueLength.addCollect(() => {
metrics.blsThreadPool.queueLength.set(this.jobs.length);
metrics.blsThreadPool.workersBusy.set(this.workersBusy);
});
}
}

canAcceptWork(): boolean {
return (
this.workersBusy < defaultPoolSize &&
// TODO: Should also bound the jobs queue?
this.jobs.length < MAX_JOBS_CAN_ACCEPT_WORK
);
}

async verifySignatureSets(sets: ISignatureSet[], opts: VerifySignatureOpts = {}): Promise<boolean> {
// Pubkeys are aggregated in the main thread regardless if verified in workers or in main thread
this.metrics?.bls.aggregatedPubkeys.inc(getAggregatedPubkeysCount(sets));
Expand Down Expand Up @@ -310,6 +327,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {

const workerApi = worker.status.workerApi;
worker.status = {code: WorkerStatusCode.running, workerApi};
this.workersBusy++;

try {
let startedSigSets = 0;
Expand Down Expand Up @@ -375,6 +393,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}

worker.status = {code: WorkerStatusCode.idle, workerApi};
this.workersBusy--;

// Potentially run a new job
setTimeout(this.runJob, 0);
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/bls/singleThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ export class BlsSingleThreadVerifier implements IBlsVerifier {
async close(): Promise<void> {
// nothing to do
}

canAcceptWork(): boolean {
// Since sigs are verified blocking the main thread, there's no mechanism to throttle
return true;
}
}
12 changes: 10 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import {BeaconClock, LocalClock} from "./clock/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData} from "./interface.js";
import {IChainOptions} from "./options.js";
import {IStateRegenerator, QueuedStateRegenerator, RegenCaller} from "./regen/index.js";
import {QueuedStateRegenerator, RegenCaller} from "./regen/index.js";
import {initializeForkChoice} from "./forkChoice/index.js";
import {computeAnchorCheckpoint} from "./initState.js";
import {IBlsVerifier, BlsSingleThreadVerifier, BlsMultiThreadWorkerPool} from "./bls/index.js";
Expand Down Expand Up @@ -91,7 +91,7 @@ export class BeaconChain implements IBeaconChain {
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
readonly regen: IStateRegenerator;
readonly regen: QueuedStateRegenerator;
readonly lightClientServer: LightClientServer;
readonly reprocessController: ReprocessController;

Expand Down Expand Up @@ -281,6 +281,14 @@ export class BeaconChain implements IBeaconChain {
await this.bls.close();
}

regenCanAcceptWork(): boolean {
return this.regen.canAcceptWork();
}

blsThreadPoolCanAcceptWork(): boolean {
return this.bls.canAcceptWork();
}

validatorSeenAtEpoch(index: ValidatorIndex, epoch: Epoch): boolean {
// Caller must check that epoch is not older that current epoch - 1
// else the caches for that epoch may already be pruned.
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ export interface IBeaconChain {
/** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */
persistInvalidSszView(view: TreeView<CompositeTypeAny>, suffix?: string): void;
updateBuilderStatus(clockSlot: Slot): void;

regenCanAcceptWork(): boolean;
blsThreadPoolCanAcceptWork(): boolean;
}

export type SSZObjectType =
Expand Down
6 changes: 6 additions & 0 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {StateRegenerator, RegenModules} from "./regen.js";
import {RegenError, RegenErrorCode} from "./errors.js";

const REGEN_QUEUE_MAX_LEN = 256;
// TODO: Should this constant be lower than above? 256 feels high
const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16;

type QueuedStateRegeneratorModules = RegenModules & {
signal: AbortSignal;
Expand Down Expand Up @@ -46,6 +48,10 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.metrics = modules.metrics;
}

canAcceptWork(): boolean {
return this.jobQueue.jobLen < REGEN_CAN_ACCEPT_WORK_THRESHOLD;
}

/**
* Get the state to run with `block`.
* - State after `block.parentRoot` dialed forward to block.slot
Expand Down
22 changes: 21 additions & 1 deletion packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,26 @@ export function createLodestarMetrics(
}),
gossipValidationQueueConcurrency: register.gauge<"topic">({
name: "lodestar_gossip_validation_queue_concurrency",
help: "Current concurrency of gossip validation queue",
help: "Current count of jobs being run on network processor for topic",
labelNames: ["topic"],
}),

networkProcessor: {
executeWorkCalls: register.gauge({
name: "lodestar_network_processor_execute_work_calls_total",
help: "Total calls to network processor execute work fn",
}),
jobsSubmitted: register.histogram({
name: "lodestar_network_processor_execute_jobs_submitted_total",
help: "Total calls to network processor execute work fn",
buckets: [0, 1, 5, 128],
}),
canNotAcceptWork: register.gauge({
name: "lodestar_network_processor_can_not_accept_work_total",
help: "Total times network processor can not accept work on executeWork",
}),
},

discv5: {
decodeEnrAttemptCount: register.counter({
name: "lodestar_discv5_decode_enr_attempt_count",
Expand Down Expand Up @@ -542,6 +558,10 @@ export function createLodestarMetrics(
name: "lodestar_bls_thread_pool_queue_length",
help: "Count of total block processor queue length",
}),
workersBusy: register.gauge({
name: "lodestar_bls_thread_pool_workers_busy",
help: "Count of current busy workers",
}),
totalJobsGroupsStarted: register.gauge({
name: "lodestar_bls_thread_pool_job_groups_started_total",
help: "Count of total jobs groups started in bls thread pool, job groups include +1 jobs",
Expand Down
12 changes: 12 additions & 0 deletions packages/beacon-node/src/network/events.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {EventEmitter} from "events";
import {PeerId} from "@libp2p/interface-peer-id";
import StrictEventEmitter from "strict-event-emitter-types";
import {TopicValidatorResult} from "@libp2p/interface-pubsub";
import {phase0} from "@lodestar/types";
import {BlockInput} from "../chain/blocks/types.js";
import {RequestTypedContainer} from "./reqresp/ReqRespBeaconNode.js";
import {PendingGossipsubMessage} from "./processor/types.js";

export enum NetworkEvent {
/** A relevant peer has connected or has been re-STATUS'd */
Expand All @@ -14,13 +16,23 @@ export enum NetworkEvent {
gossipHeartbeat = "gossipsub.heartbeat",
reqRespRequest = "req-resp.request",
unknownBlockParent = "unknownBlockParent",

// Network processor events
pendingGossipsubMessage = "gossip.pendingGossipsubMessage",
gossipMessageValidationResult = "gossip.messageValidationResult",
}

export type NetworkEvents = {
[NetworkEvent.peerConnected]: (peer: PeerId, status: phase0.Status) => void;
[NetworkEvent.peerDisconnected]: (peer: PeerId) => void;
[NetworkEvent.reqRespRequest]: (request: RequestTypedContainer, peer: PeerId) => void;
[NetworkEvent.unknownBlockParent]: (blockInput: BlockInput, peerIdStr: string) => void;
[NetworkEvent.pendingGossipsubMessage]: (data: PendingGossipsubMessage) => void;
[NetworkEvent.gossipMessageValidationResult]: (
msgId: string,
propagationSource: PeerId,
acceptance: TopicValidatorResult
) => void;
};

export type INetworkEventBus = StrictEventEmitter<EventEmitter, NetworkEvents>;
Expand Down
64 changes: 26 additions & 38 deletions packages/beacon-node/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import {PeerId} from "@libp2p/interface-peer-id";
import {TopicValidatorResult} from "@libp2p/interface-pubsub";
import {GossipSub, GossipsubEvents} from "@chainsafe/libp2p-gossipsub";
import {PublishOpts, SignaturePolicy, TopicStr} from "@chainsafe/libp2p-gossipsub/types";
import {PeerScore, PeerScoreParams} from "@chainsafe/libp2p-gossipsub/score";
Expand All @@ -15,19 +17,10 @@ import {PeersData} from "../peers/peersData.js";
import {ClientKind} from "../peers/client.js";
import {GOSSIP_MAX_SIZE, GOSSIP_MAX_SIZE_BELLATRIX} from "../../constants/network.js";
import {Libp2p} from "../interface.js";
import {
GossipJobQueues,
GossipTopic,
GossipTopicMap,
GossipType,
GossipTypeMap,
ValidatorFnsByType,
GossipHandlers,
GossipBeaconNode,
} from "./interface.js";
import {NetworkEvent, NetworkEventBus} from "../events.js";
import {GossipBeaconNode, GossipTopic, GossipTopicMap, GossipType, GossipTypeMap} from "./interface.js";
import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic, getCoreTopicsAtFork} from "./topic.js";
import {DataTransformSnappy, fastMsgIdFn, msgIdFn, msgIdToStrFn} from "./encoding.js";
import {createValidatorFnsByType} from "./validation/index.js";

import {
computeGossipPeerScoreParams,
Expand All @@ -48,10 +41,9 @@ export type Eth2GossipsubModules = {
libp2p: Libp2p;
logger: Logger;
metrics: Metrics | null;
signal: AbortSignal;
eth2Context: Eth2Context;
gossipHandlers: GossipHandlers;
peersData: PeersData;
events: NetworkEventBus;
};

export type Eth2GossipsubOpts = {
Expand All @@ -60,6 +52,7 @@ export type Eth2GossipsubOpts = {
gossipsubDLow?: number;
gossipsubDHigh?: number;
gossipsubAwaitHandler?: boolean;
skipParamsLog?: boolean;
};

/**
Expand All @@ -76,23 +69,21 @@ export type Eth2GossipsubOpts = {
* See https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
*/
export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
readonly jobQueues: GossipJobQueues;
readonly scoreParams: Partial<PeerScoreParams>;
private readonly config: BeaconConfig;
private readonly logger: Logger;
private readonly peersData: PeersData;
private readonly events: NetworkEventBus;

// Internal caches
private readonly gossipTopicCache: GossipTopicCache;

private readonly validatorFnsByType: ValidatorFnsByType;

constructor(opts: Eth2GossipsubOpts, modules: Eth2GossipsubModules) {
const {allowPublishToZeroPeers, gossipsubD, gossipsubDLow, gossipsubDHigh} = opts;
const gossipTopicCache = new GossipTopicCache(modules.config);

const scoreParams = computeGossipPeerScoreParams(modules);
const {config, logger, metrics, signal, gossipHandlers, peersData} = modules;
const {config, logger, metrics, peersData, events} = modules;

// Gossipsub parameters defined here:
// https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
Expand Down Expand Up @@ -137,29 +128,21 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
this.config = config;
this.logger = logger;
this.peersData = peersData;
this.events = events;
this.gossipTopicCache = gossipTopicCache;

// Note: We use the validator functions as handlers. No handler will be registered to gossipsub.
// libp2p-js layer will emit the message to an EventEmitter that won't be listened by anyone.
// TODO: Force to ensure there's a validatorFunction attached to every received topic.
const {validatorFnsByType, jobQueues} = createValidatorFnsByType(gossipHandlers, {
config,
logger,
metrics,
signal,
});
this.validatorFnsByType = validatorFnsByType;
this.jobQueues = jobQueues;

if (metrics) {
metrics.gossipMesh.peersByType.addCollect(() => this.onScrapeLodestarMetrics(metrics));
}

this.addEventListener("gossipsub:message", this.onGossipsubMessage.bind(this));
this.events.on(NetworkEvent.gossipMessageValidationResult, this.onValidationResult.bind(this));

// Having access to this data is CRUCIAL for debugging. While this is a massive log, it must not be deleted.
// Scoring issues require this dump + current peer score stats to re-calculate scores.
this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)});
if (!opts.skipParamsLog) {
this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)});
}
}

/**
Expand Down Expand Up @@ -422,14 +405,19 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
// Get seenTimestamp before adding the message to the queue or add async delays
const seenTimestampSec = Date.now() / 1000;

// Puts object in queue, validates, then processes
this.validatorFnsByType[topic.type](topic, msg, propagationSource.toString(), seenTimestampSec)
.then((acceptance) => {
this.reportMessageValidationResult(msgId, propagationSource, acceptance);
})
.catch((e) => {
this.logger.error("Error onGossipsubMessage", {}, e);
});
// Emit message to network processor
this.events.emit(NetworkEvent.pendingGossipsubMessage, {
topic,
msg,
msgId,
propagationSource,
seenTimestampSec,
startProcessUnixSec: null,
});
}

private onValidationResult(msgId: string, propagationSource: PeerId, acceptance: TopicValidatorResult): void {
this.reportMessageValidationResult(msgId, propagationSource, acceptance);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/gossip/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export {Eth2Gossipsub} from "./gossipsub.js";
export {getGossipHandlers} from "./handlers/index.js";
export {getGossipHandlers} from "../processor/gossipHandlers.js";
export {getCoreTopicsAtFork} from "./topic.js";
export * from "./interface.js";
Loading

0 comments on commit 4e5f2fd

Please sign in to comment.