From 53e3f1eece4a1530cba198ca389dc8621a194f07 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 12 Jul 2023 02:39:26 +0200 Subject: [PATCH 01/10] Add verifySignatureSetsSameMessage --- .../beacon-node/src/chain/bls/interface.ts | 13 + .../src/chain/bls/multithread/index.ts | 233 ++++++++++++------ .../src/chain/bls/multithread/jobItem.ts | 114 +++++++++ .../src/chain/bls/multithread/types.ts | 3 +- .../src/metrics/metrics/lodestar.ts | 18 +- 5 files changed, 298 insertions(+), 83 deletions(-) create mode 100644 packages/beacon-node/src/chain/bls/multithread/jobItem.ts diff --git a/packages/beacon-node/src/chain/bls/interface.ts b/packages/beacon-node/src/chain/bls/interface.ts index 0b704e91e20..1eb097d9e9d 100644 --- a/packages/beacon-node/src/chain/bls/interface.ts +++ b/packages/beacon-node/src/chain/bls/interface.ts @@ -1,3 +1,4 @@ +import {PublicKey} from "@chainsafe/bls/types"; import {ISignatureSet} from "@lodestar/state-transition"; export type VerifySignatureOpts = { @@ -45,6 +46,18 @@ export interface IBlsVerifier { */ verifySignatureSets(sets: ISignatureSet[], opts?: VerifySignatureOpts): Promise; + /** + * Similar to verifySignatureSets but: + * - all signatures have the same signing root + * - return an array of boolean, each element indicates whether the corresponding signature set is valid + * - only support `verifyOnMainThread` option. + */ + verifySignatureSetsSameMessage( + sets: {publicKey: PublicKey; signature: Uint8Array}[], + messsage: Uint8Array, + opts?: Pick + ): Promise; + /** For multithread pool awaits terminating all workers */ close(): Promise; diff --git a/packages/beacon-node/src/chain/bls/multithread/index.ts b/packages/beacon-node/src/chain/bls/multithread/index.ts index 7c3141454ce..de04dea3442 100644 --- a/packages/beacon-node/src/chain/bls/multithread/index.ts +++ b/packages/beacon-node/src/chain/bls/multithread/index.ts @@ -7,7 +7,7 @@ import {spawn, Worker} from "@chainsafe/threads"; // eslint-disable-next-line self = undefined; import bls from "@chainsafe/bls"; -import {Implementation, PointFormat} from "@chainsafe/bls/types"; +import {Implementation, PointFormat, PublicKey} from "@chainsafe/bls/types"; import {Logger} from "@lodestar/utils"; import {ISignatureSet} from "@lodestar/state-transition"; import {QueueError, QueueErrorCode} from "../../../util/queue/index.js"; @@ -16,9 +16,16 @@ import {IBlsVerifier, VerifySignatureOpts} from "../interface.js"; import {getAggregatedPubkey, getAggregatedPubkeysCount} from "../utils.js"; import {verifySignatureSetsMaybeBatch} from "../maybeBatch.js"; import {LinkedList} from "../../../util/array.js"; -import {BlsWorkReq, BlsWorkResult, WorkerData, WorkResultCode} from "./types.js"; +import {BlsWorkReq, BlsWorkResult, WorkerData, WorkResultCode, WorkResultError} from "./types.js"; import {chunkifyMaximizeChunkSize} from "./utils.js"; import {defaultPoolSize} from "./poolSize.js"; +import { + JobQueueItem, + JobQueueItemType, + jobItemSameMessageToMultiSet, + jobItemSigSets, + jobItemWorkReq, +} from "./jobItem.js"; export type BlsMultiThreadWorkerPoolModules = { logger: Logger; @@ -66,13 +73,6 @@ type WorkerApi = { verifyManySignatureSets(workReqArr: BlsWorkReq[]): Promise; }; -type JobQueueItem = { - resolve: (result: R | PromiseLike) => void; - reject: (error?: Error) => void; - addedTimeMs: number; - workReq: BlsWorkReq; -}; - enum WorkerStatusCode { notInitialized, initializing, @@ -179,15 +179,18 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { // Split large array of sets into smaller. // Very helpful when syncing finalized, sync may submit +1000 sets so chunkify allows to distribute to many workers const results = await Promise.all( - chunkifyMaximizeChunkSize(sets, MAX_SIGNATURE_SETS_PER_JOB).map((setsWorker) => - this.queueBlsWork({ - opts, - sets: setsWorker.map((s) => ({ - publicKey: getAggregatedPubkey(s).toBytes(this.format), - message: s.signingRoot, - signature: s.signature, - })), - }) + chunkifyMaximizeChunkSize(sets, MAX_SIGNATURE_SETS_PER_JOB).map( + (setsChunk) => + new Promise((resolve, reject) => { + return this.queueBlsWork({ + type: JobQueueItemType.default, + resolve, + reject, + addedTimeMs: Date.now(), + opts, + sets: setsChunk, + }); + }) ) ); @@ -199,6 +202,27 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { return results.every((isValid) => isValid === true); } + async verifySignatureSetsSameMessage( + sets: {publicKey: PublicKey; signature: Uint8Array}[], + message: Uint8Array, + opts: Pick = {} + ): Promise { + // TODO: Should chunkify? + // NOTE: verifySignatureSetsSameMessage only supports worker verification + + return new Promise((resolve, reject) => { + this.queueBlsWork({ + type: JobQueueItemType.sameMessage, + resolve, + reject, + addedTimeMs: Date.now(), + opts, + sets, + message, + }); + }); + } + async close(): Promise { if (this.bufferedJobs) { clearTimeout(this.bufferedJobs.timeout); @@ -261,7 +285,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { /** * Register BLS work to be done eventually in a worker */ - private async queueBlsWork(workReq: BlsWorkReq): Promise { + private queueBlsWork(job: JobQueueItem): void { if (this.closed) { throw new QueueError({code: QueueErrorCode.QUEUE_ABORTED}); } @@ -275,46 +299,41 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { this.workers[0].status.code === WorkerStatusCode.initializationError && this.workers.every((worker) => worker.status.code === WorkerStatusCode.initializationError) ) { - throw this.workers[0].status.error; + return job.reject(this.workers[0].status.error); } - return new Promise((resolve, reject) => { - const job = {resolve, reject, addedTimeMs: Date.now(), workReq}; - - // below is for non-priority jobs - // Append batchable sets to `bufferedJobs`, starting a timeout to push them into `jobs`. - // Do not call `runJob()`, it is called from `runBufferedJobs()` - if (workReq.opts.batchable) { - if (!this.bufferedJobs) { - this.bufferedJobs = { - jobs: new LinkedList(), - prioritizedJobs: new LinkedList(), - sigCount: 0, - firstPush: Date.now(), - timeout: setTimeout(this.runBufferedJobs, MAX_BUFFER_WAIT_MS), - }; - } - const jobs = workReq.opts.priority ? this.bufferedJobs.prioritizedJobs : this.bufferedJobs.jobs; - jobs.push(job); - this.bufferedJobs.sigCount += job.workReq.sets.length; - if (this.bufferedJobs.sigCount > MAX_BUFFERED_SIGS) { - clearTimeout(this.bufferedJobs.timeout); - this.runBufferedJobs(); - } + // Append batchable sets to `bufferedJobs`, starting a timeout to push them into `jobs`. + // Do not call `runJob()`, it is called from `runBufferedJobs()` + if (job.opts.batchable) { + if (!this.bufferedJobs) { + this.bufferedJobs = { + jobs: new LinkedList(), + prioritizedJobs: new LinkedList(), + sigCount: 0, + firstPush: Date.now(), + timeout: setTimeout(this.runBufferedJobs, MAX_BUFFER_WAIT_MS), + }; + } + const jobs = job.opts.priority ? this.bufferedJobs.prioritizedJobs : this.bufferedJobs.jobs; + jobs.push(job); + this.bufferedJobs.sigCount += jobItemSigSets(job); + if (this.bufferedJobs.sigCount > MAX_BUFFERED_SIGS) { + clearTimeout(this.bufferedJobs.timeout); + this.runBufferedJobs(); } + } - // Push job and schedule to call `runJob` in the next macro event loop cycle. - // This is useful to allow batching job submitted from a synchronous for loop, - // and to prevent large stacks since runJob may be called recursively. - else { - if (workReq.opts.priority) { - this.jobs.unshift(job); - } else { - this.jobs.push(job); - } - setTimeout(this.runJob, 0); + // Push job and schedule to call `runJob` in the next macro event loop cycle. + // This is useful to allow batching job submitted from a synchronous for loop, + // and to prevent large stacks since runJob may be called recursively. + else { + if (job.opts.priority) { + this.jobs.unshift(job); + } else { + this.jobs.push(job); } - }); + setTimeout(this.runJob, 0); + } } /** @@ -332,8 +351,8 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { } // Prepare work package - const jobs = this.prepareWork(); - if (jobs.length === 0) { + const jobsInput = this.prepareWork(); + if (jobsInput.length === 0) { return; } @@ -346,22 +365,52 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { this.workersBusy++; try { - let startedSigSets = 0; - for (const job of jobs) { + let startedJobsDefault = 0; + let startedJobsSameMessage = 0; + let startedSetsDefault = 0; + let startedSetsSameMessage = 0; + const workReqs: BlsWorkReq[] = []; + const jobsStarted: JobQueueItem[] = []; + + for (const job of jobsInput) { this.metrics?.blsThreadPool.jobWaitTime.observe((Date.now() - job.addedTimeMs) / 1000); - startedSigSets += job.workReq.sets.length; + + let workReq: BlsWorkReq; + try { + // Note: This can throw, must be handled per-job. + // Pubkey and signature aggregation is defered here + workReq = jobItemWorkReq(job, this.format); + } catch (e) { + // TODO: Add metrics + job.reject(e as Error); + continue; + } + // Re-push all jobs with matching workReq for easier accounting of results + workReqs.push(workReq); + jobsStarted.push(job); + + if (job.type === JobQueueItemType.sameMessage) { + startedJobsSameMessage += 1; + startedSetsSameMessage += job.sets.length; + } else { + startedJobsDefault += 1; + startedSetsDefault += job.sets.length; + } } + const startedSigSets = startedSetsDefault + startedSetsSameMessage; this.metrics?.blsThreadPool.totalJobsGroupsStarted.inc(1); - this.metrics?.blsThreadPool.totalJobsStarted.inc(jobs.length); - this.metrics?.blsThreadPool.totalSigSetsStarted.inc(startedSigSets); + this.metrics?.blsThreadPool.totalJobsStarted.inc({type: JobQueueItemType.default}, startedJobsDefault); + this.metrics?.blsThreadPool.totalJobsStarted.inc({type: JobQueueItemType.sameMessage}, startedJobsSameMessage); + this.metrics?.blsThreadPool.totalSigSetsStarted.inc({type: JobQueueItemType.default}, startedSetsDefault); + this.metrics?.blsThreadPool.totalSigSetsStarted.inc({type: JobQueueItemType.sameMessage}, startedSetsSameMessage); // Send work package to the worker // If the job, metrics or any code below throws: the job will reject never going stale. // Only downside is the the job promise may be resolved twice, but that's not an issue const jobStartNs = process.hrtime.bigint(); - const workResult = await workerApi.verifyManySignatureSets(jobs.map((job) => job.workReq)); + const workResult = await workerApi.verifyManySignatureSets(workReqs); const jobEndNs = process.hrtime.bigint(); const {workerId, batchRetries, batchSigsSuccess, workerStartNs, workerEndNs, results} = workResult; @@ -369,21 +418,39 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { let errorCount = 0; // Un-wrap work package - for (let i = 0; i < jobs.length; i++) { - const job = jobs[i]; + for (let i = 0; i < jobsStarted.length; i++) { + const job = jobsStarted[i]; const jobResult = results[i]; - const sigSetCount = job.workReq.sets.length; - if (!jobResult) { - job.reject(Error(`No jobResult for index ${i}`)); - errorCount += sigSetCount; - } else if (jobResult.code === WorkResultCode.success) { - job.resolve(jobResult.result); - successCount += sigSetCount; - } else { - const workerError = Error(jobResult.error.message); - if (jobResult.error.stack) workerError.stack = jobResult.error.stack; - job.reject(workerError); - errorCount += sigSetCount; + const sigSetCount = jobItemSigSets(job); + + if (job.type === JobQueueItemType.default) { + if (!jobResult || jobResult.code !== WorkResultCode.success) { + job.reject(getJobResultError(jobResult, i)); + errorCount += sigSetCount; + } else { + job.resolve(jobResult.result); + successCount += sigSetCount; + } + } + + // + else if (job.type === JobQueueItemType.sameMessage) { + if (!jobResult || jobResult.code !== WorkResultCode.success) { + job.reject(getJobResultError(jobResult, i)); + errorCount += 1; + } else { + if (jobResult.result) { + // All are valid + job.resolve(job.sets.map(() => true)); + } else { + // Retry each individually + // Create new jobs for each pubkey set, and Promise.all all the results + this.jobs.push(...jobItemSameMessageToMultiSet(job)); + this.metrics?.blsThreadPool.sameMessageRetryJobs.inc(1); + this.metrics?.blsThreadPool.sameMessageRetrySets.inc(job.sets.length); + } + successCount += 1; + } } } @@ -403,7 +470,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { // Worker communications should never reject if (!this.closed) this.logger.error("BlsMultiThreadWorkerPool error", {}, e as Error); // Reject all - for (const job of jobs) { + for (const job of jobsInput) { job.reject(e as Error); } } @@ -418,8 +485,8 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { /** * Grab pending work up to a max number of signatures */ - private prepareWork(): JobQueueItem[] { - const jobs: JobQueueItem[] = []; + private prepareWork(): JobQueueItem[] { + const jobs: JobQueueItem[] = []; let totalSigs = 0; while (totalSigs < MAX_SIGNATURE_SETS_PER_JOB) { @@ -429,7 +496,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { } jobs.push(job); - totalSigs += job.workReq.sets.length; + totalSigs += jobItemSigSets(job); } return jobs; @@ -462,3 +529,9 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { ); } } + +function getJobResultError(jobResult: WorkResultError | null, i: number): Error { + const workerError = jobResult ? Error(jobResult.error.message) : Error(`No jobResult for index ${i}`); + if (jobResult?.error?.stack) workerError.stack = jobResult.error.stack; + return workerError; +} diff --git a/packages/beacon-node/src/chain/bls/multithread/jobItem.ts b/packages/beacon-node/src/chain/bls/multithread/jobItem.ts new file mode 100644 index 00000000000..5b1ff863761 --- /dev/null +++ b/packages/beacon-node/src/chain/bls/multithread/jobItem.ts @@ -0,0 +1,114 @@ +import bls from "@chainsafe/bls"; +import {CoordType, PointFormat, PublicKey} from "@chainsafe/bls/types"; +import {ISignatureSet, SignatureSetType} from "@lodestar/state-transition"; +import {VerifySignatureOpts} from "../interface.js"; +import {getAggregatedPubkey} from "../utils.js"; +import {BlsWorkReq} from "./types.js"; + +export type JobQueueItem = JobQueueItemDefault | JobQueueItemSameMessage; + +export type JobQueueItemDefault = { + type: JobQueueItemType.default; + resolve: (result: boolean) => void; + reject: (error?: Error) => void; + addedTimeMs: number; + opts: VerifySignatureOpts; + sets: ISignatureSet[]; +}; + +export type JobQueueItemSameMessage = { + type: JobQueueItemType.sameMessage; + resolve: (result: boolean[]) => void; + reject: (error?: Error) => void; + addedTimeMs: number; + opts: VerifySignatureOpts; + sets: {publicKey: PublicKey; signature: Uint8Array}[]; + message: Uint8Array; +}; + +export enum JobQueueItemType { + default = "default", + sameMessage = "same_message", +} + +/** + * Return count of signature sets from a JobQueueItem + */ +export function jobItemSigSets(job: JobQueueItem): number { + switch (job.type) { + case JobQueueItemType.default: + return job.sets.length; + case JobQueueItemType.sameMessage: + return 1; + } +} + +/** + * Prepare BlsWorkReq from JobQueueItem + * WARNING: May throw with untrusted user input + */ +export function jobItemWorkReq(job: JobQueueItem, format: PointFormat): BlsWorkReq { + switch (job.type) { + case JobQueueItemType.default: + return { + opts: job.opts, + sets: job.sets.map((set) => ({ + // TODO: This can throw + publicKey: getAggregatedPubkey(set).toBytes(format), + signature: set.signature, + message: set.signingRoot, + })), + }; + case JobQueueItemType.sameMessage: + return { + opts: job.opts, + sets: [ + { + publicKey: bls.PublicKey.aggregate(job.sets.map((set) => set.publicKey)).toBytes(format), + signature: bls.Signature.aggregate( + // validate signature = true + job.sets.map((set) => bls.Signature.fromBytes(set.signature, CoordType.affine, true)) + ).toBytes(format), + message: job.message, + }, + ], + }; + } +} + +/** + * Convert a JobQueueItemSameMessage into multiple JobQueueItemDefault linked to the original promise + */ +export function jobItemSameMessageToMultiSet(job: JobQueueItemSameMessage): JobQueueItemDefault[] { + // Retry each individually + // Create new jobs for each pubkey set, and Promise.all all the results + const promises: Promise[] = []; + const jobs: JobQueueItemDefault[] = []; + + for (const set of job.sets) { + promises.push( + new Promise((resolve, reject) => { + jobs.push({ + type: JobQueueItemType.default, + resolve, + reject, + addedTimeMs: job.addedTimeMs, + opts: {batchable: false}, + sets: [ + { + type: SignatureSetType.single, + pubkey: set.publicKey, + signature: set.signature, + signingRoot: job.message, + }, + ], + }); + }) + ); + } + + // Connect jobs to main job + Promise.all(promises).then(job.resolve, job.reject); + + return jobs; +} diff --git a/packages/beacon-node/src/chain/bls/multithread/types.ts b/packages/beacon-node/src/chain/bls/multithread/types.ts index d11b053ed17..cefdc799ee1 100644 --- a/packages/beacon-node/src/chain/bls/multithread/types.ts +++ b/packages/beacon-node/src/chain/bls/multithread/types.ts @@ -21,7 +21,8 @@ export enum WorkResultCode { error = "error", } -export type WorkResult = {code: WorkResultCode.success; result: R} | {code: WorkResultCode.error; error: Error}; +export type WorkResultError = {code: WorkResultCode.error; error: Error}; +export type WorkResult = {code: WorkResultCode.success; result: R} | WorkResultError; export type BlsWorkResult = { /** Ascending integer identifying the worker for metrics */ diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index f3a67700c53..7a0c7b6375d 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -357,13 +357,19 @@ export function createLodestarMetrics( name: "lodestar_bls_thread_pool_job_groups_started_total", help: "Count of total jobs groups started in bls thread pool, job groups include +1 jobs", }), - totalJobsStarted: register.gauge({ + totalJobsStarted: register.gauge<"type">({ name: "lodestar_bls_thread_pool_jobs_started_total", help: "Count of total jobs started in bls thread pool, jobs include +1 signature sets", + labelNames: ["type"], }), - totalSigSetsStarted: register.gauge({ + totalSigSetsStarted: register.gauge<"type">({ name: "lodestar_bls_thread_pool_sig_sets_started_total", help: "Count of total signature sets started in bls thread pool, sig sets include 1 pk, msg, sig", + labelNames: ["type"], + }), + totalSigSetsSameMessageStarted: register.gauge({ + name: "lodestar_bls_thread_pool_sig_sets_same_message_started_total", + help: "Count of total signature sets started in bls thread pool, sig sets include 1 pk, msg, sig", }), // Re-verifying a batch means doing double work. This number must be very low or it can be a waste of CPU resources batchRetries: register.gauge({ @@ -375,6 +381,14 @@ export function createLodestarMetrics( name: "lodestar_bls_thread_pool_batch_sigs_success_total", help: "Count of total batches that failed and had to be verified again.", }), + sameMessageRetryJobs: register.gauge({ + name: "lodestar_bls_thread_pool_same_message_jobs_retries_total", + help: "Count of total same message jobs that failed and had to be verified again.", + }), + sameMessageRetrySets: register.gauge({ + name: "lodestar_bls_thread_pool_same_message_sets_retries_total", + help: "Count of total same message sets that failed and had to be verified again.", + }), // To measure the time cost of main thread <-> worker message passing latencyToWorker: register.histogram({ name: "lodestar_bls_thread_pool_latency_to_worker", From 19a38a71b7d9da61b930bb3860ae655d0982980f Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Wed, 12 Jul 2023 11:07:32 +0700 Subject: [PATCH 02/10] chore: finish the implementation --- .../beacon-node/src/chain/bls/interface.ts | 6 +-- .../src/chain/bls/multithread/index.ts | 46 +++++++++++-------- .../src/chain/bls/multithread/jobItem.ts | 4 +- .../beacon-node/src/chain/bls/singleThread.ts | 28 +++++++++++ .../src/metrics/metrics/lodestar.ts | 5 ++ .../test/unit/chain/validation/block.test.ts | 7 ++- packages/beacon-node/test/utils/mocks/bls.ts | 5 ++ 7 files changed, 76 insertions(+), 25 deletions(-) diff --git a/packages/beacon-node/src/chain/bls/interface.ts b/packages/beacon-node/src/chain/bls/interface.ts index 1eb097d9e9d..23a5e8d32fc 100644 --- a/packages/beacon-node/src/chain/bls/interface.ts +++ b/packages/beacon-node/src/chain/bls/interface.ts @@ -48,14 +48,14 @@ export interface IBlsVerifier { /** * Similar to verifySignatureSets but: - * - all signatures have the same signing root + * - all signatures have the same message * - return an array of boolean, each element indicates whether the corresponding signature set is valid - * - only support `verifyOnMainThread` option. + * - only support `batchable` option */ verifySignatureSetsSameMessage( sets: {publicKey: PublicKey; signature: Uint8Array}[], messsage: Uint8Array, - opts?: Pick + opts?: Pick ): Promise; /** For multithread pool awaits terminating all workers */ diff --git a/packages/beacon-node/src/chain/bls/multithread/index.ts b/packages/beacon-node/src/chain/bls/multithread/index.ts index de04dea3442..95a0f9da756 100644 --- a/packages/beacon-node/src/chain/bls/multithread/index.ts +++ b/packages/beacon-node/src/chain/bls/multithread/index.ts @@ -202,25 +202,33 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { return results.every((isValid) => isValid === true); } + /** + * Verify signature sets of the same message, only supports worker verification. + */ async verifySignatureSetsSameMessage( sets: {publicKey: PublicKey; signature: Uint8Array}[], message: Uint8Array, - opts: Pick = {} + opts: Pick = {} ): Promise { - // TODO: Should chunkify? - // NOTE: verifySignatureSetsSameMessage only supports worker verification - - return new Promise((resolve, reject) => { - this.queueBlsWork({ - type: JobQueueItemType.sameMessage, - resolve, - reject, - addedTimeMs: Date.now(), - opts, - sets, - message, - }); - }); + // chunkify so that it reduce the risk of retrying when there is at least one invalid signature + const results = await Promise.all( + chunkifyMaximizeChunkSize(sets, MAX_SIGNATURE_SETS_PER_JOB).map( + (setsChunk) => + new Promise((resolve, reject) => { + this.queueBlsWork({ + type: JobQueueItemType.sameMessage, + resolve, + reject, + addedTimeMs: Date.now(), + opts, + sets: setsChunk, + message, + }); + }) + ) + ); + + return results.flat(); } async close(): Promise { @@ -381,7 +389,8 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { // Pubkey and signature aggregation is defered here workReq = jobItemWorkReq(job, this.format); } catch (e) { - // TODO: Add metrics + this.metrics?.blsThreadPool.errorAggregateSignatureSetsCount.inc({type: job.type}); + // TODO tuyennhv, do an early unwrap? job.reject(e as Error); continue; } @@ -432,15 +441,14 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { successCount += sigSetCount; } } - - // + // handle result of the verification of aggregated signature against aggregated pubkeys else if (job.type === JobQueueItemType.sameMessage) { if (!jobResult || jobResult.code !== WorkResultCode.success) { job.reject(getJobResultError(jobResult, i)); errorCount += 1; } else { if (jobResult.result) { - // All are valid + // All are valid, most of the time it goes here job.resolve(job.sets.map(() => true)); } else { // Retry each individually diff --git a/packages/beacon-node/src/chain/bls/multithread/jobItem.ts b/packages/beacon-node/src/chain/bls/multithread/jobItem.ts index 5b1ff863761..f21e1675b2d 100644 --- a/packages/beacon-node/src/chain/bls/multithread/jobItem.ts +++ b/packages/beacon-node/src/chain/bls/multithread/jobItem.ts @@ -53,7 +53,7 @@ export function jobItemWorkReq(job: JobQueueItem, format: PointFormat): BlsWorkR return { opts: job.opts, sets: job.sets.map((set) => ({ - // TODO: This can throw + // this can throw, handled in the consumer code publicKey: getAggregatedPubkey(set).toBytes(format), signature: set.signature, message: set.signingRoot, @@ -66,7 +66,7 @@ export function jobItemWorkReq(job: JobQueueItem, format: PointFormat): BlsWorkR { publicKey: bls.PublicKey.aggregate(job.sets.map((set) => set.publicKey)).toBytes(format), signature: bls.Signature.aggregate( - // validate signature = true + // validate signature = true job.sets.map((set) => bls.Signature.fromBytes(set.signature, CoordType.affine, true)) ).toBytes(format), message: job.message, diff --git a/packages/beacon-node/src/chain/bls/singleThread.ts b/packages/beacon-node/src/chain/bls/singleThread.ts index 78f3f4bf520..185aec80fa3 100644 --- a/packages/beacon-node/src/chain/bls/singleThread.ts +++ b/packages/beacon-node/src/chain/bls/singleThread.ts @@ -1,4 +1,7 @@ import {ISignatureSet} from "@lodestar/state-transition"; +import {PublicKey} from "@chainsafe/bls/types"; +import bls from "@chainsafe/bls"; +import {CoordType} from "@chainsafe/blst"; import {Metrics} from "../../metrics/index.js"; import {IBlsVerifier} from "./interface.js"; import {verifySignatureSetsMaybeBatch} from "./maybeBatch.js"; @@ -34,6 +37,31 @@ export class BlsSingleThreadVerifier implements IBlsVerifier { return isValid; } + async verifySignatureSetsSameMessage( + sets: {publicKey: PublicKey; signature: Uint8Array}[], + message: Uint8Array + ): Promise { + const startNs = process.hrtime.bigint(); + const pubkey = bls.PublicKey.aggregate(sets.map((set) => set.publicKey)); + // validate signature = true + const signatures = sets.map((set) => bls.Signature.fromBytes(set.signature, CoordType.affine, true)); + const signature = bls.Signature.aggregate(signatures); + const isAllValid = signature.verify(pubkey, message); + + let result: boolean[]; + if (isAllValid) { + result = sets.map(() => true); + } else { + result = sets.map((set, i) => signatures[i].verify(set.publicKey, message)); + } + + const endNs = process.hrtime.bigint(); + const totalSec = Number(startNs - endNs) / 1e9; + this.metrics?.blsThreadPool.mainThreadDurationInThreadPool.observe(totalSec); + + return result; + } + async close(): Promise { // nothing to do } diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 7a0c7b6375d..87f7c960788 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -336,6 +336,11 @@ export function createLodestarMetrics( name: "lodestar_bls_thread_pool_success_jobs_signature_sets_count", help: "Count of total verified signature sets", }), + errorAggregateSignatureSetsCount: register.gauge<"type">({ + name: "lodestar_bls_thread_pool_error_aggregate_signature_sets_count", + help: "Count of error when aggregating pubkeys or signatures", + labelNames: ["type"], + }), errorJobsSignatureSetsCount: register.gauge({ name: "lodestar_bls_thread_pool_error_jobs_signature_sets_count", help: "Count of total error-ed signature sets", diff --git a/packages/beacon-node/test/unit/chain/validation/block.test.ts b/packages/beacon-node/test/unit/chain/validation/block.test.ts index 297c9c90d6c..6ddb27fceca 100644 --- a/packages/beacon-node/test/unit/chain/validation/block.test.ts +++ b/packages/beacon-node/test/unit/chain/validation/block.test.ts @@ -44,7 +44,12 @@ describe("gossip block validation", function () { verifySignature = sinon.stub(); verifySignature.resolves(true); - chain.bls = {verifySignatureSets: verifySignature, close: () => Promise.resolve(), canAcceptWork: () => true}; + chain.bls = { + verifySignatureSets: verifySignature, + verifySignatureSetsSameMessage: () => Promise.resolve([true]), + close: () => Promise.resolve(), + canAcceptWork: () => true, + }; forkChoice.getFinalizedCheckpoint.returns({epoch: 0, root: ZERO_HASH, rootHex: ""}); diff --git a/packages/beacon-node/test/utils/mocks/bls.ts b/packages/beacon-node/test/utils/mocks/bls.ts index 57e84d509fc..e90287dad52 100644 --- a/packages/beacon-node/test/utils/mocks/bls.ts +++ b/packages/beacon-node/test/utils/mocks/bls.ts @@ -1,3 +1,4 @@ +import {PublicKey} from "@chainsafe/bls/types"; import {IBlsVerifier} from "../../../src/chain/bls/index.js"; export class BlsVerifierMock implements IBlsVerifier { @@ -7,6 +8,10 @@ export class BlsVerifierMock implements IBlsVerifier { return this.isValidResult; } + async verifySignatureSetsSameMessage(sets: {publicKey: PublicKey; signature: Uint8Array}[]): Promise { + return sets.map(() => this.isValidResult); + } + async close(): Promise { // } From b12a1ce6208e28baef3eb993e74a8f202f267289 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Wed, 12 Jul 2023 11:18:52 +0700 Subject: [PATCH 03/10] fix: handle cannot aggregate pubkeys and signatures in same message --- .../beacon-node/src/chain/bls/multithread/index.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/beacon-node/src/chain/bls/multithread/index.ts b/packages/beacon-node/src/chain/bls/multithread/index.ts index 95a0f9da756..6f4fad6be28 100644 --- a/packages/beacon-node/src/chain/bls/multithread/index.ts +++ b/packages/beacon-node/src/chain/bls/multithread/index.ts @@ -390,8 +390,15 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { workReq = jobItemWorkReq(job, this.format); } catch (e) { this.metrics?.blsThreadPool.errorAggregateSignatureSetsCount.inc({type: job.type}); - // TODO tuyennhv, do an early unwrap? - job.reject(e as Error); + if (job.type === JobQueueItemType.default) { + job.reject(e as Error); + } else { + // there could be an invalid pubkey/signature, retry each individually + // Create new jobs for each pubkey set, and Promise.all all the results + this.jobs.push(...jobItemSameMessageToMultiSet(job)); + this.metrics?.blsThreadPool.sameMessageRetryJobs.inc(1); + this.metrics?.blsThreadPool.sameMessageRetrySets.inc(job.sets.length); + } continue; } // Re-push all jobs with matching workReq for easier accounting of results From 9ce2fd890dc84d9a6967862f7e439a0e3e5bfe44 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Mon, 17 Jul 2023 13:43:52 +0700 Subject: [PATCH 04/10] fix: jobItemSameMessageToMultiSet considering priority --- .../src/chain/bls/multithread/index.ts | 16 ++++++++++++++-- .../src/chain/bls/multithread/jobItem.ts | 7 ++++--- .../beacon-node/src/chain/bls/singleThread.ts | 2 +- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/packages/beacon-node/src/chain/bls/multithread/index.ts b/packages/beacon-node/src/chain/bls/multithread/index.ts index 6f4fad6be28..e979ad797e7 100644 --- a/packages/beacon-node/src/chain/bls/multithread/index.ts +++ b/packages/beacon-node/src/chain/bls/multithread/index.ts @@ -395,7 +395,13 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { } else { // there could be an invalid pubkey/signature, retry each individually // Create new jobs for each pubkey set, and Promise.all all the results - this.jobs.push(...jobItemSameMessageToMultiSet(job)); + for (const j of jobItemSameMessageToMultiSet(job)) { + if (j.opts.priority) { + this.jobs.unshift(j); + } else { + this.jobs.push(j); + } + } this.metrics?.blsThreadPool.sameMessageRetryJobs.inc(1); this.metrics?.blsThreadPool.sameMessageRetrySets.inc(job.sets.length); } @@ -460,7 +466,13 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { } else { // Retry each individually // Create new jobs for each pubkey set, and Promise.all all the results - this.jobs.push(...jobItemSameMessageToMultiSet(job)); + for (const j of jobItemSameMessageToMultiSet(job)) { + if (j.opts.priority) { + this.jobs.unshift(j); + } else { + this.jobs.push(j); + } + } this.metrics?.blsThreadPool.sameMessageRetryJobs.inc(1); this.metrics?.blsThreadPool.sameMessageRetrySets.inc(job.sets.length); } diff --git a/packages/beacon-node/src/chain/bls/multithread/jobItem.ts b/packages/beacon-node/src/chain/bls/multithread/jobItem.ts index f21e1675b2d..5e64ba33436 100644 --- a/packages/beacon-node/src/chain/bls/multithread/jobItem.ts +++ b/packages/beacon-node/src/chain/bls/multithread/jobItem.ts @@ -3,6 +3,7 @@ import {CoordType, PointFormat, PublicKey} from "@chainsafe/bls/types"; import {ISignatureSet, SignatureSetType} from "@lodestar/state-transition"; import {VerifySignatureOpts} from "../interface.js"; import {getAggregatedPubkey} from "../utils.js"; +import {LinkedList} from "../../../util/array.js"; import {BlsWorkReq} from "./types.js"; export type JobQueueItem = JobQueueItemDefault | JobQueueItemSameMessage; @@ -79,11 +80,11 @@ export function jobItemWorkReq(job: JobQueueItem, format: PointFormat): BlsWorkR /** * Convert a JobQueueItemSameMessage into multiple JobQueueItemDefault linked to the original promise */ -export function jobItemSameMessageToMultiSet(job: JobQueueItemSameMessage): JobQueueItemDefault[] { +export function jobItemSameMessageToMultiSet(job: JobQueueItemSameMessage): LinkedList { // Retry each individually // Create new jobs for each pubkey set, and Promise.all all the results const promises: Promise[] = []; - const jobs: JobQueueItemDefault[] = []; + const jobs = new LinkedList(); for (const set of job.sets) { promises.push( @@ -93,7 +94,7 @@ export function jobItemSameMessageToMultiSet(job: JobQueueItemSameMessage): JobQ resolve, reject, addedTimeMs: job.addedTimeMs, - opts: {batchable: false}, + opts: {batchable: false, priority: job.opts.priority}, sets: [ { type: SignatureSetType.single, diff --git a/packages/beacon-node/src/chain/bls/singleThread.ts b/packages/beacon-node/src/chain/bls/singleThread.ts index 185aec80fa3..4fb2da77c70 100644 --- a/packages/beacon-node/src/chain/bls/singleThread.ts +++ b/packages/beacon-node/src/chain/bls/singleThread.ts @@ -1,7 +1,7 @@ -import {ISignatureSet} from "@lodestar/state-transition"; import {PublicKey} from "@chainsafe/bls/types"; import bls from "@chainsafe/bls"; import {CoordType} from "@chainsafe/blst"; +import {ISignatureSet} from "@lodestar/state-transition"; import {Metrics} from "../../metrics/index.js"; import {IBlsVerifier} from "./interface.js"; import {verifySignatureSetsMaybeBatch} from "./maybeBatch.js"; From 52785b053bedae72425cdb50eef787fc339fa3bd Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Mon, 17 Jul 2023 15:40:04 +0700 Subject: [PATCH 05/10] fix: handle malformed signatures --- .../beacon-node/src/chain/bls/maybeBatch.ts | 45 +++++----- .../beacon-node/src/chain/bls/singleThread.ts | 28 ++++-- .../test/unit/chain/bls/bls.test.ts | 89 +++++++++++++++++++ 3 files changed, 137 insertions(+), 25 deletions(-) create mode 100644 packages/beacon-node/test/unit/chain/bls/bls.test.ts diff --git a/packages/beacon-node/src/chain/bls/maybeBatch.ts b/packages/beacon-node/src/chain/bls/maybeBatch.ts index 57a63115b8c..cb30f7008bb 100644 --- a/packages/beacon-node/src/chain/bls/maybeBatch.ts +++ b/packages/beacon-node/src/chain/bls/maybeBatch.ts @@ -14,26 +14,31 @@ export type SignatureSetDeserialized = { * Abstracted in a separate file to be consumed by the threaded pool and the main thread implementation. */ export function verifySignatureSetsMaybeBatch(sets: SignatureSetDeserialized[]): boolean { - if (sets.length >= MIN_SET_COUNT_TO_BATCH) { - return bls.Signature.verifyMultipleSignatures( - sets.map((s) => ({ - publicKey: s.publicKey, - message: s.message, - // true = validate signature - signature: bls.Signature.fromBytes(s.signature, CoordType.affine, true), - })) - ); - } + try { + if (sets.length >= MIN_SET_COUNT_TO_BATCH) { + return bls.Signature.verifyMultipleSignatures( + sets.map((s) => ({ + publicKey: s.publicKey, + message: s.message, + // true = validate signature + signature: bls.Signature.fromBytes(s.signature, CoordType.affine, true), + })) + ); + } - // .every on an empty array returns true - if (sets.length === 0) { - throw Error("Empty signature set"); - } + // .every on an empty array returns true + if (sets.length === 0) { + throw Error("Empty signature set"); + } - // If too few signature sets verify them without batching - return sets.every((set) => { - // true = validate signature - const sig = bls.Signature.fromBytes(set.signature, CoordType.affine, true); - return sig.verify(set.publicKey, set.message); - }); + // If too few signature sets verify them without batching + return sets.every((set) => { + // true = validate signature + const sig = bls.Signature.fromBytes(set.signature, CoordType.affine, true); + return sig.verify(set.publicKey, set.message); + }); + } catch (_) { + // A signature could be malformed, in that case fromBytes throws error + return false; + } } diff --git a/packages/beacon-node/src/chain/bls/singleThread.ts b/packages/beacon-node/src/chain/bls/singleThread.ts index 4fb2da77c70..c005f3a70f0 100644 --- a/packages/beacon-node/src/chain/bls/singleThread.ts +++ b/packages/beacon-node/src/chain/bls/singleThread.ts @@ -1,4 +1,4 @@ -import {PublicKey} from "@chainsafe/bls/types"; +import {PublicKey, Signature} from "@chainsafe/bls/types"; import bls from "@chainsafe/bls"; import {CoordType} from "@chainsafe/blst"; import {ISignatureSet} from "@lodestar/state-transition"; @@ -43,16 +43,34 @@ export class BlsSingleThreadVerifier implements IBlsVerifier { ): Promise { const startNs = process.hrtime.bigint(); const pubkey = bls.PublicKey.aggregate(sets.map((set) => set.publicKey)); + let isAllValid = true; // validate signature = true - const signatures = sets.map((set) => bls.Signature.fromBytes(set.signature, CoordType.affine, true)); - const signature = bls.Signature.aggregate(signatures); - const isAllValid = signature.verify(pubkey, message); + const signatures = sets.map((set) => { + try { + return bls.Signature.fromBytes(set.signature, CoordType.affine, true); + } catch (_) { + // at least one set has malformed signature + isAllValid = false; + return null; + } + }); + + if (isAllValid) { + const signature = bls.Signature.aggregate(signatures as Signature[]); + isAllValid = signature.verify(pubkey, message); + } let result: boolean[]; if (isAllValid) { result = sets.map(() => true); } else { - result = sets.map((set, i) => signatures[i].verify(set.publicKey, message)); + result = sets.map((set, i) => { + const sig = signatures[i]; + if (sig === null) { + return false; + } + return sig.verify(set.publicKey, message); + }); } const endNs = process.hrtime.bigint(); diff --git a/packages/beacon-node/test/unit/chain/bls/bls.test.ts b/packages/beacon-node/test/unit/chain/bls/bls.test.ts new file mode 100644 index 00000000000..89a5e48a9db --- /dev/null +++ b/packages/beacon-node/test/unit/chain/bls/bls.test.ts @@ -0,0 +1,89 @@ +import bls from "@chainsafe/bls"; +import {expect} from "chai"; +import {CoordType} from "@chainsafe/blst"; +import {PublicKey} from "@chainsafe/bls/types"; +import {ISignatureSet, SignatureSetType} from "@lodestar/state-transition"; +import {BlsSingleThreadVerifier} from "../../../../src/chain/bls/singleThread.js"; +import {BlsMultiThreadWorkerPool} from "../../../../lib/chain/bls/multithread/index.js"; +import {testLogger} from "../../../utils/logger.js"; + +describe("BlsVerifier ", function () { + // take time for creating thread pool + this.timeout(60 * 1000); + const numKeys = 3; + const secretKeys = Array.from({length: numKeys}, (_, i) => bls.SecretKey.fromKeygen(Buffer.alloc(32, i))); + const verifiers = [ + new BlsSingleThreadVerifier({metrics: null}), + new BlsMultiThreadWorkerPool({}, {metrics: null, logger: testLogger()}), + ]; + + for (const verifier of verifiers) { + describe(`${verifier.constructor.name} - verifySignatureSets`, () => { + let sets: ISignatureSet[]; + + beforeEach(() => { + sets = secretKeys.map((secretKey, i) => { + // different signing roots + const signingRoot = Buffer.alloc(32, i); + return { + type: SignatureSetType.single, + pubkey: secretKey.toPublicKey(), + signingRoot, + signature: secretKey.sign(signingRoot).toBytes(), + }; + }); + }); + + it("should verify all signatures", async () => { + expect(await verifier.verifySignatureSets(sets)).to.be.true; + }); + + it("should return false if at least one signature is invalid", async () => { + // signature is valid but not respective to the signing root + sets[1].signingRoot = Buffer.alloc(32, 10); + expect(await verifier.verifySignatureSets(sets)).to.be.false; + }); + + it("should return false if at least one signature is malformed", async () => { + // signature is malformed + const malformedSignature = Buffer.alloc(96, 10); + expect(() => bls.Signature.fromBytes(malformedSignature, CoordType.affine, true)).to.throws(); + sets[1].signature = malformedSignature; + expect(await verifier.verifySignatureSets(sets)).to.be.false; + }); + }); + + describe(`${verifier.constructor.name} - verifySignatureSetsSameMessage`, () => { + let sets: {publicKey: PublicKey; signature: Uint8Array}[] = []; + // same signing root for all sets + const signingRoot = Buffer.alloc(32, 100); + + beforeEach(() => { + sets = secretKeys.map((secretKey) => { + return { + publicKey: secretKey.toPublicKey(), + signature: secretKey.sign(signingRoot).toBytes(), + }; + }); + }); + + it("should verify all signatures", async () => { + expect(await verifier.verifySignatureSetsSameMessage(sets, signingRoot)).to.deep.equal([true, true, true]); + }); + + it("should return false for invalid signature", async () => { + // signature is valid but not respective to the signing root + sets[1].signature = secretKeys[1].sign(Buffer.alloc(32)).toBytes(); + expect(await verifier.verifySignatureSetsSameMessage(sets, signingRoot)).to.be.deep.equal([true, false, true]); + }); + + it("should return false for malformed signature", async () => { + // signature is malformed + const malformedSignature = Buffer.alloc(96, 10); + expect(() => bls.Signature.fromBytes(malformedSignature, CoordType.affine, true)).to.throws(); + sets[1].signature = malformedSignature; + expect(await verifier.verifySignatureSetsSameMessage(sets, signingRoot)).to.be.deep.equal([true, false, true]); + }); + }); + } +}); From d75ea0a388b7e7db5dfc66d04edfcb8e4bcd9f62 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Tue, 18 Jul 2023 10:07:47 +0700 Subject: [PATCH 06/10] fix: multithread e2e test --- .../test/e2e/chain/bls/multithread.test.ts | 85 ++++++++++++------- 1 file changed, 54 insertions(+), 31 deletions(-) diff --git a/packages/beacon-node/test/e2e/chain/bls/multithread.test.ts b/packages/beacon-node/test/e2e/chain/bls/multithread.test.ts index 022d2aaacbc..27ea9e09438 100644 --- a/packages/beacon-node/test/e2e/chain/bls/multithread.test.ts +++ b/packages/beacon-node/test/e2e/chain/bls/multithread.test.ts @@ -1,12 +1,13 @@ import {expect} from "chai"; import bls from "@chainsafe/bls"; +import {PublicKey} from "@chainsafe/bls/types"; import {ISignatureSet, SignatureSetType} from "@lodestar/state-transition"; import {BlsMultiThreadWorkerPool} from "../../../../src/chain/bls/multithread/index.js"; import {testLogger} from "../../../utils/logger.js"; import {VerifySignatureOpts} from "../../../../src/chain/bls/interface.js"; describe("chain / bls / multithread queue", function () { - this.timeout(30 * 1000); + this.timeout(60 * 1000); const logger = testLogger(); let controller: AbortController; @@ -22,6 +23,9 @@ describe("chain / bls / multithread queue", function () { }); const sets: ISignatureSet[] = []; + const sameMessageSets: {publicKey: PublicKey; signature: Uint8Array}[] = []; + const sameMessage = Buffer.alloc(32, 100); + before("generate test data", () => { for (let i = 0; i < 3; i++) { const sk = bls.SecretKey.fromBytes(Buffer.alloc(32, i + 1)); @@ -34,6 +38,10 @@ describe("chain / bls / multithread queue", function () { signingRoot: msg, signature: sig.toBytes(), }); + sameMessageSets.push({ + publicKey: pk, + signature: sk.sign(sameMessage).toBytes(), + }); } }); @@ -52,9 +60,10 @@ describe("chain / bls / multithread queue", function () { ): Promise { const pool = await initializePool(); - const isValidPromiseArr: Promise[] = []; + const isValidPromiseArr: Promise[] = []; for (let i = 0; i < 8; i++) { isValidPromiseArr.push(pool.verifySignatureSets(sets, verifySignatureOpts)); + isValidPromiseArr.push(pool.verifySignatureSetsSameMessage(sameMessageSets, sameMessage, verifySignatureOpts)); if (testOpts.sleep) { // Tick forward so the pool sends a job out await new Promise((r) => setTimeout(r, 5)); @@ -63,42 +72,56 @@ describe("chain / bls / multithread queue", function () { const isValidArr = await Promise.all(isValidPromiseArr); for (const [i, isValid] of isValidArr.entries()) { - expect(isValid).to.equal(true, `sig set ${i} returned invalid`); + if (i % 2 === 0) { + expect(isValid).to.equal(true, `sig set ${i} returned invalid`); + } else { + expect(isValid).to.deep.equal([true, true, true], `sig set ${i} returned invalid`); + } } } - it("Should verify multiple signatures submitted synchronously", async () => { - // Given the `setTimeout(this.runJob, 0);` all sets should be verified in a single job an worker - await testManyValidSignatures({sleep: false}); - }); + for (const priority of [true, false]) { + it(`Should verify multiple signatures submitted synchronously priority=${priority}`, async () => { + // Given the `setTimeout(this.runJob, 0);` all sets should be verified in a single job an worker + // when priority = true, jobs are executed in the reverse order + await testManyValidSignatures({sleep: false}, {priority}); + }); + } - it("Should verify multiple signatures submitted asynchronously", async () => { - // Because of the sleep, each sets submitted should be verified in a different job and worker - await testManyValidSignatures({sleep: true}); - }); + for (const priority of [true, false]) { + it(`Should verify multiple signatures submitted asynchronously priority=${priority}`, async () => { + // Because of the sleep, each sets submitted should be verified in a different job and worker + // when priority = true, jobs are executed in the reverse order + await testManyValidSignatures({sleep: true}, {priority}); + }); + } - it("Should verify multiple signatures batched", async () => { - // By setting batchable: true, 5*8 = 40 sig sets should be verified in one job, while 3*8=24 should - // be verified in another job - await testManyValidSignatures({sleep: true}, {batchable: true}); - }); + for (const priority of [true, false]) { + it(`Should verify multiple signatures batched pririty=${priority}`, async () => { + // By setting batchable: true, 5*8 = 40 sig sets should be verified in one job, while 3*8=24 should + // be verified in another job + await testManyValidSignatures({sleep: true}, {batchable: true, priority}); + }); + } - it("Should verify multiple signatures batched, first is invalid", async () => { - // If the first signature is invalid it should not make the rest throw - const pool = await initializePool(); + for (const priority of [true, false]) { + it(`Should verify multiple signatures batched, first is invalid priority=${priority}`, async () => { + // If the first signature is invalid it should not make the rest throw + const pool = await initializePool(); - const invalidSet: ISignatureSet = {...sets[0], signature: Buffer.alloc(32, 0)}; - const isInvalidPromise = pool.verifySignatureSets([invalidSet], {batchable: true}); - const isValidPromiseArr: Promise[] = []; - for (let i = 0; i < 8; i++) { - isValidPromiseArr.push(pool.verifySignatureSets(sets, {batchable: true})); - } + const invalidSet: ISignatureSet = {...sets[0], signature: Buffer.alloc(32, 0)}; + const isInvalidPromise = pool.verifySignatureSets([invalidSet], {batchable: true, priority}); + const isValidPromiseArr: Promise[] = []; + for (let i = 0; i < 8; i++) { + isValidPromiseArr.push(pool.verifySignatureSets(sets, {batchable: true})); + } - await expect(isInvalidPromise).to.rejectedWith("BLST_INVALID_SIZE"); + expect(await isInvalidPromise).to.be.false; - const isValidArr = await Promise.all(isValidPromiseArr); - for (const [i, isValid] of isValidArr.entries()) { - expect(isValid).to.equal(true, `sig set ${i} returned invalid`); - } - }); + const isValidArr = await Promise.all(isValidPromiseArr); + for (const [i, isValid] of isValidArr.entries()) { + expect(isValid).to.equal(true, `sig set ${i} returned invalid`); + } + }); + } }); From 2393ce58bd0616988dc3f70af808a3783fab01f6 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Tue, 18 Jul 2023 15:12:05 +0700 Subject: [PATCH 07/10] chore: remove unused metric --- packages/beacon-node/src/metrics/metrics/lodestar.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 87f7c960788..04915a06489 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -372,10 +372,6 @@ export function createLodestarMetrics( help: "Count of total signature sets started in bls thread pool, sig sets include 1 pk, msg, sig", labelNames: ["type"], }), - totalSigSetsSameMessageStarted: register.gauge({ - name: "lodestar_bls_thread_pool_sig_sets_same_message_started_total", - help: "Count of total signature sets started in bls thread pool, sig sets include 1 pk, msg, sig", - }), // Re-verifying a batch means doing double work. This number must be very low or it can be a waste of CPU resources batchRetries: register.gauge({ name: "lodestar_bls_thread_pool_batch_retries_total", From 678f2dbcd35eb1043d55a39a130e20e051fcb856 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Tue, 18 Jul 2023 20:29:30 +0700 Subject: [PATCH 08/10] fix: correct verifySignatureSetsSameMessage opts --- packages/beacon-node/src/chain/bls/interface.ts | 2 +- packages/beacon-node/src/chain/bls/multithread/index.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/beacon-node/src/chain/bls/interface.ts b/packages/beacon-node/src/chain/bls/interface.ts index 23a5e8d32fc..e9c98ba1920 100644 --- a/packages/beacon-node/src/chain/bls/interface.ts +++ b/packages/beacon-node/src/chain/bls/interface.ts @@ -55,7 +55,7 @@ export interface IBlsVerifier { verifySignatureSetsSameMessage( sets: {publicKey: PublicKey; signature: Uint8Array}[], messsage: Uint8Array, - opts?: Pick + opts?: Omit ): Promise; /** For multithread pool awaits terminating all workers */ diff --git a/packages/beacon-node/src/chain/bls/multithread/index.ts b/packages/beacon-node/src/chain/bls/multithread/index.ts index e979ad797e7..0ead7505351 100644 --- a/packages/beacon-node/src/chain/bls/multithread/index.ts +++ b/packages/beacon-node/src/chain/bls/multithread/index.ts @@ -208,7 +208,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { async verifySignatureSetsSameMessage( sets: {publicKey: PublicKey; signature: Uint8Array}[], message: Uint8Array, - opts: Pick = {} + opts: Omit = {} ): Promise { // chunkify so that it reduce the risk of retrying when there is at least one invalid signature const results = await Promise.all( From 589683ab5c72023b58001cc4ddfe9ad4ae18c3db Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Wed, 19 Jul 2023 15:03:34 +0700 Subject: [PATCH 09/10] chore: refactor to retryJobItemSameMessage method --- .../src/chain/bls/multithread/index.ts | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/packages/beacon-node/src/chain/bls/multithread/index.ts b/packages/beacon-node/src/chain/bls/multithread/index.ts index 0ead7505351..680ca6da9b4 100644 --- a/packages/beacon-node/src/chain/bls/multithread/index.ts +++ b/packages/beacon-node/src/chain/bls/multithread/index.ts @@ -21,6 +21,7 @@ import {chunkifyMaximizeChunkSize} from "./utils.js"; import {defaultPoolSize} from "./poolSize.js"; import { JobQueueItem, + JobQueueItemSameMessage, JobQueueItemType, jobItemSameMessageToMultiSet, jobItemSigSets, @@ -394,16 +395,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { job.reject(e as Error); } else { // there could be an invalid pubkey/signature, retry each individually - // Create new jobs for each pubkey set, and Promise.all all the results - for (const j of jobItemSameMessageToMultiSet(job)) { - if (j.opts.priority) { - this.jobs.unshift(j); - } else { - this.jobs.push(j); - } - } - this.metrics?.blsThreadPool.sameMessageRetryJobs.inc(1); - this.metrics?.blsThreadPool.sameMessageRetrySets.inc(job.sets.length); + this.retryJobItemSameMessage(job); } continue; } @@ -465,16 +457,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { job.resolve(job.sets.map(() => true)); } else { // Retry each individually - // Create new jobs for each pubkey set, and Promise.all all the results - for (const j of jobItemSameMessageToMultiSet(job)) { - if (j.opts.priority) { - this.jobs.unshift(j); - } else { - this.jobs.push(j); - } - } - this.metrics?.blsThreadPool.sameMessageRetryJobs.inc(1); - this.metrics?.blsThreadPool.sameMessageRetrySets.inc(job.sets.length); + this.retryJobItemSameMessage(job); } successCount += 1; } @@ -545,6 +528,19 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { } }; + private retryJobItemSameMessage(job: JobQueueItemSameMessage): void { + // Create new jobs for each pubkey set, and Promise.all all the results + for (const j of jobItemSameMessageToMultiSet(job)) { + if (j.opts.priority) { + this.jobs.unshift(j); + } else { + this.jobs.push(j); + } + } + this.metrics?.blsThreadPool.sameMessageRetryJobs.inc(1); + this.metrics?.blsThreadPool.sameMessageRetrySets.inc(job.sets.length); + } + /** For testing */ private async waitTillInitialized(): Promise { await Promise.all( From 38006cf2c22e6d7645e7cdaafa7563e09778165d Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 20 Jul 2023 16:29:03 +0200 Subject: [PATCH 10/10] Review PR --- .../beacon-node/src/chain/bls/maybeBatch.ts | 2 + .../src/chain/bls/multithread/index.ts | 68 +++++++++++-------- .../beacon-node/src/chain/bls/singleThread.ts | 1 - 3 files changed, 42 insertions(+), 29 deletions(-) diff --git a/packages/beacon-node/src/chain/bls/maybeBatch.ts b/packages/beacon-node/src/chain/bls/maybeBatch.ts index cb30f7008bb..619ddf4d72e 100644 --- a/packages/beacon-node/src/chain/bls/maybeBatch.ts +++ b/packages/beacon-node/src/chain/bls/maybeBatch.ts @@ -39,6 +39,8 @@ export function verifySignatureSetsMaybeBatch(sets: SignatureSetDeserialized[]): }); } catch (_) { // A signature could be malformed, in that case fromBytes throws error + // blst-ts `verifyMultipleSignatures` is also a fallible operation if mul_n_aggregate fails + // see https://github.com/ChainSafe/blst-ts/blob/b1ba6333f664b08e5c50b2b0d18c4f079203962b/src/lib.ts#L291 return false; } } diff --git a/packages/beacon-node/src/chain/bls/multithread/index.ts b/packages/beacon-node/src/chain/bls/multithread/index.ts index 680ca6da9b4..81990a97a01 100644 --- a/packages/beacon-node/src/chain/bls/multithread/index.ts +++ b/packages/beacon-node/src/chain/bls/multithread/index.ts @@ -391,12 +391,18 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { workReq = jobItemWorkReq(job, this.format); } catch (e) { this.metrics?.blsThreadPool.errorAggregateSignatureSetsCount.inc({type: job.type}); - if (job.type === JobQueueItemType.default) { - job.reject(e as Error); - } else { - // there could be an invalid pubkey/signature, retry each individually - this.retryJobItemSameMessage(job); + + switch (job.type) { + case JobQueueItemType.default: + job.reject(e as Error); + break; + + case JobQueueItemType.sameMessage: + // there could be an invalid pubkey/signature, retry each individually + this.retryJobItemSameMessage(job); + break; } + continue; } // Re-push all jobs with matching workReq for easier accounting of results @@ -437,30 +443,34 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { const jobResult = results[i]; const sigSetCount = jobItemSigSets(job); - if (job.type === JobQueueItemType.default) { - if (!jobResult || jobResult.code !== WorkResultCode.success) { - job.reject(getJobResultError(jobResult, i)); - errorCount += sigSetCount; - } else { - job.resolve(jobResult.result); - successCount += sigSetCount; - } - } - // handle result of the verification of aggregated signature against aggregated pubkeys - else if (job.type === JobQueueItemType.sameMessage) { - if (!jobResult || jobResult.code !== WorkResultCode.success) { - job.reject(getJobResultError(jobResult, i)); - errorCount += 1; - } else { - if (jobResult.result) { - // All are valid, most of the time it goes here - job.resolve(job.sets.map(() => true)); + // TODO: enable exhaustive switch case checks lint rule + switch (job.type) { + case JobQueueItemType.default: + if (!jobResult || jobResult.code !== WorkResultCode.success) { + job.reject(getJobResultError(jobResult, i)); + errorCount += sigSetCount; } else { - // Retry each individually - this.retryJobItemSameMessage(job); + job.resolve(jobResult.result); + successCount += sigSetCount; } - successCount += 1; - } + break; + + // handle result of the verification of aggregated signature against aggregated pubkeys + case JobQueueItemType.sameMessage: + if (!jobResult || jobResult.code !== WorkResultCode.success) { + job.reject(getJobResultError(jobResult, i)); + errorCount += 1; + } else { + if (jobResult.result) { + // All are valid, most of the time it goes here + job.resolve(job.sets.map(() => true)); + } else { + // Retry each individually + this.retryJobItemSameMessage(job); + } + successCount += 1; + } + break; } } @@ -478,7 +488,9 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { this.metrics?.blsThreadPool.batchSigsSuccess.inc(batchSigsSuccess); } catch (e) { // Worker communications should never reject - if (!this.closed) this.logger.error("BlsMultiThreadWorkerPool error", {}, e as Error); + if (!this.closed) { + this.logger.error("BlsMultiThreadWorkerPool error", {}, e as Error); + } // Reject all for (const job of jobsInput) { job.reject(e as Error); diff --git a/packages/beacon-node/src/chain/bls/singleThread.ts b/packages/beacon-node/src/chain/bls/singleThread.ts index c005f3a70f0..49eae40e3b8 100644 --- a/packages/beacon-node/src/chain/bls/singleThread.ts +++ b/packages/beacon-node/src/chain/bls/singleThread.ts @@ -32,7 +32,6 @@ export class BlsSingleThreadVerifier implements IBlsVerifier { const endNs = process.hrtime.bigint(); const totalSec = Number(startNs - endNs) / 1e9; this.metrics?.blsThreadPool.mainThreadDurationInThreadPool.observe(totalSec); - this.metrics?.blsThreadPool.mainThreadDurationInThreadPool.observe(totalSec / sets.length); return isValid; }