Skip to content

Commit

Permalink
Use higher job concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Apr 17, 2023
1 parent e430bda commit a42fddd
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import bls from "@chainsafe/bls";
import {LocalKeystoreDefinition} from "../interface.js";
import {clearKeystoreCache, loadKeystoreCache, writeKeystoreCache} from "../keystoreCache.js";
import {lockFilepath, unlockFilepath} from "../../../../util/lockfile.js";
import {defaultPoolSize} from "./poolSize.js";
import {calculateThreadpoolConcurrency} from "./poolSize.js";
import {DecryptKeystoreWorkerAPI, KeystoreDecryptOptions} from "./types.js";

/**
Expand Down Expand Up @@ -32,14 +32,18 @@ export async function decryptKeystoreDefinitions(
const passwords = new Array(keystoreDefinitions.length) as string[];
const tasks: QueuedTask<ModuleThread<DecryptKeystoreWorkerAPI>, Uint8Array>[] = [];
const errors: Error[] = [];
const {numWorkers, jobConcurrency} = calculateThreadpoolConcurrency();
const pool = Pool(
() =>
spawn<DecryptKeystoreWorkerAPI>(new Worker("./worker.js"), {
// A Lodestar Node may do very expensive task at start blocking the event loop and causing
// the initialization to timeout. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
}),
defaultPoolSize
{
concurrency: jobConcurrency,
size: numWorkers,
}
);
for (const [index, definition] of keystoreDefinitions.entries()) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,36 @@
let defaultPoolSize: number;
import os from "node:os";

try {
if (typeof navigator !== "undefined") {
defaultPoolSize = navigator.hardwareConcurrency ?? 4;
} else {
defaultPoolSize = (await import("node:os")).cpus().length;
}
} catch (e) {
defaultPoolSize = 8;
}
/**
* Amount of memory used to decrypt a single keystore
* calculated from https://github.com/ethereum/staking-deposit-cli/blob/d7b530442d6e0921c9db84b3a1cf6b3ecd6b9d35/staking_deposit/key_handling/keystore.py#L190-L195
*/
export const KEYSTORE_MEMORY_USAGE = 268435456;
/**
* Maximum amount of memory to use per thread
* conservatively, 2GB
*/
export const MAX_MEMORY_USAGE_PER_THREAD = 2147483648;

export const MAX_CONCURRENCY_PER_THREAD = Math.floor(MAX_MEMORY_USAGE_PER_THREAD / KEYSTORE_MEMORY_USAGE);

/**
* Cross-platform aprox number of logical cores
* Figure out what the best combination of workers and job concurrency is to best utilize available memory
*/
export {defaultPoolSize};
export function calculateThreadpoolConcurrency(): {numWorkers: number; jobConcurrency: number} {
const defaultPoolSize = os.cpus().length;
// Don't eat all available memory
const freeMem = os.freemem() * 0.8;
let numWorkers = defaultPoolSize;
let jobConcurrency = 1;
for (let i = defaultPoolSize; i > 0; i--) {
const iConcurrency = Math.floor(freeMem / i / KEYSTORE_MEMORY_USAGE);
if (iConcurrency <= MAX_CONCURRENCY_PER_THREAD && iConcurrency > jobConcurrency) {
numWorkers = i;
jobConcurrency = iConcurrency;
}
}
return {
numWorkers,
jobConcurrency,
};
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import rimraf from "rimraf";
import {expect} from "chai";
Expand All @@ -7,6 +8,12 @@ import {getKeystoresStr} from "../../utils/keystores.js";
import {testFilesDir} from "../../utils.js";
import {decryptKeystoreDefinitions} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions/index.js";
import {LocalKeystoreDefinition} from "../../../src/cmds/validator/keymanager/interface.js";
import {
KEYSTORE_MEMORY_USAGE,
MAX_CONCURRENCY_PER_THREAD,
MAX_MEMORY_USAGE_PER_THREAD,
calculateThreadpoolConcurrency,
} from "../../../src/cmds/validator/keymanager/decryptKeystoreDefinitions/poolSize.js";

describe("decryptKeystoreDefinitions", function () {
this.timeout(100_000);
Expand Down Expand Up @@ -65,4 +72,12 @@ describe("decryptKeystoreDefinitions", function () {

await decryptKeystoreDefinitions(definitions, {logger: console, ignoreLockFile: true});
});

it("calculateThreadpoolConcurrency sanity test", () => {
const {numWorkers, jobConcurrency} = calculateThreadpoolConcurrency();
const freeMem = os.freemem() * 0.8;
expect(jobConcurrency).to.be.lte(MAX_CONCURRENCY_PER_THREAD);
expect(jobConcurrency * KEYSTORE_MEMORY_USAGE).to.be.lte(MAX_MEMORY_USAGE_PER_THREAD);
expect(numWorkers * jobConcurrency * KEYSTORE_MEMORY_USAGE).to.be.lte(freeMem);
});
});

0 comments on commit a42fddd

Please sign in to comment.