Skip to content

Commit

Permalink
Merge pull request #3273 from artilleryio/refactor/launcher
Browse files Browse the repository at this point in the history
refactor: make the Launcher abstraction less opinionated
  • Loading branch information
hassy authored Jul 25, 2024
2 parents 2d5f41a + e640b79 commit 43068a4
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 86 deletions.
87 changes: 16 additions & 71 deletions packages/artillery/lib/launch-platform.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

const divideWork = require('./dist');
const { SSMS } = require('@artilleryio/int-core').ssms;
const { loadPlugins, loadPluginsConfig } = require('./load-plugins');

Expand All @@ -15,7 +14,6 @@ const _ = require('lodash');

const PlatformLocal = require('./platform/local');
const PlatformLambda = require('./platform/aws-lambda');
const STATES = require('./platform/worker-states');

async function createLauncher(script, payload, opts, launcherOpts) {
launcherOpts = launcherOpts || {
Expand All @@ -30,7 +28,7 @@ class Launcher {
this.payload = payload;
this.opts = opts;

this.workers = {};
this.exitedWorkersCount = 0;
this.workerMessageBuffer = [];

this.metricsByPeriod = {}; // individual intermediates by worker
Expand All @@ -47,25 +45,11 @@ class Launcher {
this.periodsReportedFor = [];

if (launcherOpts.platform === 'local') {
this.count = this.opts.count || Math.max(1, os.cpus().length - 1);
debug('Worker thread count:', this.count);
}

if (launcherOpts.platform === 'local') {
this.platform = new PlatformLocal(script, payload, opts);
} else {
// aws:lambda
this.platform = new PlatformLocal(script, payload, opts, launcherOpts);
} else if (launcherOpts.platform === 'aws:lambda') {
this.platform = new PlatformLambda(script, payload, opts, launcherOpts);
}

if (launcherOpts.mode === 'distribute') {
this.workerScripts = divideWork(this.script, this.count);
this.count = this.workerScripts.length;
} else {
this.count = this.launcherOpts.count;
this.workerScripts = new Array(this.count).fill().map((_) => this.script);
}

this.phaseStartedEventsSeen = {};
this.phaseCompletedEventsSeen = {};

Expand All @@ -76,9 +60,12 @@ class Launcher {

async initWorkerEvents(workerEvents) {
workerEvents.on('workerError', (workerId, message) => {
this.workers[workerId].state = STATES.stoppedError;

const { id, error, level, aggregatable, logs } = message;

if (level !== 'warn') {
this.exitedWorkersCount++;
}

if (aggregatable) {
this.workerMessageBuffer.push(message);
} else {
Expand Down Expand Up @@ -149,8 +136,7 @@ class Launcher {
});

workerEvents.on('done', async (workerId, message) => {
this.workers[workerId].state = STATES.completed;

this.exitedWorkersCount++;
this.finalReportsByWorker[workerId] = SSMS.deserializeMetrics(
message.report
);
Expand Down Expand Up @@ -238,13 +224,7 @@ class Launcher {

async handleAllWorkersFinished() {
const allWorkersDone =
Object.keys(this.workers).filter((workerId) => {
return (
this.workers[workerId].state === STATES.completed ||
this.workers[workerId].state === STATES.stoppedError
);
}).length === this.count;

this.exitedWorkersCount === this.platform.getDesiredWorkerCount();
if (allWorkersDone) {
clearInterval(this.i1);
clearInterval(this.i2);
Expand Down Expand Up @@ -341,11 +321,12 @@ class Launcher {

// Dynamically adjust the duration we're willing to wait for. This matters on SQS where messages are received
// in batches of 10 and more workers => need to wait longer.
const MAX_WAIT_FOR_PERIOD_MS = (Math.ceil(this.count / 10) * 3 + 30) * 1000;
const MAX_WAIT_FOR_PERIOD_MS =
(Math.ceil(this.platform.getDesiredWorkerCount() / 10) * 3 + 30) * 1000;

debug({
now: Date.now(),
count: this.count,
count: this.platform.getDesiredWorkerCount(),
earliestPeriodAvailable,
earliest,
MAX_WAIT_FOR_PERIOD_MS,
Expand All @@ -355,7 +336,8 @@ class Launcher {
});

const allWorkersReportedForPeriod =
this.metricsByPeriod[earliestPeriodAvailable]?.length === this.count;
this.metricsByPeriod[earliestPeriodAvailable]?.length ===
this.platform.getDesiredWorkerCount();
const waitedLongEnough =
Date.now() - Number(earliestPeriodAvailable) > MAX_WAIT_FOR_PERIOD_MS;

Expand Down Expand Up @@ -418,40 +400,8 @@ class Launcher {
await this.handleAllWorkersFinished();
}, 2 * 1000);

const contextVars = await this.platform.init();

// TODO: only makes sense for "distribute" / "local"
for (const script of this.workerScripts) {
const w1 = await this.platform.createWorker();

this.workers[w1.workerId] = {
id: w1.workerId,
script,
state: STATES.initializing
};
debug(`worker init ok: ${w1.workerId}`);
}

await this.initWorkerEvents(this.platform.events);

for (const [workerId, w] of Object.entries(this.workers)) {
await this.platform.prepareWorker(workerId, {
script: w.script,
payload: this.payload,
options: this.opts
});
this.workers[workerId].state = STATES.preparing;
}
debug('workers prepared');

// the initial context is stringified and copied to the workers
const contextVarsString = JSON.stringify(contextVars);

for (const [workerId, w] of Object.entries(this.workers)) {
await this.platform.runWorker(workerId, contextVarsString);
this.workers[workerId].state = STATES.initializing;
}

await this.platform.startJob();
debug('workers running');
}

Expand All @@ -474,11 +424,6 @@ class Launcher {
}
}
}

// Stop workers
for (const [id, w] of Object.entries(this.workers)) {
await this.platform.stopWorker(id);
}
}
}

Expand Down
16 changes: 14 additions & 2 deletions packages/artillery/lib/platform/aws-lambda/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -376,14 +376,26 @@ class PlatformLambda {
});
}

getDesiredWorkerCount() {
return this.platformOpts.count;
}

async startJob() {
await this.init();

for (let i = 0; i < this.platformOpts.count; i++) {
const { workerId } = await this.createWorker();
this.workers[workerId] = { id: workerId };
await this.runWorker(workerId);
}
}

async createWorker() {
const workerId = randomUUID();

return { workerId };
}

async prepareWorker(workerId) {}

async runWorker(workerId) {
const lambda = new AWS.Lambda({
apiVersion: '2015-03-31',
Expand Down
71 changes: 59 additions & 12 deletions packages/artillery/lib/platform/local/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,76 @@ const { handleScriptHook, prepareScript, loadProcessor } =
const debug = require('debug')('platform:local');
const EventEmitter = require('events');
const _ = require('lodash');

const divideWork = require('../../dist');
const STATES = require('../worker-states');
const os = require('node:os');
class PlatformLocal {
constructor(script, payload, opts, platformOpts) {
// We need these to run before/after hooks:
this.script = script;
this.payload = payload;
this.opts = opts;
this.events = new EventEmitter(); // send worker events such as workerError, etc

this.platformOpts = platformOpts;
this.workers = {};

this.workerScripts = {};
this.count = Infinity;
return this;
}

getDesiredWorkerCount() {
return this.count;
}

async startJob() {
await this.init();

if (this.platformOpts.mode === 'distribute') {
const count = Math.max(1, os.cpus().length - 1);
this.workerScripts = divideWork(this.script, count);
this.count = this.workerScripts.length;
} else {
// --count may only be used when mode is "multiply"
this.count = this.platformOpts.count;
this.workerScripts = new Array(this.count).fill().map((_) => this.script);
}

for (const script of this.workerScripts) {
const w1 = await this.createWorker();

this.workers[w1.workerId] = {
id: w1.workerId,
script,
state: STATES.initializing,
proc: w1
};
debug(`worker init ok: ${w1.workerId}`);
}

for (const [workerId, w] of Object.entries(this.workers)) {
await this.prepareWorker(workerId, {
script: w.script,
payload: this.payload,
options: this.opts
});
this.workers[workerId].state = STATES.preparing;
}
debug('workers prepared');

// the initial context is stringified and copied to the workers
const contextVarsString = JSON.stringify(this.contextVars);

for (const [workerId, w] of Object.entries(this.workers)) {
await this.runWorker(workerId, contextVarsString);
this.workers[workerId].state = STATES.initializing;
}
}

async init() {
// 'before' hook is executed in the main thread,
// its context is then passed to the workers
const contextVars = await this.runHook('before');
this.contextVars = contextVars; // TODO: Rename to something more descriptive

return contextVars;
}

async createWorker() {
Expand Down Expand Up @@ -67,11 +116,6 @@ class PlatformLocal {
process.nextTick(() => process.exit(11));
});

this.workers[worker.workerId] = {
proc: worker,
state: worker.state // TODO: replace with getState() use
};

return worker;
}

Expand All @@ -84,16 +128,19 @@ class PlatformLocal {
debug('runWorker', workerId);
return this.workers[workerId].proc.run(contextVarsString);
}

async stopWorker(workerId) {
return this.workers[workerId].proc.stop();
}

async getWorkerState(workerId) {}

async shutdown() {
// 'after' hook is executed in the main thread, after all workers
// are done
await this.runHook('after', this.contextVars);

for (const [workerId, w] of Object.entries(this.workers)) {
await this.stopWorker(workerId);
}
}

// ********
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ config:
target: http://asciiart.artillery.io:8080
phases:
- arrivalRate: 1
duration: 10
duration: 12
processor: './processor.js'

scenarios:
Expand Down

0 comments on commit 43068a4

Please sign in to comment.