Skip to content

Commit

Permalink
fix(mergeScan): stop listening to a synchronous inner-obervable when …
Browse files Browse the repository at this point in the history
…unsubscribed
  • Loading branch information
peaBerberian committed Aug 19, 2018
1 parent 456ef33 commit c4002f3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
30 changes: 28 additions & 2 deletions spec/operators/mergeScan-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { TestScheduler } from 'rxjs/testing';
import { of, EMPTY, NEVER, concat, throwError } from 'rxjs';
import { mergeScan, delay, mergeMap } from 'rxjs/operators';
import { of, defer, EMPTY, NEVER, concat, throwError } from 'rxjs';
import { mergeScan, delay, mergeMap, takeWhile } from 'rxjs/operators';
import { expect } from 'chai';

declare const rxTestScheduler: TestScheduler;
/** @test {mergeScan} */
Expand Down Expand Up @@ -136,6 +137,31 @@ describe('mergeScan', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

of(null).pipe(
mergeScan(() => synchronousObservable, 0),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should handle errors in the projection function', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
Expand Down
4 changes: 3 additions & 1 deletion src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(ish: any, value: T, index: number): void {
this.add(subscribeToResult<T, R>(this, ish, value, index));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
}

protected _complete(): void {
Expand Down

0 comments on commit c4002f3

Please sign in to comment.