diff --git a/src/operator/retry.ts b/src/operator/retry.ts index d262d0ab8c..3de682756c 100644 --- a/src/operator/retry.ts +++ b/src/operator/retry.ts @@ -1,7 +1,5 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { TeardownLogic } from '../Subscription'; +import { retry as higherOrder } from '../operators/retry'; /** * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable @@ -20,39 +18,5 @@ import { TeardownLogic } from '../Subscription'; * @owner Observable */ export function retry(this: Observable, count: number = -1): Observable { - return this.lift(new RetryOperator(count, this)); -} - -class RetryOperator implements Operator { - constructor(private count: number, - private source: Observable) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new RetrySubscriber(subscriber, this.count, this.source)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class RetrySubscriber extends Subscriber { - constructor(destination: Subscriber, - private count: number, - private source: Observable) { - super(destination); - } - error(err: any) { - if (!this.isStopped) { - const { source, count } = this; - if (count === 0) { - return super.error(err); - } else if (count > -1) { - this.count = count - 1; - } - source.subscribe(this._unsubscribeAndRecycle()); - } - } + return higherOrder(count)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 8382a13df6..8d2b9ce450 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -55,6 +55,7 @@ export { race } from './race'; export { reduce } from './reduce'; export { repeat } from './repeat'; export { repeatWhen } from './repeatWhen'; +export { retry } from './retry'; export { refCount } from './refCount'; export { scan } from './scan'; export { subscribeOn } from './subscribeOn'; diff --git a/src/operators/retry.ts b/src/operators/retry.ts new file mode 100644 index 0000000000..b13113ed04 --- /dev/null +++ b/src/operators/retry.ts @@ -0,0 +1,60 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; +import { TeardownLogic } from '../Subscription'; + +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable + * calls `error`, this method will resubscribe to the source Observable for a maximum of `count` resubscriptions (given + * as a number parameter) rather than propagating the `error` call. + * + * + * + * Any and all items emitted by the source Observable will be emitted by the resulting Observable, even those emitted + * during failed subscriptions. For example, if an Observable fails at first but emits [1, 2] then succeeds the second + * time and emits: [1, 2, 3, 4, 5] then the complete stream of emissions and notifications + * would be: [1, 2, 1, 2, 3, 4, 5, `complete`]. + * @param {number} count - Number of retry attempts before failing. + * @return {Observable} The source Observable modified with the retry logic. + * @method retry + * @owner Observable + */ +export function retry(count: number = -1): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new RetryOperator(count, source)); +} + +class RetryOperator implements Operator { + constructor(private count: number, + private source: Observable) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new RetrySubscriber(subscriber, this.count, this.source)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class RetrySubscriber extends Subscriber { + constructor(destination: Subscriber, + private count: number, + private source: Observable) { + super(destination); + } + error(err: any) { + if (!this.isStopped) { + const { source, count } = this; + if (count === 0) { + return super.error(err); + } else if (count > -1) { + this.count = count - 1; + } + source.subscribe(this._unsubscribeAndRecycle()); + } + } +}