From f0d58f3d02bedf90493d2d9edd4de18554d673f7 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 22 Nov 2019 17:42:36 +0200 Subject: [PATCH 1/3] bulkhead --- examples/bulkhead.ts | 26 ++ lib/bulkhead/Bulkhead/Bulkhead.ts | 57 +++++ lib/bulkhead/Bulkhead/factory.ts | 18 ++ .../BulkheadOptions.ts} | 19 +- .../BulkheadProvider/BulkheadProvider.ts | 5 + .../BulkheadProvider/ClassBulkheadProvider.ts | 21 ++ .../InstanceBulkheadProvider.ts | 23 ++ lib/bulkhead/BulkheadProvider/factory.ts | 27 ++ lib/bulkhead/ExecutionQueue/ExecutionQueue.ts | 29 +++ lib/bulkhead/ExecutionQueue/factory.ts | 14 ++ lib/bulkhead/index.ts | 44 ++++ lib/bulkhead/types.ts | 6 + lib/interfaces/factory.ts | 3 + lib/utils/index.ts | 22 ++ test/bulkhead/Bulkhead/Bulkhead.spec.ts | 180 ++++++++++++++ test/bulkhead/Bulkhead/factory.spec.ts | 40 +++ .../ClassBulkheadProvider.spec.ts | 57 +++++ .../InstanceBulkheadProvider.spec.ts | 64 +++++ .../bulkhead/BulkheadProvider/factory.spec.ts | 49 ++++ .../ExecutionQueue/ExecutionQueue.spec.ts | 73 ++++++ test/bulkhead/ExecutionQueue/factory.spec.ts | 31 +++ test/bulkhead/bulkhead.spec.ts | 233 ++++++++++++++++++ test/utils.ts | 4 +- test/utils/raiseStrategy.spec.ts | 81 ++++++ 24 files changed, 1113 insertions(+), 13 deletions(-) create mode 100644 examples/bulkhead.ts create mode 100644 lib/bulkhead/Bulkhead/Bulkhead.ts create mode 100644 lib/bulkhead/Bulkhead/factory.ts rename lib/{bulkhead.ts => bulkhead/BulkheadOptions.ts} (60%) create mode 100644 lib/bulkhead/BulkheadProvider/BulkheadProvider.ts create mode 100644 lib/bulkhead/BulkheadProvider/ClassBulkheadProvider.ts create mode 100644 lib/bulkhead/BulkheadProvider/InstanceBulkheadProvider.ts create mode 100644 lib/bulkhead/BulkheadProvider/factory.ts create mode 100644 lib/bulkhead/ExecutionQueue/ExecutionQueue.ts create mode 100644 lib/bulkhead/ExecutionQueue/factory.ts create mode 100644 lib/bulkhead/index.ts create mode 100644 lib/bulkhead/types.ts create mode 100644 lib/interfaces/factory.ts create mode 100644 lib/utils/index.ts create mode 100644 test/bulkhead/Bulkhead/Bulkhead.spec.ts create mode 100644 test/bulkhead/Bulkhead/factory.spec.ts create mode 100644 test/bulkhead/BulkheadProvider/ClassBulkheadProvider.spec.ts create mode 100644 test/bulkhead/BulkheadProvider/InstanceBulkheadProvider.spec.ts create mode 100644 test/bulkhead/BulkheadProvider/factory.spec.ts create mode 100644 test/bulkhead/ExecutionQueue/ExecutionQueue.spec.ts create mode 100644 test/bulkhead/ExecutionQueue/factory.spec.ts create mode 100644 test/bulkhead/bulkhead.spec.ts create mode 100644 test/utils/raiseStrategy.spec.ts diff --git a/examples/bulkhead.ts b/examples/bulkhead.ts new file mode 100644 index 0000000..e56d27e --- /dev/null +++ b/examples/bulkhead.ts @@ -0,0 +1,26 @@ +import { bulkhead } from '../lib'; + +class Service { + + @bulkhead(2, { size: 1 }) + public async get(value: number) { + return new Promise(resolve => setTimeout( + () => resolve(value + 1), + 100, + )); + } + +} + +const instance = new Service(); + +instance.get(1) // start execution immediately + .then(result => console.log(`call 1 result: ${result}`)); +instance.get(2) // start execution immediately + .then(result => console.log(`call 2 result: ${result}`)); + +instance.get(3) // start execution after one of first 2 executions ends + .then(result => console.log(`call 3 result: ${result}`)); + +instance.get(4) // throws because are executed to much calls and queue limit is reached + .catch(() => console.log('call 4 fails')); diff --git a/lib/bulkhead/Bulkhead/Bulkhead.ts b/lib/bulkhead/Bulkhead/Bulkhead.ts new file mode 100644 index 0000000..c114ec1 --- /dev/null +++ b/lib/bulkhead/Bulkhead/Bulkhead.ts @@ -0,0 +1,57 @@ +import { ExecutionQueue } from '../ExecutionQueue/ExecutionQueue'; + +export class Bulkhead { + + private inExecution: number = 0; + + constructor( + private readonly threshold: number, + private readonly executionQueue: ExecutionQueue, + ) { } + + public async run(method: (...args: any[]) => Promise, args: any[]): Promise { + if (this.inExecution < this.threshold) { + return this.execute(method, args); + } + + return this.addToQueue(method, args); + } + + private async execute(method: (...args: any[]) => Promise, args: any[]) { + this.inExecution += 1; + + const result = method(...args); + + const afterExecution = async () => { + this.inExecution -= 1; + this.callNext(); + }; + + result.then(afterExecution, afterExecution); + + return result; + } + + private async addToQueue(method: (...args: any[]) => Promise, args: any[]) { + return new Promise( + (resolve, reject) => this.executionQueue.store({ method, args, resolve, reject }), + ); + } + + private async callNext() { + const next = this.executionQueue.next(); + if (!next) { + return; + } + + const { method, args, resolve, reject } = next; + + try { + const result = await this.execute(method, args); + resolve(result); + } catch (error) { + reject(error); + } + } + +} diff --git a/lib/bulkhead/Bulkhead/factory.ts b/lib/bulkhead/Bulkhead/factory.ts new file mode 100644 index 0000000..cd8b687 --- /dev/null +++ b/lib/bulkhead/Bulkhead/factory.ts @@ -0,0 +1,18 @@ +import { Factory } from '../../interfaces/factory'; +import { ExecutionQueueFactory } from '../ExecutionQueue/factory'; +import { Bulkhead } from './Bulkhead'; + +export class BulkheadFactory implements Factory> { + + constructor( + private readonly threshold: number, + private readonly executionQueueFactory: ExecutionQueueFactory, + ) { } + + public create(): Bulkhead { + const executionQueue = this.executionQueueFactory.create(); + + return new Bulkhead(this.threshold, executionQueue); + } + +} diff --git a/lib/bulkhead.ts b/lib/bulkhead/BulkheadOptions.ts similarity index 60% rename from lib/bulkhead.ts rename to lib/bulkhead/BulkheadOptions.ts index a39b177..50345f9 100644 --- a/lib/bulkhead.ts +++ b/lib/bulkhead/BulkheadOptions.ts @@ -7,14 +7,14 @@ export type BulkheadOptions = { scope?: 'class' | 'instance', /** - * The max size of the execution queue. By default not limited. + * The max size of the pending queue. By default not limited. */ size?: number, /** * Sets the behavior of handling the case when queue limit is reached. - * When `reject` (default) then returns a rejected promise with error - * `Limiter queue limit reached.`. + * When `throw` (default) then throws immediately with an error. + * When `reject` then returns a rejected promise with error * When `throw` then throws immediately with an error. * When `ignore` then doesn't throw any error and immediately * terminates execution (returns undefined). @@ -24,11 +24,8 @@ export type BulkheadOptions = { onError?: 'throw' | 'reject' | 'ignore' | 'ignoreAsync', }; -/** - * Limits the number of queued concurrent executions of a method. - * When the limit is reached the execution is delayed and queued. - * @param threshold the max number of concurrent executions. - */ -export function bulkhead(threshold: number) { - throw new Error('Not implemented.'); -} +export const DEFAULT_OPTIONS: Readonly = { + scope: 'class', + onError: 'throw', + size: undefined, +}; diff --git a/lib/bulkhead/BulkheadProvider/BulkheadProvider.ts b/lib/bulkhead/BulkheadProvider/BulkheadProvider.ts new file mode 100644 index 0000000..8d03f3a --- /dev/null +++ b/lib/bulkhead/BulkheadProvider/BulkheadProvider.ts @@ -0,0 +1,5 @@ +import { Bulkhead } from '../Bulkhead/Bulkhead'; + +export interface BulkheadProvider { + get(instance: any): Bulkhead; +} diff --git a/lib/bulkhead/BulkheadProvider/ClassBulkheadProvider.ts b/lib/bulkhead/BulkheadProvider/ClassBulkheadProvider.ts new file mode 100644 index 0000000..0bd3506 --- /dev/null +++ b/lib/bulkhead/BulkheadProvider/ClassBulkheadProvider.ts @@ -0,0 +1,21 @@ +import { Bulkhead } from '../Bulkhead/Bulkhead'; +import { BulkheadFactory } from '../Bulkhead/factory'; +import { BulkheadProvider } from './BulkheadProvider'; + +export class ClassBulkheadProvider implements BulkheadProvider { + + private bulkhead: Bulkhead = null; + + constructor( + private readonly bulkheadFactory: BulkheadFactory, + ) { } + + public get(): Bulkhead { + if (!this.bulkhead) { + this.bulkhead = this.bulkheadFactory.create(); + } + + return this.bulkhead; + } + +} diff --git a/lib/bulkhead/BulkheadProvider/InstanceBulkheadProvider.ts b/lib/bulkhead/BulkheadProvider/InstanceBulkheadProvider.ts new file mode 100644 index 0000000..bea2e5b --- /dev/null +++ b/lib/bulkhead/BulkheadProvider/InstanceBulkheadProvider.ts @@ -0,0 +1,23 @@ +import { Bulkhead } from '../Bulkhead/Bulkhead'; +import { BulkheadFactory } from '../Bulkhead/factory'; +import { BulkheadProvider } from './BulkheadProvider'; + +export class InstanceBulkheadProvider implements BulkheadProvider { + + private readonly instancesBulkeads = new WeakMap>(); + + constructor( + private readonly bulkheadFactory: BulkheadFactory, + ) { } + + public get(instance: any): Bulkhead { + const hasBulkhead = this.instancesBulkeads.has(instance); + if (!hasBulkhead) { + const bulkheadService = this.bulkheadFactory.create(); + this.instancesBulkeads.set(instance, bulkheadService); + } + + return this.instancesBulkeads.get(instance); + } + +} diff --git a/lib/bulkhead/BulkheadProvider/factory.ts b/lib/bulkhead/BulkheadProvider/factory.ts new file mode 100644 index 0000000..fb1b76f --- /dev/null +++ b/lib/bulkhead/BulkheadProvider/factory.ts @@ -0,0 +1,27 @@ +import { Factory } from '../../interfaces/factory'; +import { BulkheadFactory } from '../Bulkhead/factory'; +import { BulkheadProvider } from './BulkheadProvider'; +import { ClassBulkheadProvider } from './ClassBulkheadProvider'; +import { InstanceBulkheadProvider } from './InstanceBulkheadProvider'; + +export class BulkheadProviderFactory + implements Factory, ['class' | 'instance']> { + + constructor( + private readonly bulkheadFactory: BulkheadFactory, + ) { } + + public create(scope: 'class' | 'instance'): BulkheadProvider { + switch (scope) { + case 'class': + return new ClassBulkheadProvider(this.bulkheadFactory); + + case 'instance': + return new InstanceBulkheadProvider(this.bulkheadFactory); + + default: + throw new Error(`@bulkhead unsuported scope type: ${scope}.`); + } + } + +} diff --git a/lib/bulkhead/ExecutionQueue/ExecutionQueue.ts b/lib/bulkhead/ExecutionQueue/ExecutionQueue.ts new file mode 100644 index 0000000..38e9e49 --- /dev/null +++ b/lib/bulkhead/ExecutionQueue/ExecutionQueue.ts @@ -0,0 +1,29 @@ +import { ExecutionMetaData } from '../types'; + +export class ExecutionQueue { + + private readonly queue: ExecutionMetaData[] = []; + + constructor( + private readonly limit: number = Infinity, + ) { } + + public store(data: ExecutionMetaData): this { + this.checkLimit(); + + this.queue.push(data); + + return this; + } + + public next(): ExecutionMetaData { + return this.queue.shift(); + } + + private checkLimit() { + if (this.queue.length >= this.limit) { + throw new Error('@bulkhead execution queue limit reached.'); + } + } + +} diff --git a/lib/bulkhead/ExecutionQueue/factory.ts b/lib/bulkhead/ExecutionQueue/factory.ts new file mode 100644 index 0000000..504bbb9 --- /dev/null +++ b/lib/bulkhead/ExecutionQueue/factory.ts @@ -0,0 +1,14 @@ +import { Factory } from '../../interfaces/factory'; +import { ExecutionQueue } from './ExecutionQueue'; + +export class ExecutionQueueFactory implements Factory> { + + constructor( + private readonly limit: number, + ) { } + + public create(): ExecutionQueue { + return new ExecutionQueue(this.limit); + } + +} diff --git a/lib/bulkhead/index.ts b/lib/bulkhead/index.ts new file mode 100644 index 0000000..d00a00c --- /dev/null +++ b/lib/bulkhead/index.ts @@ -0,0 +1,44 @@ +import { BulkheadOptions, DEFAULT_OPTIONS } from './BulkheadOptions'; +import { ExecutionQueueFactory } from './ExecutionQueue/factory'; +import { BulkheadFactory } from './Bulkhead/factory'; +import { BulkheadProviderFactory } from './BulkheadProvider/factory'; +import { BulkheadProvider } from './BulkheadProvider/BulkheadProvider'; +import { raiseStrategy } from '../utils'; + +export { BulkheadOptions }; + +/** + * Limits the number of queued concurrent executions of a method. + * When the limit is reached the execution is delayed and queued. + * @param threshold the max number of concurrent executions. + */ +export function bulkhead(threshold: number, options: BulkheadOptions): MethodDecorator { + const bulkheadProvided = createBulkheadProvider(threshold, options); + const raise = raiseStrategy(options, 'reject'); + + return function (_: any, __: any, descriptor: PropertyDescriptor) { + const method = descriptor.value; + + descriptor.value = async function (...args) { + const bulkhead = bulkheadProvided.get(this); + + try { + return await bulkhead.run(method.bind(this), args); + } catch (error) { + return raise(error); + } + }; + + return descriptor; + }; +} + +function createBulkheadProvider( + threshold: number, + { size = undefined, scope = 'class' }: BulkheadOptions = DEFAULT_OPTIONS, +): BulkheadProvider { + + const executionQueueFactory = new ExecutionQueueFactory(size); + const bulkheadFactory = new BulkheadFactory(threshold, executionQueueFactory); + return new BulkheadProviderFactory(bulkheadFactory).create(scope); +} diff --git a/lib/bulkhead/types.ts b/lib/bulkhead/types.ts new file mode 100644 index 0000000..707f636 --- /dev/null +++ b/lib/bulkhead/types.ts @@ -0,0 +1,6 @@ +export interface ExecutionMetaData { + args: any[]; + method: (...args: any[]) => Promise; + resolve: (value: T) => void; + reject: (error?: Error) => void; +} diff --git a/lib/interfaces/factory.ts b/lib/interfaces/factory.ts new file mode 100644 index 0000000..d885f6f --- /dev/null +++ b/lib/interfaces/factory.ts @@ -0,0 +1,3 @@ +export interface Factory { + create(...args: Args): T; +} diff --git a/lib/utils/index.ts b/lib/utils/index.ts new file mode 100644 index 0000000..1b74309 --- /dev/null +++ b/lib/utils/index.ts @@ -0,0 +1,22 @@ +type RaiseStrategies = 'throw' | 'reject' | 'ignore' | 'ignoreAsync'; + +type StrategyOptions = { + onError?: RaiseStrategies; +}; + +export function raiseStrategy(options: StrategyOptions, defaultStrategy: RaiseStrategies) { + const value = options && options.onError || defaultStrategy; + + switch (value) { + case 'reject': + return err => Promise.reject(err); + case 'throw': + return (err) => { throw err; }; + case 'ignore': + return () => { }; + case 'ignoreAsync': + return () => Promise.resolve(); + default: + throw new Error(`Option ${value} is not supported for 'behavior'.`); + } +} diff --git a/test/bulkhead/Bulkhead/Bulkhead.spec.ts b/test/bulkhead/Bulkhead/Bulkhead.spec.ts new file mode 100644 index 0000000..83299cd --- /dev/null +++ b/test/bulkhead/Bulkhead/Bulkhead.spec.ts @@ -0,0 +1,180 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { Bulkhead } from '../../../lib/bulkhead/Bulkhead/Bulkhead'; +import { repeat, delay } from '../../utils'; +import { ExecutionQueue } from '../../../lib/bulkhead/ExecutionQueue/ExecutionQueue'; + +describe('@bulkhead Bulkhead', () => { + + const threshold = 2; + const args = [1, {}, '123']; + let method: sinon.SinonStub>; + let executionQueueStub: sinon.SinonStubbedInstance; + let service: Bulkhead; + + beforeEach(() => { + executionQueueStub = sinon.createStubInstance(ExecutionQueue); + executionQueueStub.store.returns(executionQueueStub as any); + + method = sinon.stub<[], Promise>().returns(delay(10).then(() => 42)); + + service = new Bulkhead(threshold, executionQueueStub as any); + }); + + describe('constructor', () => { + + it('should create', () => expect(service).to.be.instanceOf(Bulkhead)); + + }); + + describe('run', () => { + + describe('if threshold is not reached yet', () => { + + it('should call method instantly', () => { + service.run(method, args); + + expect(method.calledOnce).to.be.true; + }); + + it('should call method with correct arguments', () => { + service.run(method, args); + + expect(method.calledWith(...args)).to.be.true; + }); + + it('should increase number of current executions', () => { + repeat(() => service.run(method, args), threshold); + + method.reset(); + + service.run(method, args); + + expect(method.called).to.be.false; + }); + + it('should decrease number of current executions after method resolves', async () => { + const promises = repeat(() => service.run(method, args), threshold); + await Promise.race(promises); + + method.reset(); + method.resolves(42); + + service.run(method, args); + + expect(method.calledOnce).to.be.true; + }); + + it('should return same value as method', async () => { + expect(await service.run(method, args)).to.equals(42); + }); + + it('should check if in queue are another calls', async () => { + await service.run(method, args); + + expect(executionQueueStub.next.calledOnce).to.be.true; + }); + + it('shoould begin execute next call if exists in queue', async () => { + const promises = [service.run(method, args), service.run(method, args)]; + + const stub = sinon.stub>().returns(delay(10).then(() => 42)); + let callIndex = 0; + executionQueueStub.next.callsFake(() => { + if (callIndex === 0) { + callIndex += 1; + return { + args, + method: stub, + reject: () => { }, + resolve: () => { }, + }; + } + return null; + }); + service.run(stub, args); + + await Promise.race(promises); + + expect(stub.calledOnce).to.be.true; + }); + + it('should begin execute next call if exists in queue with correct arguments', async () => { + const promises = [service.run(method, args), service.run(method, args)]; + + const stub = sinon.stub>().returns(delay(10).then(() => 42)); + let callIndex = 0; + executionQueueStub.next.callsFake(() => { + if (callIndex === 0) { + callIndex += 1; + return { + args, + method: stub, + reject: () => { }, + resolve: () => { }, + }; + } + return null; + }); + service.run(stub, args); + + await Promise.race(promises); + + expect(stub.calledWith(...args)).to.be.true; + }); + + }); + + describe('if threshold is reached', () => { + + beforeEach(() => { + repeat(() => service.run(method, args), threshold); + + executionQueueStub.store.callsFake((data) => { + let shouldCall = true; + + executionQueueStub.next.callsFake(() => { + if (shouldCall) { + shouldCall = false; + return data; + } + }); + + return executionQueueStub as any; + }); + }); + + it('should call ExecutionQueue.store to store current execution data', () => { + service.run(method, args); + + expect(executionQueueStub.store.calledOnce).to.be.true; + }); + + it('should call ExecutionQueue.store with correct arguments', () => { + service.run(method, args); + + const expectedArguments = { + args, + method, + resolve: sinon.match.func as any, + reject: sinon.match.func as any, + }; + expect(executionQueueStub.store.calledWithMatch(expectedArguments)).to.be.true; + }); + + it('should return correct result', async () => { + const stub = sinon.stub().resolves(42); + expect(await service.run(stub, args)).to.equals(42); + }); + + it('should reject if method will reject', async () => { + const stub = sinon.stub().rejects(new Error('error')); + await expect(service.run(stub, args)).to.be.rejectedWith('error'); + }); + + }); + + }); + +}); diff --git a/test/bulkhead/Bulkhead/factory.spec.ts b/test/bulkhead/Bulkhead/factory.spec.ts new file mode 100644 index 0000000..a96dd9c --- /dev/null +++ b/test/bulkhead/Bulkhead/factory.spec.ts @@ -0,0 +1,40 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { Bulkhead } from '../../../lib/bulkhead/Bulkhead/Bulkhead'; +import { BulkheadFactory } from '../../../lib/bulkhead/Bulkhead/factory'; +import { ExecutionQueueFactory } from '../../../lib/bulkhead/ExecutionQueue/factory'; + +describe('@bulkhead BulkheadFactory', () => { + + const threshold = 3; + let executionQueueFactoryStub: sinon.SinonStubbedInstance; + let service: BulkheadFactory; + + beforeEach(() => { + executionQueueFactoryStub = sinon.createStubInstance(ExecutionQueueFactory); + + service = new BulkheadFactory(threshold, executionQueueFactoryStub as any); + }); + + describe('constructor', () => { + + it('should create', () => expect(service).to.be.instanceOf(BulkheadFactory)); + + }); + + describe('create', () => { + + it('should call executionQueueFactory.create to create instance of Execution Queue', () => { + service.create(); + + expect(executionQueueFactoryStub.create.calledOnce).to.be.true; + }); + + it('should return instance of bulkhead', () => { + expect(service.create()).to.be.instanceOf(Bulkhead); + }); + + }); + +}); diff --git a/test/bulkhead/BulkheadProvider/ClassBulkheadProvider.spec.ts b/test/bulkhead/BulkheadProvider/ClassBulkheadProvider.spec.ts new file mode 100644 index 0000000..b6a6145 --- /dev/null +++ b/test/bulkhead/BulkheadProvider/ClassBulkheadProvider.spec.ts @@ -0,0 +1,57 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { + ClassBulkheadProvider, +} from '../../../lib/bulkhead/BulkheadProvider/ClassBulkheadProvider'; +import { BulkheadFactory } from '../../../lib/bulkhead/Bulkhead/factory'; + +describe('@bulkhead BulkheadProvider', () => { + + let bulkheadFactoryStub: sinon.SinonStubbedInstance; + let service: ClassBulkheadProvider; + + beforeEach(() => { + bulkheadFactoryStub = sinon.createStubInstance(BulkheadFactory); + + service = new ClassBulkheadProvider(bulkheadFactoryStub as any); + }); + + describe('constructor', () => { + + it('should create', () => expect(service).to.be.instanceOf(ClassBulkheadProvider)); + + }); + + describe('get', () => { + + it('it should call BulkheadFactory.create to create Bulkhead instance', () => { + service.get(); + + expect(bulkheadFactoryStub.create.calledOnce).to.be.true; + }); + + it('it should call BulkheadFactory.create only at first call', () => { + bulkheadFactoryStub.create.returns({} as any); + service.get(); + bulkheadFactoryStub.create.reset(); + + service.get(); + + expect(bulkheadFactoryStub.create.called).to.be.false; + }); + + it('should return every time same instance', () => { + expect(service.get()).to.equals(service.get()); + }); + + it('should return created result from BulkheadFactory.create', () => { + const expected = {} as any; + bulkheadFactoryStub.create.returns(expected); + + expect(service.get()).to.equals(expected); + }); + + }); + +}); diff --git a/test/bulkhead/BulkheadProvider/InstanceBulkheadProvider.spec.ts b/test/bulkhead/BulkheadProvider/InstanceBulkheadProvider.spec.ts new file mode 100644 index 0000000..2f11c41 --- /dev/null +++ b/test/bulkhead/BulkheadProvider/InstanceBulkheadProvider.spec.ts @@ -0,0 +1,64 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { BulkheadFactory } from '../../../lib/bulkhead/Bulkhead/factory'; +import { + InstanceBulkheadProvider, +} from '../../../lib/bulkhead/BulkheadProvider/InstanceBulkheadProvider'; + +describe('@bulkhead InstanceBulkheadProvider', () => { + + let bulkheadFactoryStub: sinon.SinonStubbedInstance; + let service: InstanceBulkheadProvider; + + beforeEach(() => { + bulkheadFactoryStub = sinon.createStubInstance(BulkheadFactory); + + service = new InstanceBulkheadProvider(bulkheadFactoryStub as any); + }); + + describe('constructor', () => { + + it('should create', () => expect(service).to.be.instanceOf(InstanceBulkheadProvider)); + + }); + + describe('get', () => { + + it('should create new bulkhead instance if was called first time with this instance', () => { + const result = {} as any; + const instance = {} as any; + bulkheadFactoryStub.create.returns(result); + + expect(service.get(instance)).to.equals(result); + }); + + it('should return already created bulkhead for current isntance', () => { + const result = {} as any; + bulkheadFactoryStub.create.returns(result); + const instance = {} as any; + service.get(instance); + + expect(service.get(instance)).to.equals(result); + }); + + it('should create new bulkhead with bulkheadFactory.create', () => { + service.get({} as any); + + expect(bulkheadFactoryStub.create.calledOnce).to.be.true; + }); + + it('should not create new bulkhead instance if already exists', () => { + bulkheadFactoryStub.create.returns({} as any); + const instance = {} as any; + service.get(instance); + bulkheadFactoryStub.create.reset(); + + service.get(instance); + + expect(bulkheadFactoryStub.create.called).to.be.false; + }); + + }); + +}); diff --git a/test/bulkhead/BulkheadProvider/factory.spec.ts b/test/bulkhead/BulkheadProvider/factory.spec.ts new file mode 100644 index 0000000..90e8146 --- /dev/null +++ b/test/bulkhead/BulkheadProvider/factory.spec.ts @@ -0,0 +1,49 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { BulkheadFactory } from '../../../lib/bulkhead/Bulkhead/factory'; +import { + ClassBulkheadProvider, +} from '../../../lib/bulkhead/BulkheadProvider/ClassBulkheadProvider'; +import { BulkheadProviderFactory } from '../../../lib/bulkhead/BulkheadProvider/factory'; +import { + InstanceBulkheadProvider, +} from '../../../lib/bulkhead/BulkheadProvider/InstanceBulkheadProvider'; + +describe('@bulkhead BulkheadProviderFactory', () => { + + let bulkheadFactoryStub: sinon.SinonStubbedInstance; + let service: BulkheadProviderFactory; + + beforeEach(() => { + bulkheadFactoryStub = sinon.createStubInstance(BulkheadFactory); + + service = new BulkheadProviderFactory(bulkheadFactoryStub as any); + }); + + describe('constructor', () => { + + it('should create', () => expect(service).to.be.instanceOf(BulkheadProviderFactory)); + + }); + + describe('create', () => { + + it('should create instanceof ClassBulkheadProvider if scope is "class"', () => { + expect(service.create('class')).to.be.instanceOf(ClassBulkheadProvider); + }); + + it('should create instanceof InstanceBulkheadProvider if scope is "instance"', () => { + expect(service.create('instance')).to.be.instanceOf(InstanceBulkheadProvider); + }); + + it('should throw error if scope options is not a valid one', () => { + const scope = '123' as any; + const message = `@bulkhead unsuported scope type: ${scope}.`; + + expect(() => service.create(scope)).to.throw(message); + }); + + }); + +}); diff --git a/test/bulkhead/ExecutionQueue/ExecutionQueue.spec.ts b/test/bulkhead/ExecutionQueue/ExecutionQueue.spec.ts new file mode 100644 index 0000000..b6c9798 --- /dev/null +++ b/test/bulkhead/ExecutionQueue/ExecutionQueue.spec.ts @@ -0,0 +1,73 @@ +import { expect } from 'chai'; + +import { ExecutionQueue } from '../../../lib/bulkhead/ExecutionQueue/ExecutionQueue'; +import { repeat } from '../../utils'; +import { ExecutionMetaData } from '../../../lib/bulkhead/types'; + +describe('@bulkhead ExecutionQueue', () => { + + const limit = 3; + const resolve = () => { }; + const reject = () => { }; + const args = []; + const method = () => Promise.resolve(); + let service: ExecutionQueue; + + beforeEach(() => service = new ExecutionQueue(limit)); + + describe('constructor', () => { + + it('should create', () => expect(new ExecutionQueue()).to.be.instanceOf(ExecutionQueue)); + + }); + + describe('store', () => { + + it('should store given data', () => { + service.store({ method, args, resolve, reject }); + + expect(service.next()).to.be.deep.equals({ method, args, resolve, reject }); + }); + + it('should throw if limit is reached', () => { + repeat(() => service.store(null), limit); + + expect(() => service.store(null)).to.throw(); + }); + + it('should return self instance', () => { + expect(service.store(null)).to.equals(service); + }); + + }); + + describe('next', () => { + + it('should return undefined if not stored data', () => { + expect(service.next()).to.equals(undefined); + }); + + it('should return data in reverse order of storing', () => { + const firstExecutionData: ExecutionMetaData = { method, args, resolve, reject }; + const secondExecutionData: ExecutionMetaData = { method, args, resolve, reject }; + + service.store(firstExecutionData); + service.store(secondExecutionData); + + expect(service.next()).to.equals(firstExecutionData); + }); + + it('should remove data when return it', () => { + const firstExecutionData: ExecutionMetaData = { method, args, resolve, reject }; + const secondExecutionData: ExecutionMetaData = { method, args, resolve, reject }; + + service.store(firstExecutionData); + service.store(secondExecutionData); + service.next(); + + expect(service.next()).to.equals(secondExecutionData); + }); + + }); + +}); diff --git a/test/bulkhead/ExecutionQueue/factory.spec.ts b/test/bulkhead/ExecutionQueue/factory.spec.ts new file mode 100644 index 0000000..677b0b9 --- /dev/null +++ b/test/bulkhead/ExecutionQueue/factory.spec.ts @@ -0,0 +1,31 @@ +import { expect } from 'chai'; + +import { ExecutionQueue } from '../../../lib/bulkhead/ExecutionQueue/ExecutionQueue'; +import { ExecutionQueueFactory } from '../../../lib/bulkhead/ExecutionQueue/factory'; + +describe('@bulkhead ExecutionQueueFactory', () => { + + const limit = 3; + let service: ExecutionQueueFactory; + + beforeEach(() => service = new ExecutionQueueFactory(limit)); + + describe('constructor', () => { + + it('should create', () => expect(service).to.be.instanceOf(ExecutionQueueFactory)); + + }); + + describe('create', () => { + + it('should return instance of Execution Queue', () => { + expect(service.create()).to.be.instanceOf(ExecutionQueue); + }); + + it('should return every time antother instance of Execution Queue', () => { + expect(service.create()).not.to.equals(service.create()); + }); + + }); + +}); diff --git a/test/bulkhead/bulkhead.spec.ts b/test/bulkhead/bulkhead.spec.ts new file mode 100644 index 0000000..ba419a4 --- /dev/null +++ b/test/bulkhead/bulkhead.spec.ts @@ -0,0 +1,233 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { BulkheadOptions, bulkhead } from '../../lib'; +import { delay, repeat } from '../utils'; + +describe('@bulkhead', () => { + + const threshold = 2; + const timeout = 10; + let stub: sinon.SinonStub<[], any>; + + function factory(options?: BulkheadOptions, startStub = stub) { + + class Test { + + @bulkhead(threshold, options) + async get(): Promise { + startStub(); + + await delay(timeout); + + return 42; + } + + } + + return Test; + } + + beforeEach(() => stub = sinon.stub()); + + it('should not change method behavoiur', async () => { + const instance = new (factory())(); + + expect(await instance.get()).to.equals(42); + }); + + it('should not change method behaviour if was called from queue', async () => { + const instance = new (factory())(); + + repeat(() => instance.get(), threshold); + + expect(await instance.get()).to.equals(42); + }); + + describe('options', () => { + + it('should not throw without options', async () => { + const instance = new (factory())(); + + await expect(instance.get()).not.be.rejected; + }); + + it('should not throw for given correct options', async () => { + const instance = new (factory({ size: 3, onError: 'ignore', scope: 'class' }))(); + + await expect(instance.get()).not.be.rejected; + }); + + it('should throw for wrong onError option', () => { + const onError = 'wrong option' as any; + const options: BulkheadOptions = { onError }; + + const expectedMessage = `Option ${onError} is not supported for 'behavior'.`; + expect(() => new (factory(options))().get()).to.throw(expectedMessage); + }); + + it('should throw for wrong scope option', () => { + const scope = 'wrong option' as any; + const options: BulkheadOptions = { scope }; + + const expectedMessage = `@bulkhead unsuported scope type: ${scope}.`; + expect(() => new (factory(options))().get()).to.throw(expectedMessage); + }); + + }); + + describe('options behavoiur', () => { + + describe('threshold', () => { + + let instance: InstanceType>; + + beforeEach(() => instance = new (factory())()); + + it('should execute instantly threshold calls times', () => { + repeat(() => instance.get(), threshold + 1); + + expect(stub.callCount).to.equals(threshold); + }); + + it('should execute next calls after one of current calls ends its execution', async () => { + const promises = repeat(() => instance.get(), threshold); + instance.get(); + + stub.reset(); + await Promise.race(promises); + + expect(stub.calledOnce).to.be.true; + }); + + it('should execute calls instantly if nothing is executed now', async () => { + const promises = repeat(() => instance.get(), threshold); + await Promise.all(promises); + + stub.reset(); + + repeat(() => instance.get(), threshold); + expect(stub.callCount).to.equals(threshold); + }); + + }); + + describe('size', () => { + + const size = 2; + const options: BulkheadOptions = { size }; + let instance: InstanceType>; + + beforeEach(() => instance = new (factory(options))()); + + it('should store in queue maximum size calls', async () => { + const promises = repeat(() => instance.get(), threshold); + repeat(() => instance.get(), size); + + stub.reset(); + await Promise.all(promises); + + expect(stub.callCount).to.equals(size); + }); + + it('should throw if is tring to call more than threshold + size', () => { + repeat(() => instance.get(), threshold + size); + + expect(instance.get()).to.be.rejectedWith('@bulkhead execution queue limit reached.'); + }); + + }); + + describe('onError', () => { + + const error = 'an error'; + + function factory(options?: BulkheadOptions) { + + class Test { + + @bulkhead(threshold, options) + async get(): Promise { + throw new Error(error); + } + + } + + return Test; + } + + it('should reject if onError is "reject"', async () => { + const instance = new (factory({ onError: 'reject' }))(); + + await expect(instance.get()).to.be.rejectedWith(error); + }); + + it('should ignore if onError is "ignoreAsync"', async () => { + const instance = new (factory({ onError: 'ignoreAsync' }))(); + + expect(instance.get()).to.be.a('promise').and.not.to.be.rejected; + }); + + }); + + describe('scope', () => { + + describe('class', () => { + + const options: BulkheadOptions = { scope: 'class' }; + let constructor: ReturnType; + let firstInstance: InstanceType; + let secondInstance: InstanceType; + + beforeEach(() => { + constructor = factory(options); + [firstInstance, secondInstance] = [new constructor(), new constructor()]; + }); + + it('should store in queue calls after threshold for same instances', () => { + repeat(() => firstInstance.get(), threshold + 1); + + expect(stub.callCount).to.equals(threshold); + }); + + it('should store in queue calls after threshold for different instances', () => { + repeat(() => firstInstance.get(), threshold); + repeat(() => secondInstance.get(), threshold); + + expect(stub.callCount).to.equals(threshold); + }); + + }); + + describe('instance', () => { + + const options: BulkheadOptions = { scope: 'instance' }; + let constructor: ReturnType; + let firstInstance: InstanceType; + let secondInstance: InstanceType; + + beforeEach(() => { + constructor = factory(options); + [firstInstance, secondInstance] = [new constructor(), new constructor()]; + }); + + it('should store in queue calls after threshold for same instances', () => { + repeat(() => firstInstance.get(), threshold + 1); + + expect(stub.callCount).to.equals(threshold); + }); + + it('should not store in queue calls after threshold for different instances', () => { + repeat(() => firstInstance.get(), threshold); + repeat(() => secondInstance.get(), threshold); + + expect(stub.callCount).to.equals(threshold * 2); + }); + + }); + + }); + + }); + +}); diff --git a/test/utils.ts b/test/utils.ts index 850f631..4753836 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -2,8 +2,8 @@ export function delay(ms: number) { return new Promise(res => setTimeout(res, ms)); } -export function repeat(func: () => void, count: number) { - const results = []; +export function repeat(func: () => T, count: number): T[] { + const results: T[] = []; for (let i = 0; i < count; i += 1) { results.push(func()); diff --git a/test/utils/raiseStrategy.spec.ts b/test/utils/raiseStrategy.spec.ts new file mode 100644 index 0000000..28cf185 --- /dev/null +++ b/test/utils/raiseStrategy.spec.ts @@ -0,0 +1,81 @@ +import { expect } from 'chai'; + +import { raiseStrategy } from '../../lib/utils'; + +describe('raiseStrategy', () => { + + it('should throw for wrong strategy option', () => { + const onError = 'wrong option' as any; + const expectedMessage = `Option ${onError} is not supported for 'behavior'.`; + expect(() => raiseStrategy({ onError }, onError)).to.throw(expectedMessage); + }); + + describe('reject', () => { + + const onError = 'reject'; + + let strategy: ReturnType; + + beforeEach(() => strategy = raiseStrategy({ onError }, onError)); + + it('should return an function', () => expect(strategy).to.be.a('function')); + + it('returned function should ', async () => { + const message = 'error message'; + await expect(strategy(new Error(message))).to.be.rejectedWith(message); + }); + + }); + + describe('throw', () => { + + const onError = 'throw'; + + let strategy: ReturnType; + + beforeEach(() => strategy = raiseStrategy({ onError }, onError)); + + it('should return an function', () => expect(strategy).to.be.a('function')); + + it('returned function should ', () => { + const message = 'error message'; + expect(() => strategy(new Error(message))).to.throw(message); + }); + + }); + + describe('ignore', () => { + + const onError = 'ignore'; + + let strategy: ReturnType; + + beforeEach(() => strategy = raiseStrategy({ onError }, onError)); + + it('should return an function', () => expect(strategy).to.be.a('function')); + + it('returned function should ', () => { + const message = 'error message'; + expect(strategy(new Error(message))).to.be.undefined; + }); + + }); + + describe('ignoreAsync', () => { + + const onError = 'ignoreAsync'; + + let strategy: ReturnType; + + beforeEach(() => strategy = raiseStrategy({ onError }, onError)); + + it('should return an function', () => expect(strategy).to.be.a('function')); + + it('returned function should ', () => { + const message = 'error message'; + expect(strategy(new Error(message))).to.be.a('promise').and.not.be.rejected; + }); + + }); + +}); From 14722b7ab816660f60675cafaba8f69649501cae Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 22 Nov 2019 18:31:10 +0200 Subject: [PATCH 2/3] bulkhead update options --- lib/bulkhead/BulkheadOptions.ts | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/bulkhead/BulkheadOptions.ts b/lib/bulkhead/BulkheadOptions.ts index 50345f9..02fb8be 100644 --- a/lib/bulkhead/BulkheadOptions.ts +++ b/lib/bulkhead/BulkheadOptions.ts @@ -13,19 +13,15 @@ export type BulkheadOptions = { /** * Sets the behavior of handling the case when queue limit is reached. - * When `throw` (default) then throws immediately with an error. - * When `reject` then returns a rejected promise with error - * When `throw` then throws immediately with an error. - * When `ignore` then doesn't throw any error and immediately - * terminates execution (returns undefined). + * When `reject` (default) then returns a rejected promise with error * When `ignoreAsync` then doesn't throw any error and immediately * returns a resolved promise. */ - onError?: 'throw' | 'reject' | 'ignore' | 'ignoreAsync', + onError?: 'reject' | 'ignoreAsync', }; export const DEFAULT_OPTIONS: Readonly = { scope: 'class', - onError: 'throw', + onError: 'reject', size: undefined, }; From 95b777576016bbbc656a9d28c603e202b84780d7 Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 23 Nov 2019 13:57:30 +0200 Subject: [PATCH 3/3] bulkhead update unit tests --- test/bulkhead/bulkhead.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/bulkhead/bulkhead.spec.ts b/test/bulkhead/bulkhead.spec.ts index ba419a4..7fb80f8 100644 --- a/test/bulkhead/bulkhead.spec.ts +++ b/test/bulkhead/bulkhead.spec.ts @@ -53,7 +53,7 @@ describe('@bulkhead', () => { }); it('should not throw for given correct options', async () => { - const instance = new (factory({ size: 3, onError: 'ignore', scope: 'class' }))(); + const instance = new (factory({ size: 3, onError: 'ignoreAsync', scope: 'class' }))(); await expect(instance.get()).not.be.rejected; });