diff --git a/spec/operators/catch-spec.ts b/spec/operators/catch-spec.ts index 7994f67c9f..12cb5ea77f 100644 --- a/spec/operators/catch-spec.ts +++ b/spec/operators/catch-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; -import { concat, Observable, of, throwError, EMPTY, from } from 'rxjs'; -import { catchError, map, mergeMap } from 'rxjs/operators'; +import { concat, defer, Observable, of, throwError, EMPTY, from } from 'rxjs'; +import { catchError, map, mergeMap, takeWhile } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import * as sinon from 'sinon'; import { createObservableInputs } from '../helpers/test-helper'; @@ -121,6 +121,31 @@ describe('catchError operator', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + + throwError(new Error('Some error')).pipe( + catchError(() => synchronousObservable), + takeWhile((x) => x != 2) // unsubscribe at the second side-effect + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([1, 2]); + }); + it('should catch error and replace it with a hot Observable', () => { const e1 = hot('--a--b--# '); const e1subs = '^ ! '; diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 6f7aeace90..784e299d03 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -3,6 +3,7 @@ import {Subscriber} from '../Subscriber'; import {Observable} from '../Observable'; import {OuterSubscriber} from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; import {ObservableInput, OperatorFunction, MonoTypeOperatorFunction} from '../types'; @@ -121,7 +122,9 @@ class CatchSubscriber extends OuterSubscriber { return; } this._unsubscribeAndRecycle(); - this.add(subscribeToResult(this, result)); + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + subscribeToResult(this, result, undefined, undefined, innerSubscriber); } } }