Skip to content

Commit

Permalink
fix(retry): fix internal unsubscriptions for retry
Browse files Browse the repository at this point in the history
Fix retry operator to unsubscribe from the source whenever the
source emits an error and a new retry will be attempted. Also fix the
operator to unsubscribe the internal retried subscription to the source
whenever the result Observable is unsubscribed.

Resolves issue #546.
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 16, 2015
1 parent ce43722 commit cc92f45
Showing 1 changed file with 64 additions and 12 deletions.
76 changes: 64 additions & 12 deletions src/operators/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,88 @@ import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Subscription from '../Subscription';

export default function retry<T>(count: number = 0): Observable<T> {
return this.lift(new RetryOperator(count, this));
}

class RetryOperator<T, R> implements Operator<T, R> {
constructor(private count: number, protected original: Observable<T>) {
constructor(private count: number,
protected source: Observable<T>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new RetrySubscriber<T>(subscriber, this.count, this.original);
return new FirstRetrySubscriber<T>(subscriber, this.count, this.source);
}
}

class RetrySubscriber<T> extends Subscriber<T> {
private retries: number = 0;
constructor(destination: Subscriber<T>, private count: number, private original: Observable<T>) {
super(destination);
class FirstRetrySubscriber<T> extends Subscriber<T> {
private lastSubscription: Subscription<T>;

constructor(public destination: Subscriber<T>,
private count: number,
private source: Observable<T>) {
super(null);
this.lastSubscription = this;
}

_next(value: T) {
this.destination.next(value);
}

error(error?) {
if (!this.isUnsubscribed) {
super.unsubscribe();
this.resubscribe();
}
}

_complete() {
super.unsubscribe();
this.destination.complete();
}

unsubscribe() {
const lastSubscription = this.lastSubscription;
if (lastSubscription === this) {
super.unsubscribe();
} else {
lastSubscription.unsubscribe();
}
}

resubscribe(retried: number = 0) {
this.lastSubscription.unsubscribe();
const nextSubscriber = new RetryMoreSubscriber(this, this.count, retried + 1);
this.lastSubscription = this.source.subscribe(nextSubscriber);
}
}

class RetryMoreSubscriber<T> extends Subscriber<T> {
constructor(private parent: FirstRetrySubscriber<T>,
private count: number,
private retried: number = 0) {
super(null);
}

_next(value: T) {
this.parent.destination.next(value);
}

_error(err: any) {
const parent = this.parent;
const retried = this.retried;
const count = this.count;
if (count && count === (this.retries += 1)) {
this.destination.error(err);

if (count && retried === count) {
parent.destination.error(err);
} else {
this.resubscribe();
parent.resubscribe(retried);
}
}

resubscribe() {
this.original.subscribe(this);
_complete() {
this.parent.destination.complete();
}
}
}

0 comments on commit cc92f45

Please sign in to comment.