From c4002f3808a2a67bd2fb53b91ef0881d197e5eba Mon Sep 17 00:00:00 2001 From: peaBerberian Date: Sun, 19 Aug 2018 14:04:32 +0200 Subject: [PATCH] fix(mergeScan): stop listening to a synchronous inner-obervable when unsubscribed --- spec/operators/mergeScan-spec.ts | 30 +++++++++++++++++++++++++++-- src/internal/operators/mergeScan.ts | 4 +++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/spec/operators/mergeScan-spec.ts b/spec/operators/mergeScan-spec.ts index 7d41f39cbe..bdd8f3fee6 100644 --- a/spec/operators/mergeScan-spec.ts +++ b/spec/operators/mergeScan-spec.ts @@ -1,7 +1,8 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { TestScheduler } from 'rxjs/testing'; -import { of, EMPTY, NEVER, concat, throwError } from 'rxjs'; -import { mergeScan, delay, mergeMap } from 'rxjs/operators'; +import { of, defer, EMPTY, NEVER, concat, throwError } from 'rxjs'; +import { mergeScan, delay, mergeMap, takeWhile } from 'rxjs/operators'; +import { expect } from 'chai'; declare const rxTestScheduler: TestScheduler; /** @test {mergeScan} */ @@ -136,6 +137,31 @@ describe('mergeScan', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + + of(null).pipe( + mergeScan(() => synchronousObservable, 0), + takeWhile((x) => x != 2) // unsubscribe at the second side-effect + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([1, 2]); + }); + it('should handle errors in the projection function', () => { const e1 = hot('--a--^--b--c--d--e--f--g--|'); const e1subs = '^ !'; diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index ba7d2165d6..5c34d03a31 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -100,7 +100,9 @@ export class MergeScanSubscriber extends OuterSubscriber { } private _innerSub(ish: any, value: T, index: number): void { - this.add(subscribeToResult(this, ish, value, index)); + const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + this.add(innerSubscriber); + subscribeToResult(this, ish, value, index, innerSubscriber); } protected _complete(): void {