Skip to content

Commit

Permalink
fix(checker): retry on process crash (#3059)
Browse files Browse the repository at this point in the history
Retry checker processes when they crash unexpectedly.

* Add decorator pattern to checkers (like with test runners)
* Split "resource" related shared code in its own class
* Implement the `CheckerRetryDecorator`
* Add integration tests to validate that the behavior works correctly.
  • Loading branch information
nicojs authored Aug 7, 2021
1 parent 9630fc3 commit 8880643
Show file tree
Hide file tree
Showing 27 changed files with 517 additions and 184 deletions.
4 changes: 2 additions & 2 deletions packages/core/.vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
"type": "node",
"request": "launch",
"name": "Unit tests",
"name": "👽 Unit tests",
"program": "${workspaceRoot}/../../node_modules/mocha/bin/_mocha",
"args": [
"--no-timeout",
Expand All @@ -22,7 +22,7 @@
{
"type": "node",
"request": "launch",
"name": "Integration tests",
"name": "👽 Integration tests",
"program": "${workspaceRoot}/../../node_modules/mocha/bin/_mocha",
"args": [
"--no-timeout",
Expand Down
34 changes: 34 additions & 0 deletions packages/core/src/checker/checker-child-process-proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Checker, CheckResult, CheckStatus } from '@stryker-mutator/api/check';
import { Mutant, StrykerOptions } from '@stryker-mutator/api/core';
import { Disposable } from 'typed-inject';

import { ChildProcessProxy } from '../child-proxy/child-process-proxy';
import { LoggingClientContext } from '../logging';
import { Resource } from '../concurrent/pool';

import { CheckerWorker } from './checker-worker';

export class CheckerChildProcessProxy implements Checker, Disposable, Resource {
private readonly childProcess: ChildProcessProxy<CheckerWorker>;

constructor(options: StrykerOptions, loggingContext: LoggingClientContext) {
this.childProcess = ChildProcessProxy.create(require.resolve('./checker-worker'), loggingContext, options, {}, process.cwd(), CheckerWorker, []);
}

public async dispose(): Promise<void> {
await this.childProcess?.dispose();
}

public async init(): Promise<void> {
await this.childProcess?.proxy.init();
}

public async check(mutant: Mutant): Promise<CheckResult> {
if (this.childProcess) {
return this.childProcess.proxy.check(mutant);
}
return {
status: CheckStatus.Passed,
};
}
}
22 changes: 22 additions & 0 deletions packages/core/src/checker/checker-decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { CheckResult } from '@stryker-mutator/api/check';
import { Mutant } from '@stryker-mutator/api/core';

import { ChildProcessCrashedError } from '../child-proxy/child-process-crashed-error';
import { ResourceDecorator } from '../concurrent';

import { CheckerResource } from './checker-resource';

export class CheckerDecorator extends ResourceDecorator<CheckerResource> {
public async check(mutant: Mutant): Promise<CheckResult> {
try {
return await this.innerResource.check(mutant);
} catch (err) {
if (err instanceof ChildProcessCrashedError) {
await this.recover();
return this.innerResource.check(mutant);
} else {
throw err; //oops
}
}
}
}
57 changes: 0 additions & 57 deletions packages/core/src/checker/checker-facade.ts

This file was deleted.

19 changes: 19 additions & 0 deletions packages/core/src/checker/checker-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { StrykerOptions } from '@stryker-mutator/api/core';
import { LoggerFactoryMethod } from '@stryker-mutator/api/logging';
import { commonTokens, tokens } from '@stryker-mutator/api/plugin';

import { coreTokens } from '../di';
import { LoggingClientContext } from '../logging/logging-client-context';

import { CheckerChildProcessProxy } from './checker-child-process-proxy';
import { CheckerResource } from './checker-resource';
import { CheckerRetryDecorator } from './checker-retry-decorator';

createCheckerFactory.inject = tokens(commonTokens.options, coreTokens.loggingContext, commonTokens.getLogger);
export function createCheckerFactory(
options: StrykerOptions,
loggingContext: LoggingClientContext,
getLogger: LoggerFactoryMethod
): () => CheckerResource {
return () => new CheckerRetryDecorator(() => new CheckerChildProcessProxy(options, loggingContext), getLogger(CheckerRetryDecorator.name));
}
5 changes: 5 additions & 0 deletions packages/core/src/checker/checker-resource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Checker } from '@stryker-mutator/api/check';

import { Resource } from '../concurrent';

export type CheckerResource = Checker & Resource;
33 changes: 33 additions & 0 deletions packages/core/src/checker/checker-retry-decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { CheckResult } from '@stryker-mutator/api/check';
import { Mutant } from '@stryker-mutator/api/core';
import { Logger } from '@stryker-mutator/api/logging';

import { ChildProcessCrashedError } from '../child-proxy/child-process-crashed-error';
import { OutOfMemoryError } from '../child-proxy/out-of-memory-error';
import { ResourceDecorator } from '../concurrent';

import { CheckerResource } from './checker-resource';

export class CheckerRetryDecorator extends ResourceDecorator<CheckerResource> implements CheckerResource {
constructor(producer: () => CheckerResource, private readonly log: Logger) {
super(producer);
}

public async check(mutant: Mutant): Promise<CheckResult> {
try {
return await this.innerResource.check(mutant);
} catch (error) {
if (error instanceof ChildProcessCrashedError) {
if (error instanceof OutOfMemoryError) {
this.log.warn(`Checker process [${error.pid}] ran out of memory. Retrying in a new process.`);
} else {
this.log.warn(`Checker process [${error.pid}] crashed with exit code ${error.exitCode}. Retrying in a new process.`, error);
}
await this.recover();
return this.innerResource.check(mutant);
} else {
throw error; //oops
}
}
}
}
1 change: 1 addition & 0 deletions packages/core/src/checker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './checker-factory';
1 change: 1 addition & 0 deletions packages/core/src/concurrent/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './pool';
export * from './concurrency-token-provider';
export * from './resource-decorator';
74 changes: 41 additions & 33 deletions packages/core/src/concurrent/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,49 @@ import { coreTokens } from '../di';

const MAX_CONCURRENT_INIT = 2;

export interface Worker {
init?(): Promise<unknown>;
dispose?(): Promise<unknown>;
/**
* Represents a Checker that is also a Resource (with an init and dispose)
*/
export type CheckerResource = Checker & Resource;
/**
* Represents a TestRunner that is also a Resource (with an init and dispose)
*/
export type TestRunnerResource = Resource & TestRunner;

export interface Resource extends Partial<Disposable> {
init?(): Promise<void>;
}

createTestRunnerPool.inject = tokens(coreTokens.testRunnerFactory, coreTokens.testRunnerConcurrencyTokens);
export function createTestRunnerPool(factory: () => TestRunner, concurrencyToken$: Observable<number>): Pool<TestRunner> {
export function createTestRunnerPool(factory: () => TestRunnerResource, concurrencyToken$: Observable<number>): Pool<TestRunner> {
return new Pool(factory, concurrencyToken$);
}

createCheckerPool.inject = tokens(coreTokens.checkerFactory, coreTokens.checkerConcurrencyTokens);
export function createCheckerPool(factory: () => Checker, concurrencyToken$: Observable<number>): Pool<Checker> {
export function createCheckerPool(factory: () => CheckerResource, concurrencyToken$: Observable<number>): Pool<Checker> {
return new Pool(factory, concurrencyToken$);
}

/**
* Represents a pool of workers. Use `schedule` to schedule work to be executed on the workers.
* The pool will automatically recycle the workers, but will make sure only one task is executed
* on one worker at any one time. Creates as many workers as the concurrency tokens allow.
* Also takes care of the initialing of the workers (with `init()`)
* Represents a pool of resources. Use `schedule` to schedule work to be executed on the resources.
* The pool will automatically recycle the resources, but will make sure only one task is executed
* on one resource at any one time. Creates as many resources as the concurrency tokens allow.
* Also takes care of the initialing of the resources (with `init()`)
*/
export class Pool<TWorker extends Worker> implements Disposable {
private readonly createdWorkers: TWorker[] = [];
private readonly worker$: Observable<TWorker>;
export class Pool<TResource extends Resource> implements Disposable {
private readonly createdResources: TResource[] = [];
private readonly resource$: Observable<TResource>;

constructor(factory: () => TWorker, concurrencyToken$: Observable<number>) {
this.worker$ = concurrencyToken$.pipe(
constructor(factory: () => TResource, concurrencyToken$: Observable<number>) {
this.resource$ = concurrencyToken$.pipe(
mergeMap(async () => {
if (this.isDisposed) {
return null;
} else {
const worker = factory();
this.createdWorkers.push(worker);
await worker.init?.();
return worker;
const resource = factory();
this.createdResources.push(resource);
await resource.init?.();
return resource;
}
}, MAX_CONCURRENT_INIT),
filter(notEmpty),
Expand All @@ -54,27 +62,27 @@ export class Pool<TWorker extends Worker> implements Disposable {
}

/**
* Returns a promise that resolves if all concurrency tokens have resulted in initialized workers.
* This is optional, workers will get initialized either way.
* Returns a promise that resolves if all concurrency tokens have resulted in initialized resources.
* This is optional, resources will get initialized either way.
*/
public async init(): Promise<void> {
await lastValueFrom(this.worker$);
await lastValueFrom(this.resource$);
}

/**
* Schedules a task to be executed on workers in the pool. Each input is paired with a worker, which allows async work to be done.
* @param input$ The inputs to pair up with a worker.
* @param task The task to execute on each worker
* Schedules a task to be executed on resources in the pool. Each input is paired with a resource, which allows async work to be done.
* @param input$ The inputs to pair up with a resource.
* @param task The task to execute on each resource
*/
public schedule<TIn, TOut>(input$: Observable<TIn>, task: (worker: TWorker, input: TIn) => Promise<TOut> | TOut): Observable<TOut> {
const recycleBin = new Subject<TWorker>();
const worker$ = merge(recycleBin, this.worker$);
public schedule<TIn, TOut>(input$: Observable<TIn>, task: (resource: TResource, input: TIn) => Promise<TOut> | TOut): Observable<TOut> {
const recycleBin = new Subject<TResource>();
const resource$ = merge(recycleBin, this.resource$);

return zip(worker$, input$).pipe(
mergeMap(async ([worker, input]) => {
const output = await task(worker, input);
// Recycles a worker so its re-emitted from the `worker$` observable.
recycleBin.next(worker);
return zip(resource$, input$).pipe(
mergeMap(async ([resource, input]) => {
const output = await task(resource, input);
// Recycles a resource so its re-emitted from the `resource$` observable.
recycleBin.next(resource);
return output;
}),
tap({ complete: () => recycleBin.complete() })
Expand All @@ -88,6 +96,6 @@ export class Pool<TWorker extends Worker> implements Disposable {
*/
public async dispose(): Promise<void> {
this.isDisposed = true;
await Promise.all(this.createdWorkers.map((worker) => worker.dispose?.()));
await Promise.all(this.createdResources.map((resource) => resource.dispose?.()));
}
}
26 changes: 26 additions & 0 deletions packages/core/src/concurrent/resource-decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Resource } from './pool';

export abstract class ResourceDecorator<T extends Resource> implements Resource {
protected innerResource: T;

constructor(private readonly producer: () => T) {
this.innerResource = producer();
}

public async init(): Promise<void> {
await this.innerResource.init?.();
}

public async dispose(): Promise<void> {
await this.innerResource.dispose?.();
}
/**
* Disposes the current test runner and creates a new one
* To be used in decorators that need recreation.
*/
protected async recover(): Promise<void> {
await this.dispose();
this.innerResource = this.producer();
return this.init();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Sandbox } from '../sandbox/sandbox';
import { LoggingClientContext } from '../logging';

import { ConcurrencyTokenProvider, createCheckerPool } from '../concurrent';
import { createCheckerFactory } from '../checker/checker-facade';
import { createCheckerFactory } from '../checker';
import { createPreprocessor } from '../sandbox';

import { DryRunContext } from './3-dry-run-executor';
Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/process/3-dry-run-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import { createTestRunnerFactory } from '../test-runner';
import { MutationTestReportHelper } from '../reporters/mutation-test-report-helper';
import { ConfigError } from '../errors';
import { findMutantTestCoverage } from '../mutants';
import { Pool, createTestRunnerPool } from '../concurrent/pool';
import { ConcurrencyTokenProvider } from '../concurrent';
import { ConcurrencyTokenProvider, Pool, createTestRunnerPool } from '../concurrent';
import { FileMatcher } from '../config';

import { MutationTestContext } from './4-mutation-test-executor';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const MAX_WAIT_FOR_DISPOSE = 2000;
/**
* Runs the given test runner in a child process and forwards reports about test results
*/
export class ChildProcessTestRunnerDecorator implements TestRunner {
export class ChildProcessTestRunnerProxy implements TestRunner {
private readonly worker: ChildProcessProxy<ChildProcessTestRunnerWorker>;

constructor(options: StrykerOptions, sandboxWorkingDirectory: string, loggingContext: LoggingClientContext, private readonly log: Logger) {
Expand Down
Loading

0 comments on commit 8880643

Please sign in to comment.