From 6f90597e51e038dabd8397b9f066ab4e3d344a5b Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 3 Aug 2020 01:16:09 -0500 Subject: [PATCH] fix(retry): Ensure teardown happens before resubscription with synchronous observables Related: #5620 - Resolves an issue where all teardowns would not execute until the result observable was complete if the source was synchronous BREAKING CHANGE: Removed an undocumented behavior where passing a negative count argument to `retry` would result in an observable that repeats forever. --- spec/operators/retry-spec.ts | 19 ++++++ src/internal/operators/retry.ts | 100 +++++++++++++++----------------- 2 files changed, 66 insertions(+), 53 deletions(-) diff --git a/spec/operators/retry-spec.ts b/spec/operators/retry-spec.ts index 48c907aef8..63c4327640 100644 --- a/spec/operators/retry-spec.ts +++ b/spec/operators/retry-spec.ts @@ -117,6 +117,25 @@ describe('retry operator', () => { }); }); + it('should always teardown before starting the next cycle, even when synchronous', () => { + const results: any[] = []; + const source = new Observable(subscriber => { + subscriber.next(1); + subscriber.next(2); + subscriber.error('bad'); + return () => { + results.push('teardown'); + } + }); + const subscription = source.pipe(retry(3)).subscribe({ + next: value => results.push(value), + error: (err) => results.push(err) + }); + + expect(subscription.closed).to.be.true; + expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'bad', 'teardown']) + }); + it('should retry a number of times, then call next handler without error, then retry and error', (done: MochaDone) => { let index = 0; let errors = 0; diff --git a/src/internal/operators/retry.ts b/src/internal/operators/retry.ts index 8997bb309f..df85216853 100644 --- a/src/internal/operators/retry.ts +++ b/src/internal/operators/retry.ts @@ -1,9 +1,10 @@ -import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { MonoTypeOperatorFunction } from '../types'; import { lift } from '../util/lift'; +import { Subscription } from '../Subscription'; +import { EMPTY } from '../observable/empty'; export interface RetryConfig { count: number; @@ -58,62 +59,55 @@ export interface RetryConfig { */ export function retry(count?: number): MonoTypeOperatorFunction; export function retry(config: RetryConfig): MonoTypeOperatorFunction; -export function retry(configOrCount: number | RetryConfig = -1): MonoTypeOperatorFunction { +export function retry(configOrCount: number | RetryConfig = Infinity): MonoTypeOperatorFunction { let config: RetryConfig; if (configOrCount && typeof configOrCount === 'object') { - config = configOrCount as RetryConfig; + config = configOrCount; } else { config = { - count: configOrCount as number + count: configOrCount }; } - return (source: Observable) => lift(source, new RetryOperator(config.count, !!config.resetOnSuccess, source)); -} - -class RetryOperator implements Operator { - constructor(private count: number, - private resetOnSuccess: boolean, - private source: Observable) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new RetrySubscriber(subscriber, this.count, this.resetOnSuccess, this.source)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class RetrySubscriber extends Subscriber { - private readonly initialCount: number; - - constructor(destination: Subscriber, - private count: number, - private resetOnSuccess: boolean, - private source: Observable - ) { - super(destination); - this.initialCount = this.count; - } - - next(value?: T): void { - super.next(value); - if (this.resetOnSuccess) { - this.count = this.initialCount; - } - } + const { count, resetOnSuccess = false } = config; - error(err: any) { - if (!this.isStopped) { - const { source, count } = this; - if (count === 0) { - return super.error(err); - } else if (count > -1) { - this.count = count - 1; + return (source: Observable) => count <= 0 ? EMPTY: lift(source, function (this: Subscriber, source: Observable) { + const subscriber = this; + let soFar = 0; + const subscription = new Subscription(); + let innerSub: Subscription | null; + const subscribeNext = () => { + let syncUnsub = false; + innerSub = source.subscribe({ + next: (value) => { + if (resetOnSuccess) { + soFar = 0; + } + subscriber.next(value); + }, + error: (err) => { + if (soFar++ < count) { + if (innerSub) { + subscription.remove(innerSub); + innerSub.unsubscribe(); + subscribeNext(); + } else { + syncUnsub = true; + } + } else { + subscriber.error(err); + } + }, + complete: () => subscriber.complete(), + }); + if (syncUnsub) { + innerSub.unsubscribe(); + innerSub = null; + subscribeNext(); + } else { + subscription.add(innerSub); } - source.subscribe(this._unsubscribeAndRecycle()); - } - } -} + }; + subscribeNext(); + return subscription; + }) +} \ No newline at end of file