Skip to content

Commit

Permalink
feat(operator): add retry
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremymwells authored and Jeremy committed Aug 26, 2015
1 parent f03adaf commit 4451db5
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 0 deletions.
69 changes: 69 additions & 0 deletions spec/operators/retry-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.retry()', function () {
it('should retry a number of times, without error, then complete', function (done) {
var errors = 0;
var retries = 2;
Observable.value(42)
.map(function(x){
if ((errors+=1) < retries){
throw 'bad';
}
errors = 0;
return x;
})
.retry(retries)
.subscribe(
function(x){
expect(x).toBe(42);
},
function(err){
expect('this was called').toBe(false);
}, done);
});
it('should retry a number of times, then call error handler', function (done) {
var errors = 0;
var retries = 2;
Observable.value(42)
.map(function(x){
if ((errors+=1) < retries){
throw 'bad';
}
return x;
})
.retry(retries-1)
.subscribe(
function(x){
expect(x).toBe(42);
},
function(err){
expect(errors).toBe(1);
done();
}, function(){
expect('this was called').toBe(false);
});
});
it('should retry until successful completion', function (done) {
var errors = 0;
var retries = 10;
Observable.value(42)
.map(function(x){
if ((errors+=1) < retries){
throw 'bad';
}
errors = 0;
return x;
})
.retry()
.take(retries)
.subscribe(
function(x){
expect(x).toBe(42);
},
function(err){
expect('this was called').toBe(false);
}, done);
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ export default class Observable<T> {
multicast: (subjectFactory: () => Subject<T>) => ConnectableObservable<T>;

catch: (selector: (err: any, source: Observable<T>, caught: Observable<any>) => Observable<any>) => Observable<T>;
retry: <T>(count: number) => Observable<T>;
retryWhen: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
repeat: <T>(count: number) => Observable<T>;

Expand Down
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,12 @@ observableProto.defaultIfEmpty = defaultIfEmpty;
observableProto.materialize = materialize;

import _catch from './operators/catch';
import retry from './operators/retry';
import retryWhen from './operators/retryWhen';
import repeat from './operators/repeat';

observableProto.catch = _catch;
observableProto.retry = retry;
observableProto.retryWhen = retryWhen;
observableProto.repeat = repeat;

Expand Down
38 changes: 38 additions & 0 deletions src/operators/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';

export default function retry<T>(count: number = 0): Observable<T> {
return this.lift(new RetryOperator(count, this));
}

export class RetryOperator<T, R> extends Operator<T, R> {
constructor(private count: number, protected original:Observable<T>) {
super();
}

call(observer: Observer<T>): Observer<T> {
return new RetrySubscriber<T>(observer, this.count, this.original);
}
}

export class RetrySubscriber<T> extends Subscriber<T> {
private retries: number = 0;
constructor(destination: Observer<T>, private count: number, private original: Observable<T>) {
super(destination);
}

_error(err: any) {
const count = this.count;
if (count && count === (this.retries+=1)){
this.destination.error(err);
} else {
this.resubscribe();
}
}

resubscribe() {
this.original.subscribe(this);
}
}

0 comments on commit 4451db5

Please sign in to comment.