Skip to content

Commit

Permalink
fix(catchError): stop listening to a synchronous inner-obervable when…
Browse files Browse the repository at this point in the history
… unsubscribed
  • Loading branch information
peaBerberian committed Aug 19, 2018
1 parent ee1a339 commit 456ef33
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
29 changes: 27 additions & 2 deletions spec/operators/catch-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { concat, Observable, of, throwError, EMPTY, from } from 'rxjs';
import { catchError, map, mergeMap } from 'rxjs/operators';
import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs';
import { catchError, map, mergeMap, takeWhile } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import * as sinon from 'sinon';
import { createObservableInputs } from '../helpers/test-helper';
Expand Down Expand Up @@ -121,6 +121,31 @@ describe('catchError operator', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

throwError(new Error('Some error')).pipe(
catchError(() => synchronousObservable),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should catch error and replace it with a hot Observable', () => {
const e1 = hot('--a--b--# ');
const e1subs = '^ ! ';
Expand Down
5 changes: 4 additions & 1 deletion src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';

import {OuterSubscriber} from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';
import {ObservableInput, OperatorFunction, MonoTypeOperatorFunction} from '../types';

Expand Down Expand Up @@ -121,7 +122,9 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, T | R> {
return;
}
this._unsubscribeAndRecycle();
this.add(subscribeToResult(this, result));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult(this, result, undefined, undefined, innerSubscriber);
}
}
}

0 comments on commit 456ef33

Please sign in to comment.