Reworks the worker pool to be more configurable (#5764)
**What's the problem this PR addresses?**

This PR extends @anuragkalia's work in #5636 (it starts from the same
commits and builds upon them).

- The original implementation kept a singleton worker pool. I felt this
was problematic since we may enter situations (even if only in tests)
where multiple calls to `convertToZip` would want to use different
pools.

- Converting an archive without using a worker was only possible when
the concurrency was completely disabled. This was my idea, but in
retrospect it felt better to me to have two settings: one for the
concurrency level, and another to define whether workers are enabled at
all.

- Passing all those settings from the various fetchers would have been
unwieldy.

**How did you fix it?**

- I created a `TaskPool` interface; `WorkerPool` implements it (using
workers, same implementation as before), but so does `AsyncPool` (a new
implementation that simply forwards the call to the `pLimit` limiter).
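The split can be sketched as follows. This is a minimal illustration, not the actual Yarn implementation: the `TaskPool`/`AsyncPool` names mirror the diff below, but the hand-rolled `makeLimit` helper is a stand-in for the `p-limit` package that Yarn actually uses.

```typescript
export interface TaskPool<TIn, TOut> {
  run(data: TIn): Promise<TOut>;
}

// Tiny stand-in for p-limit: at most `size` callbacks in flight at once.
function makeLimit(size: number) {
  let active = 0;
  const queue: Array<() => void> = [];
  const next = () => {
    active -= 1;
    queue.shift()?.();
  };
  return <T>(fn: () => Promise<T>): Promise<T> =>
    new Promise<T>((resolve, reject) => {
      const start = () => {
        active += 1;
        fn().then(resolve, reject).finally(next);
      };
      if (active < size) {
        start();
      } else {
        queue.push(start);
      }
    });
}

// In-thread pool: same interface as a worker-based pool, but the task
// function simply runs on the main thread under the concurrency limit.
export class AsyncPool<TIn, TOut> implements TaskPool<TIn, TOut> {
  private limit: ReturnType<typeof makeLimit>;

  constructor(private fn: (data: TIn) => Promise<TOut>, opts: {poolSize: number}) {
    this.limit = makeLimit(opts.poolSize);
  }

  run(data: TIn) {
    return this.limit(() => this.fn(data));
  }
}
```

Because both pools expose the same `run` signature, callers like `convertToZip` don't need to know which strategy is active.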

- I feel like this also addresses @merceyz's comment regarding the
setting name - it's now called `taskPoolConcurrency`, which doesn't
directly refer to workers.

- A `getTaskPoolForConfiguration` function returns the proper task pool
for the given configuration object. To make that possible, WeakMap
instances can now be used as the storage argument for the
`get*WithDefault` family of functions.
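For instance, the per-configuration lookup can be sketched like this. `getFactoryWithDefault` is copied from the diff below; the `Configuration` and pool types here are hypothetical stand-ins for illustration.

```typescript
// Anything with get/set works as storage — including a WeakMap, which
// lets pools be garbage-collected along with their configuration object.
interface GetSetMap<K, V> {
  get(k: K): V | undefined;
  set(k: K, v: V): void;
}

function getFactoryWithDefault<K, T>(map: GetSetMap<K, T>, key: K, factory: () => T) {
  let value = map.get(key);

  if (typeof value === `undefined`)
    map.set(key, value = factory());

  return value;
}

// Stand-in types for the sketch; the real ones live in @yarnpkg/core.
type Configuration = {name: string};
type Pool = {id: number};

const pools = new WeakMap<Configuration, Pool>();
let nextId = 0;

// One pool per configuration object, created lazily on first use.
function getPoolFor(configuration: Configuration) {
  return getFactoryWithDefault(pools, configuration, () => ({id: nextId++}));
}
```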

**Checklist**
<!--- Don't worry if you miss something, chores are automatically
tested. -->
<!--- This checklist exists to help you remember doing the chores when
you submit a PR. -->
<!--- Put an `x` in all the boxes that apply. -->
- [x] I have read the [Contributing
Guide](https://yarnpkg.com/advanced/contributing).

<!-- See
https://yarnpkg.com/advanced/contributing#preparing-your-pr-to-be-released
for more details. -->
<!-- Check with `yarn version check` and fix with `yarn version check
-i` -->
- [x] I have set the packages that need to be released for my changes to
be effective.

<!-- The "Testing chores" workflow validates that your PR follows our
guidelines. -->
<!-- If it doesn't pass, click on it to see details as to what your PR
might be missing. -->
- [x] I will check that all automated PR checks pass before the PR gets
reviewed.

---------

Co-authored-by: Anurag Kalia <[email protected]>
arcanis and akalia-atlassian authored Oct 2, 2023
1 parent ade06c5 commit fdfc784
Showing 16 changed files with 214 additions and 50 deletions.
34 changes: 34 additions & 0 deletions .yarn/versions/85334687.yml
@@ -0,0 +1,34 @@
releases:
"@yarnpkg/cli": minor
"@yarnpkg/core": minor
"@yarnpkg/plugin-file": minor
"@yarnpkg/plugin-git": minor
"@yarnpkg/plugin-github": minor
"@yarnpkg/plugin-http": minor
"@yarnpkg/plugin-npm": minor

declined:
- "@yarnpkg/plugin-compat"
- "@yarnpkg/plugin-constraints"
- "@yarnpkg/plugin-dlx"
- "@yarnpkg/plugin-essentials"
- "@yarnpkg/plugin-exec"
- "@yarnpkg/plugin-init"
- "@yarnpkg/plugin-interactive-tools"
- "@yarnpkg/plugin-link"
- "@yarnpkg/plugin-nm"
- "@yarnpkg/plugin-npm-cli"
- "@yarnpkg/plugin-pack"
- "@yarnpkg/plugin-patch"
- "@yarnpkg/plugin-pnp"
- "@yarnpkg/plugin-pnpm"
- "@yarnpkg/plugin-stage"
- "@yarnpkg/plugin-typescript"
- "@yarnpkg/plugin-version"
- "@yarnpkg/plugin-workspace-tools"
- "@yarnpkg/builder"
- "@yarnpkg/doctor"
- "@yarnpkg/extensions"
- "@yarnpkg/nm"
- "@yarnpkg/pnpify"
- "@yarnpkg/sdks"
15 changes: 15 additions & 0 deletions packages/docusaurus/static/configuration/yarnrc.json
@@ -777,6 +777,21 @@
}
}
},
"taskPoolConcurrency": {
"_package": "@yarnpkg/core",
"title": "Maximal amount of concurrent heavy task processing.",
"description": "We default to the platform parallelism, but for some CI, `os.cpus` may not report accurate values and may overwhelm their containers.",
"type": "number",
"default": "os.availableParallelism()"
},
"taskPoolMode": {
"_package": "@yarnpkg/core",
"title": "Execution strategy for heavy tasks.",
"description": "By default will use workers when performing heavy tasks, such as converting tgz files to zip. This setting can be used to disable workers and use a regular in-thread async processing.",
"type": "string",
"enum": ["async", "workers"],
"default": "workers"
},
"telemetryInterval": {
"_package": "@yarnpkg/core",
"title": "Define the minimal amount of time between two telemetry events, in days.",
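Together, the two settings documented above could be set from a project's `.yarnrc.yml` like so. This is a hypothetical example; the key names and defaults follow the `Configuration.ts` definitions later in the diff.

```yaml
# Cap heavy-task parallelism, e.g. for CI containers where os.cpus()
# over-reports the cores actually available.
taskPoolConcurrency: 4

# Run tgz-to-zip conversions in-thread instead of in worker threads.
taskPoolMode: async
```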
2 changes: 1 addition & 1 deletion packages/plugin-file/sources/TarballFileFetcher.ts
@@ -42,7 +42,7 @@ export class TarballFileFetcher implements Fetcher {
const sourceBuffer = await fileUtils.fetchArchiveFromLocator(locator, opts);

return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
2 changes: 1 addition & 1 deletion packages/plugin-git/sources/GitFetcher.ts
@@ -64,7 +64,7 @@ export class GitFetcher implements Fetcher {

return await miscUtils.releaseAfterUseAsync(async () => {
return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
2 changes: 1 addition & 1 deletion packages/plugin-github/sources/GithubFetcher.ts
@@ -61,7 +61,7 @@ export class GithubFetcher implements Fetcher {
const packedBuffer = await xfs.readFilePromise(packagePath);

return await tgzUtils.convertToZip(packedBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
2 changes: 1 addition & 1 deletion packages/plugin-http/sources/TarballHttpFetcher.ts
@@ -43,7 +43,7 @@ export class TarballHttpFetcher implements Fetcher {
});

return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
2 changes: 1 addition & 1 deletion packages/plugin-npm/sources/NpmHttpFetcher.ts
@@ -55,7 +55,7 @@ export class NpmHttpFetcher implements Fetcher {
});

return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
2 changes: 1 addition & 1 deletion packages/plugin-npm/sources/NpmSemverFetcher.ts
@@ -65,7 +65,7 @@ export class NpmSemverFetcher implements Fetcher {
}

return await tgzUtils.convertToZip(sourceBuffer, {
compressionLevel: opts.project.configuration.get(`compressionLevel`),
configuration: opts.project.configuration,
prefixPath: structUtils.getIdentVendorPath(locator),
stripComponents: 1,
});
13 changes: 13 additions & 0 deletions packages/yarnpkg-core/sources/Configuration.ts
@@ -401,6 +401,17 @@ export const coreDefinitions: {[coreSettingName: string]: SettingsDefinition} =
type: SettingsType.NUMBER,
default: 50,
},
taskPoolConcurrency: {
description: `Maximal amount of concurrent heavy task processing`,
type: SettingsType.NUMBER,
default: nodeUtils.availableParallelism(),
},
taskPoolMode: {
description: `Execution strategy for heavy tasks`,
type: SettingsType.STRING,
values: [`async`, `workers`],
default: `workers`,
},
networkSettings: {
description: `Network settings per hostname (glob patterns are supported)`,
type: SettingsType.MAP,
@@ -643,6 +654,8 @@ export interface ConfigurationValueMap {
httpsKeyFilePath: PortablePath | null;
httpsCertFilePath: PortablePath | null;
enableStrictSsl: boolean;
taskPoolConcurrency: number;
taskPoolMode: string;

logFilters: Array<miscUtils.ToMapValue<{code?: string, text?: string, pattern?: string, level?: formatUtils.LogLevel | null}>>;

@@ -1,20 +1,38 @@
import PLimit from 'p-limit';
import {Worker} from 'worker_threads';

import * as nodeUtils from './nodeUtils';
import pLimit, {Limit} from 'p-limit';
import {Worker} from 'worker_threads';

const kTaskInfo = Symbol(`kTaskInfo`);

type PoolWorker<TOut> = Worker & {
[kTaskInfo]: null | { resolve: (value: TOut) => void, reject: (reason?: any) => void };
};

export class WorkerPool<TIn, TOut> {
export interface TaskPool<TIn, TOut> {
run(data: TIn): Promise<TOut>;
}

export class AsyncPool<TIn, TOut> implements TaskPool<TIn, TOut> {
private limit: Limit;

constructor(private fn: (data: TIn) => Promise<TOut>, opts: {poolSize: number}) {
this.limit = pLimit(opts.poolSize);
}

run(data: TIn) {
return this.limit(() => this.fn(data));
}
}

export class WorkerPool<TIn, TOut> implements TaskPool<TIn, TOut> {
private workers: Array<PoolWorker<TOut>> = [];
private limit = PLimit(nodeUtils.availableParallelism());

private cleanupInterval: ReturnType<typeof setInterval>;

constructor(private source: string) {
private limit: Limit;

constructor(private source: string, opts: {poolSize: number}) {
this.limit = pLimit(opts.poolSize);

this.cleanupInterval = setInterval(() => {
if (this.limit.pendingCount === 0 && this.limit.activeCount === 0) {
// Start terminating one worker at a time when there are no tasks left.
13 changes: 9 additions & 4 deletions packages/yarnpkg-core/sources/miscUtils.ts
@@ -120,7 +120,12 @@ export function convertMapsToIndexableObjects<T>(arg: T): MapValueToObjectValue<
return arg as MapValueToObjectValue<T>;
}

export function getFactoryWithDefault<K, T>(map: Map<K, T>, key: K, factory: () => T) {
export interface GetSetMap<K, V> {
get(k: K): V | undefined;
set(k: K, v: V): void;
}

export function getFactoryWithDefault<K, T>(map: GetSetMap<K, T>, key: K, factory: () => T) {
let value = map.get(key);

if (typeof value === `undefined`)
@@ -129,7 +134,7 @@ export function getFactoryWithDefault<K, T>(map: Map<K, T>, key: K, factory: ()
return value;
}

export function getArrayWithDefault<K, T>(map: Map<K, Array<T>>, key: K) {
export function getArrayWithDefault<K, T>(map: GetSetMap<K, Array<T>>, key: K) {
let value = map.get(key);

if (typeof value === `undefined`)
@@ -138,7 +143,7 @@ export function getArrayWithDefault<K, T>(map: Map<K, Array<T>>, key: K) {
return value;
}

export function getSetWithDefault<K, T>(map: Map<K, Set<T>>, key: K) {
export function getSetWithDefault<K, T>(map: GetSetMap<K, Set<T>>, key: K) {
let value = map.get(key);

if (typeof value === `undefined`)
@@ -147,7 +152,7 @@ export function getSetWithDefault<K, T>(map: Map<K, Set<T>>, key: K) {
return value;
}

export function getMapWithDefault<K, MK, MV>(map: Map<K, Map<MK, MV>>, key: K) {
export function getMapWithDefault<K, MK, MV>(map: GetSetMap<K, Map<MK, MV>>, key: K) {
let value = map.get(key);

if (typeof value === `undefined`)
108 changes: 95 additions & 13 deletions packages/yarnpkg-core/sources/tgzUtils.ts
@@ -1,13 +1,84 @@
import {FakeFS, PortablePath, NodeFS, ppath, xfs, npath, constants} from '@yarnpkg/fslib';
import {ZipCompression, ZipFS} from '@yarnpkg/libzip';
import {PassThrough, Readable} from 'stream';
import tar from 'tar';
import {Configuration, nodeUtils} from '@yarnpkg/core';
import {FakeFS, PortablePath, NodeFS, ppath, xfs, npath, constants, statUtils} from '@yarnpkg/fslib';
import {ZipCompression, ZipFS} from '@yarnpkg/libzip';
import {PassThrough, Readable} from 'stream';
import tar from 'tar';

import {AsyncPool, TaskPool, WorkerPool} from './TaskPool';
import * as miscUtils from './miscUtils';
import {getContent as getZipWorkerSource} from './worker-zip';

export type ConvertToZipPayload = {
tmpFile: PortablePath;
tgz: Buffer | Uint8Array;
extractBufferOpts: ExtractBufferOptions;
compressionLevel: ZipCompression;
};

export type ZipWorkerPool = TaskPool<ConvertToZipPayload, PortablePath>;

function createTaskPool(poolMode: string, poolSize: number): ZipWorkerPool {
switch (poolMode) {
case `async`:
return new AsyncPool(convertToZipWorker, {poolSize});

case `workers`:
return new WorkerPool(getZipWorkerSource(), {poolSize});

default: {
throw new Error(`Assertion failed: Unknown value ${poolMode} for taskPoolMode`);
}
}
}

let defaultWorkerPool: ZipWorkerPool | undefined;

export function getDefaultTaskPool() {
if (typeof defaultWorkerPool === `undefined`)
defaultWorkerPool = createTaskPool(`workers`, nodeUtils.availableParallelism());

return defaultWorkerPool;
}

import {WorkerPool} from './WorkerPool';
import * as miscUtils from './miscUtils';
import {getContent as getZipWorkerSource, ConvertToZipPayload} from './worker-zip';
const workerPools = new WeakMap<Configuration, ZipWorkerPool>();

interface MakeArchiveFromDirectoryOptions {
export function getTaskPoolForConfiguration(configuration: Configuration | void): ZipWorkerPool {
if (typeof configuration === `undefined`)
return getDefaultTaskPool();

return miscUtils.getFactoryWithDefault(workerPools, configuration, () => {
const poolMode = configuration.get(`taskPoolMode`);
const poolSize = configuration.get(`taskPoolConcurrency`);

switch (poolMode) {
case `async`:
return new AsyncPool(convertToZipWorker, {poolSize});

case `workers`:
return new WorkerPool(getZipWorkerSource(), {poolSize});

default: {
throw new Error(`Assertion failed: Unknown value ${poolMode} for taskPoolMode`);
}
}
});
}

export async function convertToZipWorker(data: ConvertToZipPayload) {
const {tmpFile, tgz, compressionLevel, extractBufferOpts} = data;

const zipFs = new ZipFS(tmpFile, {create: true, level: compressionLevel, stats: statUtils.makeDefaultStats()});

// Buffers sent through Node are turned into regular Uint8Arrays
const tgzBuffer = Buffer.from(tgz.buffer, tgz.byteOffset, tgz.byteLength);
await extractArchiveTo(tgzBuffer, zipFs, extractBufferOpts);

zipFs.saveAndClose();

return tmpFile;
}

export interface MakeArchiveFromDirectoryOptions {
baseFs?: FakeFS<PortablePath>;
prefixPath?: PortablePath | null;
compressionLevel?: ZipCompression;
@@ -32,20 +103,31 @@ export async function makeArchiveFromDirectory(source: PortablePath, {baseFs = n
}

export interface ExtractBufferOptions {
compressionLevel?: ZipCompression;
prefixPath?: PortablePath;
stripComponents?: number;
}

let workerPool: WorkerPool<ConvertToZipPayload, PortablePath> | null;
export interface ConvertToZipOptions extends ExtractBufferOptions {
configuration?: Configuration;
compressionLevel?: ZipCompression;
taskPool?: ZipWorkerPool;
}

export async function convertToZip(tgz: Buffer, opts: ExtractBufferOptions) {
export async function convertToZip(tgz: Buffer, opts: ConvertToZipOptions = {}) {
const tmpFolder = await xfs.mktempPromise();
const tmpFile = ppath.join(tmpFolder, `archive.zip`);

workerPool ||= new WorkerPool(getZipWorkerSource());
const compressionLevel = opts.compressionLevel
?? opts.configuration?.get(`compressionLevel`)
?? `mixed`;

const extractBufferOpts: ExtractBufferOptions = {
prefixPath: opts.prefixPath,
stripComponents: opts.stripComponents,
};

await workerPool.run({tmpFile, tgz, opts});
const taskPool = opts.taskPool ?? getTaskPoolForConfiguration(opts.configuration);
await taskPool.run({tmpFile, tgz, compressionLevel, extractBufferOpts});

return new ZipFS(tmpFile, {level: compressionLevel});
}
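The precedence `convertToZip` now applies — explicit option, then project configuration, then a hard default — can be isolated as a small helper. This is a sketch: the types below are illustrative stand-ins for the real ones in `@yarnpkg/libzip` and `@yarnpkg/core`; only the `??` chain mirrors the diff.

```typescript
// Illustrative stand-in types for the sketch.
type ZipCompression = number | `mixed`;

interface ConfigurationLike {
  get(key: string): ZipCompression | undefined;
}

interface ConvertToZipOptionsLike {
  compressionLevel?: ZipCompression;
  configuration?: ConfigurationLike;
}

// Explicit option wins, then the project configuration, then `mixed`.
function resolveCompressionLevel(opts: ConvertToZipOptionsLike): ZipCompression {
  return opts.compressionLevel
    ?? opts.configuration?.get(`compressionLevel`)
    ?? `mixed`;
}
```

The same pattern applies to the pool itself: an explicit `taskPool` option takes priority, and only otherwise is the pool derived from (and memoized per) the configuration.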
20 changes: 3 additions & 17 deletions packages/yarnpkg-core/sources/worker-zip/Worker.ts
@@ -1,25 +1,11 @@
import {PortablePath, statUtils} from '@yarnpkg/fslib';
import {ZipFS} from '@yarnpkg/libzip';
import {parentPort} from 'worker_threads';
import {parentPort} from 'worker_threads';

import {extractArchiveTo, ExtractBufferOptions} from '../tgzUtils';
import {convertToZipWorker, ConvertToZipPayload} from '../tgzUtils';

export type ConvertToZipPayload = {tmpFile: PortablePath, tgz: Buffer | Uint8Array, opts: ExtractBufferOptions};

if (!parentPort)
throw new Error(`Assertion failed: Expected parentPort to be set`);

parentPort.on(`message`, async (data: ConvertToZipPayload) => {
const {opts, tgz, tmpFile} = data;
const {compressionLevel, ...bufferOpts} = opts;

const zipFs = new ZipFS(tmpFile, {create: true, level: compressionLevel, stats: statUtils.makeDefaultStats()});

// Buffers sent through Node are turned into regular Uint8Arrays
const tgzBuffer = Buffer.from(tgz.buffer, tgz.byteOffset, tgz.byteLength);
await extractArchiveTo(tgzBuffer, zipFs, bufferOpts);

zipFs.saveAndClose();

parentPort!.postMessage(data.tmpFile);
parentPort!.postMessage(await convertToZipWorker(data));
});
1 change: 0 additions & 1 deletion packages/yarnpkg-core/sources/worker-zip/index.d.ts
@@ -1,2 +1 @@
export function getContent(): string;
export type {ConvertToZipPayload} from './Worker';
2 changes: 1 addition & 1 deletion packages/yarnpkg-core/sources/worker-zip/index.js

