Skip to content

Commit

Permalink
feat(retry): add config to reset error count on successful emission (#…
Browse files Browse the repository at this point in the history
…5280)

* feat(retry): add config to reset error count on successful emission

This PR adds the ability to reset the error counter on successful emissions using the `retry`
operator. The current behavior for `retry(n)` is to call error if n errors occurred, regardless of
whether or not they were consecutive. Now one would be able to use `retry(n, true)` to have the
count reset so that only n consecutive errors will cause the observable to fail.

* feat(retry): add config parameter

added overloaded signature to the `retry` operator that accepts a config object

* chore: restore package-lock.json

* chore: appease TypeScript

* chore: revert change to Observable spec
  • Loading branch information
cartant authored Feb 3, 2020
1 parent 5170728 commit ab6e9fc
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 7 deletions.
84 changes: 81 additions & 3 deletions spec/operators/retry-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { retry, map, take, mergeMap, concat, multicast, refCount } from 'rxjs/operators';
import { Observable, Observer, of, throwError, Subject } from 'rxjs';
import { Observable, Observer, defer, range, of, throwError, Subject } from 'rxjs';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -58,16 +58,94 @@ describe('retry operator', () => {
retry(retries - 1)
).subscribe(
(x: number) => {
expect(x).to.equal(42);
done("shouldn't next");
},
(err: any) => {
expect(errors).to.equal(2);
done();
}, () => {
expect('this was called').to.be.true;
done("shouldn't complete");
});
});

it('should retry a number of times, then call error handler (with resetOnSuccess)', (done: MochaDone) => {
let errors = 0;
const retries = 2;
Observable.create((observer: Observer<number>) => {
observer.next(42);
observer.complete();
}).pipe(
map((x: any) => {
errors += 1;
throw 'bad';
}),
retry({count: retries - 1, resetOnSuccess: true})
).subscribe(
(x: number) => {
done("shouldn't next");
},
(err: any) => {
expect(errors).to.equal(2);
done();
}, () => {
done("shouldn't complete");
});
});

it('should retry a number of times, then call next handler without error, then retry and complete', (done: MochaDone) => {
let index = 0;
let errors = 0;
const retries = 2;
defer(() => range(0, 4 - index)).pipe(
mergeMap(() => {
index++;
if (index === 1 || index === 3) {
errors++;
return throwError('bad');
} else {
return of(42);
}
}),
retry({count: retries - 1, resetOnSuccess: true})
).subscribe(
(x: number) => {
expect(x).to.equal(42);
},
(err: any) => {
done("shouldn't error");
}, () => {
expect(errors).to.equal(retries);
done();
});
});

it('should retry a number of times, then call next handler without error, then retry and error', (done: MochaDone) => {
let index = 0;
let errors = 0;
const retries = 2;
defer(() => range(0, 4 - index)).pipe(
mergeMap(() => {
index++;
if (index === 1 || index === 3) {
errors++;
return throwError('bad');
} else {
return of(42);
}
}),
retry({count: retries - 1, resetOnSuccess: false})
).subscribe(
(x: number) => {
expect(x).to.equal(42);
},
(err: any) => {
expect(errors).to.equal(retries);
done();
}, () => {
done("shouldn't complete");
});
});

it('should retry until successful completion', (done: MochaDone) => {
let errors = 0;
const retries = 10;
Expand Down
38 changes: 34 additions & 4 deletions src/internal/operators/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import { Observable } from '../Observable';

import { MonoTypeOperatorFunction, TeardownLogic } from '../types';

export interface RetryConfig {
count: number;
resetOnSuccess?: boolean;
}

/**
* Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable
* calls `error`, this method will resubscribe to the source Observable for a maximum of `count` resubscriptions (given
Expand Down Expand Up @@ -46,21 +51,33 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
* ```
*
* @param {number} count - Number of retry attempts before failing.
* @param {boolean} resetOnSuccess - When set to `true` every successful emission will reset the error count
* @return {Observable} The source Observable modified with the retry logic.
* @method retry
* @owner Observable
*/
export function retry<T>(count: number = -1): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(new RetryOperator(count, source));
export function retry<T>(count?: number): MonoTypeOperatorFunction<T>;
export function retry<T>(config: RetryConfig): MonoTypeOperatorFunction<T>;
export function retry<T>(configOrCount: number | RetryConfig = -1): MonoTypeOperatorFunction<T> {
let config: RetryConfig;
if (configOrCount && typeof configOrCount === 'object') {
config = configOrCount as RetryConfig;
} else {
config = {
count: configOrCount as number
};
}
return (source: Observable<T>) => source.lift(new RetryOperator(config.count, !!config.resetOnSuccess, source));
}

class RetryOperator<T> implements Operator<T, T> {
constructor(private count: number,
private resetOnSuccess: boolean,
private source: Observable<T>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new RetrySubscriber(subscriber, this.count, this.source));
return source.subscribe(new RetrySubscriber(subscriber, this.count, this.resetOnSuccess, this.source));
}
}

Expand All @@ -70,11 +87,24 @@ class RetryOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class RetrySubscriber<T> extends Subscriber<T> {
private readonly initialCount: number;

constructor(destination: Subscriber<any>,
private count: number,
private source: Observable<T>) {
private resetOnSuccess: boolean,
private source: Observable<T>
) {
super(destination);
this.initialCount = this.count;
}

next(value?: T): void {
super.next(value);
if (this.resetOnSuccess) {
this.count = this.initialCount;
}
}

error(err: any) {
if (!this.isStopped) {
const { source, count } = this;
Expand Down

0 comments on commit ab6e9fc

Please sign in to comment.