-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bulkhead #7
base: develop
Are you sure you want to change the base?
bulkhead #7
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import { bulkhead } from '../lib'; | ||
|
||
class Service { | ||
|
||
@bulkhead(2, { size: 1 }) | ||
public async get(value: number) { | ||
return new Promise(resolve => setTimeout( | ||
() => resolve(value + 1), | ||
100, | ||
)); | ||
} | ||
|
||
} | ||
|
||
const instance = new Service(); | ||
|
||
instance.get(1) // start execution immediately | ||
.then(result => console.log(`call 1 result: ${result}`)); | ||
instance.get(2) // start execution immediately | ||
.then(result => console.log(`call 2 result: ${result}`)); | ||
|
||
instance.get(3) // start execution after one of first 2 executions ends | ||
.then(result => console.log(`call 3 result: ${result}`)); | ||
|
||
instance.get(4) // throws because are executed to much calls and queue limit is reached | ||
.catch(() => console.log('call 4 fails')); |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import { ExecutionQueue } from '../ExecutionQueue/ExecutionQueue'; | ||
|
||
export class Bulkhead<T = any> { | ||
|
||
private inExecution: number = 0; | ||
|
||
constructor( | ||
private readonly threshold: number, | ||
private readonly executionQueue: ExecutionQueue<T>, | ||
) { } | ||
|
||
public async run(method: (...args: any[]) => Promise<T>, args: any[]): Promise<T> { | ||
if (this.inExecution < this.threshold) { | ||
return this.execute(method, args); | ||
} | ||
|
||
return this.addToQueue(method, args); | ||
} | ||
|
||
private async execute(method: (...args: any[]) => Promise<T>, args: any[]) { | ||
this.inExecution += 1; | ||
|
||
const result = method(...args); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The provided |
||
|
||
const afterExecution = async () => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if |
||
this.inExecution -= 1; | ||
this.callNext(); | ||
}; | ||
|
||
result.then(afterExecution, afterExecution); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if the method never resolves? It will stay forever in the queue and potentially might block the queue. I think we might need an additional option to setup a timeout for the execution. Could we use the |
||
|
||
return result; | ||
} | ||
|
||
private async addToQueue(method: (...args: any[]) => Promise<T>, args: any[]) { | ||
return new Promise<T>( | ||
(resolve, reject) => this.executionQueue.store({ method, args, resolve, reject }), | ||
); | ||
} | ||
|
||
private async callNext() { | ||
const next = this.executionQueue.next(); | ||
if (!next) { | ||
return; | ||
} | ||
|
||
const { method, args, resolve, reject } = next; | ||
|
||
try { | ||
const result = await this.execute(method, args); | ||
resolve(result); | ||
} catch (error) { | ||
reject(error); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import { Factory } from '../../interfaces/factory'; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as for queue implementation, this factory is not of much use taking into account there won't be another bulkhead implementations. Now it looks like an added complexity. I'd suggest that we implement a static create method directly in the bulkhead class. |
||
import { ExecutionQueueFactory } from '../ExecutionQueue/factory'; | ||
import { Bulkhead } from './Bulkhead'; | ||
|
||
export class BulkheadFactory<T = any> implements Factory<Bulkhead<T>> { | ||
|
||
constructor( | ||
private readonly threshold: number, | ||
private readonly executionQueueFactory: ExecutionQueueFactory<T>, | ||
) { } | ||
|
||
public create(): Bulkhead<T> { | ||
const executionQueue = this.executionQueueFactory.create(); | ||
|
||
return new Bulkhead(this.threshold, executionQueue); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
export type BulkheadOptions = { | ||
/** | ||
* The scope of limiter. | ||
* The `class` (default) scope defines a single method scope for all class instances. | ||
* The `instance` scope defines a per-instance method scope. | ||
*/ | ||
scope?: 'class' | 'instance', | ||
|
||
/** | ||
* The max size of the pending queue. By default not limited. | ||
*/ | ||
size?: number, | ||
|
||
/** | ||
* Sets the behavior of handling the case when queue limit is reached. | ||
* When `reject` (default) then returns a rejected promise with error | ||
* When `ignoreAsync` then doesn't throw any error and immediately | ||
* returns a resolved promise. | ||
*/ | ||
onError?: 'reject' | 'ignoreAsync', | ||
}; | ||
|
||
export const DEFAULT_OPTIONS: Readonly<BulkheadOptions> = { | ||
scope: 'class', | ||
onError: 'reject', | ||
size: undefined, | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
import { Bulkhead } from '../Bulkhead/Bulkhead'; | ||
|
||
export interface BulkheadProvider<T = any> { | ||
get(instance: any): Bulkhead<T>; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import { Bulkhead } from '../Bulkhead/Bulkhead'; | ||
import { BulkheadFactory } from '../Bulkhead/factory'; | ||
import { BulkheadProvider } from './BulkheadProvider'; | ||
|
||
export class ClassBulkheadProvider<T = any> implements BulkheadProvider<T> { | ||
|
||
private bulkhead: Bulkhead<T> = null; | ||
|
||
constructor( | ||
private readonly bulkheadFactory: BulkheadFactory<T>, | ||
) { } | ||
|
||
public get(): Bulkhead<T> { | ||
if (!this.bulkhead) { | ||
this.bulkhead = this.bulkheadFactory.create(); | ||
} | ||
|
||
return this.bulkhead; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import { Bulkhead } from '../Bulkhead/Bulkhead'; | ||
import { BulkheadFactory } from '../Bulkhead/factory'; | ||
import { BulkheadProvider } from './BulkheadProvider'; | ||
|
||
export class InstanceBulkheadProvider<T = any> implements BulkheadProvider<T> { | ||
|
||
private readonly instancesBulkeads = new WeakMap<any, Bulkhead<T>>(); | ||
|
||
constructor( | ||
private readonly bulkheadFactory: BulkheadFactory<T>, | ||
) { } | ||
|
||
public get(instance: any): Bulkhead<T> { | ||
const hasBulkhead = this.instancesBulkeads.has(instance); | ||
if (!hasBulkhead) { | ||
const bulkheadService = this.bulkheadFactory.create(); | ||
this.instancesBulkeads.set(instance, bulkheadService); | ||
} | ||
|
||
return this.instancesBulkeads.get(instance); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
import { Factory } from '../../interfaces/factory'; | ||
import { BulkheadFactory } from '../Bulkhead/factory'; | ||
import { BulkheadProvider } from './BulkheadProvider'; | ||
import { ClassBulkheadProvider } from './ClassBulkheadProvider'; | ||
import { InstanceBulkheadProvider } from './InstanceBulkheadProvider'; | ||
|
||
export class BulkheadProviderFactory<T = any> | ||
implements Factory<BulkheadProvider<T>, ['class' | 'instance']> { | ||
|
||
constructor( | ||
private readonly bulkheadFactory: BulkheadFactory<T>, | ||
) { } | ||
|
||
public create(scope: 'class' | 'instance'): BulkheadProvider<T> { | ||
switch (scope) { | ||
case 'class': | ||
return new ClassBulkheadProvider(this.bulkheadFactory); | ||
|
||
case 'instance': | ||
return new InstanceBulkheadProvider(this.bulkheadFactory); | ||
|
||
default: | ||
throw new Error(`@bulkhead unsuported scope type: ${scope}.`); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import { ExecutionMetaData } from '../types'; | ||
|
||
export class ExecutionQueue<T = any> { | ||
|
||
private readonly queue: ExecutionMetaData<T>[] = []; | ||
|
||
constructor( | ||
private readonly limit: number = Infinity, | ||
) { } | ||
|
||
public store(data: ExecutionMetaData<T>): this { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use standard queue method names, like |
||
this.checkLimit(); | ||
|
||
this.queue.push(data); | ||
|
||
return this; | ||
} | ||
|
||
public next(): ExecutionMetaData<T> { | ||
return this.queue.shift(); | ||
} | ||
|
||
private checkLimit() { | ||
if (this.queue.length >= this.limit) { | ||
throw new Error('@bulkhead execution queue limit reached.'); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
import { Factory } from '../../interfaces/factory'; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Chances are low we will ever have another execution queue implementation than the current one. Therefore the factory here seems like an added complexity. I'd suggest rather to delete this file, and implement a static method |
||
import { ExecutionQueue } from './ExecutionQueue'; | ||
|
||
export class ExecutionQueueFactory<T = any> implements Factory<ExecutionQueue<T>> { | ||
|
||
constructor( | ||
private readonly limit: number, | ||
) { } | ||
|
||
public create(): ExecutionQueue<T> { | ||
return new ExecutionQueue(this.limit); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import { BulkheadOptions, DEFAULT_OPTIONS } from './BulkheadOptions'; | ||
import { ExecutionQueueFactory } from './ExecutionQueue/factory'; | ||
import { BulkheadFactory } from './Bulkhead/factory'; | ||
import { BulkheadProviderFactory } from './BulkheadProvider/factory'; | ||
import { BulkheadProvider } from './BulkheadProvider/BulkheadProvider'; | ||
import { raiseStrategy } from '../utils'; | ||
|
||
export { BulkheadOptions }; | ||
|
||
/** | ||
* Limits the number of queued concurrent executions of a method. | ||
* When the limit is reached the execution is delayed and queued. | ||
* @param threshold the max number of concurrent executions. | ||
*/ | ||
export function bulkhead(threshold: number, options: BulkheadOptions): MethodDecorator { | ||
const bulkheadProvided = createBulkheadProvider(threshold, options); | ||
const raise = raiseStrategy(options, 'reject'); | ||
|
||
return function (_: any, __: any, descriptor: PropertyDescriptor) { | ||
const method = descriptor.value; | ||
|
||
descriptor.value = async function (...args) { | ||
const bulkhead = bulkheadProvided.get(this); | ||
|
||
try { | ||
return await bulkhead.run(method.bind(this), args); | ||
} catch (error) { | ||
return raise(error); | ||
} | ||
}; | ||
|
||
return descriptor; | ||
}; | ||
} | ||
|
||
function createBulkheadProvider( | ||
threshold: number, | ||
{ size = undefined, scope = 'class' }: BulkheadOptions = DEFAULT_OPTIONS, | ||
): BulkheadProvider { | ||
|
||
const executionQueueFactory = new ExecutionQueueFactory(size); | ||
const bulkheadFactory = new BulkheadFactory(threshold, executionQueueFactory); | ||
return new BulkheadProviderFactory(bulkheadFactory).create(scope); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
export interface ExecutionMetaData<T = any> { | ||
args: any[]; | ||
method: (...args: any[]) => Promise<T>; | ||
resolve: (value: T) => void; | ||
reject: (error?: Error) => void; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
export interface Factory<T, Args extends any[]= any[] | never> { | ||
create(...args: Args): T; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
type RaiseStrategies = 'throw' | 'reject' | 'ignore' | 'ignoreAsync'; | ||
|
||
type StrategyOptions = { | ||
onError?: RaiseStrategies; | ||
}; | ||
|
||
export function raiseStrategy(options: StrategyOptions, defaultStrategy: RaiseStrategies) { | ||
const value = options && options.onError || defaultStrategy; | ||
|
||
switch (value) { | ||
case 'reject': | ||
return err => Promise.reject(err); | ||
case 'throw': | ||
return (err) => { throw err; }; | ||
case 'ignore': | ||
return () => { }; | ||
case 'ignoreAsync': | ||
return () => Promise.resolve(); | ||
default: | ||
throw new Error(`Option ${value} is not supported for 'behavior'.`); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bulkhead/Bulkhead.ts
doesn't sound nice. Maybe justbulkhead/index.ts
?