Skip to content

Commit

Permalink
fix: switchMap and exhaustMap behave correctly with re-entrant code.
Browse files Browse the repository at this point in the history
- switchMap should unsubscribe previous inner sub when getting synchronously reentrance during subscribing the inner sub
- exhaustMap should ignore subsequent synchronous reentrances during subscribing the inner sub
  • Loading branch information
naiweizheng authored Aug 31, 2020
1 parent c9ea2b0 commit c289688
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 4 deletions.
17 changes: 16 additions & 1 deletion spec/operators/exhaustMap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { concat, defer, Observable, of } from 'rxjs';
import { concat, defer, Observable, of, BehaviorSubject } from 'rxjs';
import { exhaustMap, mergeMap, takeWhile, map, take } from 'rxjs/operators';
import { expect } from 'chai';
import { asInteropObservable } from '../helpers/interop-helper';
Expand Down Expand Up @@ -451,4 +451,19 @@ describe('exhaustMap', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

it('should ignore subsequent synchronous reentrances during subscribing the inner sub', () => {
const e = new BehaviorSubject(1);
const results: Array<number> = [];

e.pipe(
take(3),
exhaustMap(value => new Observable<number>(subscriber => {
e.next(value+1);
subscriber.next(value);
})),
).subscribe(value => results.push(value));

expect(results).to.deep.equal([1]);
});
});
17 changes: 16 additions & 1 deletion spec/operators/switchMap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { switchMap, mergeMap, map, takeWhile, take } from 'rxjs/operators';
import { concat, defer, of, Observable } from 'rxjs';
import { concat, defer, of, Observable, BehaviorSubject } from 'rxjs';
import { asInteropObservable } from '../helpers/interop-helper';

/** @test {switchMap} */
Expand Down Expand Up @@ -460,4 +460,19 @@ describe('switchMap', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

it('should unsubscribe previous inner sub when getting synchronously reentrance during subscribing the inner sub', () => {
const e = new BehaviorSubject(1);
const results: Array<number> = [];

e.pipe(
take(3),
switchMap(value => new Observable<number>(subscriber => {
e.next(value+1);
subscriber.next(value);
})),
).subscribe(value => results.push(value));

expect(results).to.deep.equal([3]);
});
});
3 changes: 2 additions & 1 deletion src/internal/operators/exhaustMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ class ExhaustMapSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
const innerSubscriber = new SimpleInnerSubscriber(this);
const destination = this.destination;
destination.add(innerSubscriber);
this.innerSubscription = innerSubscribe(result, innerSubscriber);
this.innerSubscription = innerSubscriber;
innerSubscribe(result, innerSubscriber);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/switchMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ class SwitchMapSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
}
const innerSubscriber = new SimpleInnerSubscriber(this);
this.destination.add(innerSubscriber);
this.innerSubscription = innerSubscribe(result, innerSubscriber);
this.innerSubscription = innerSubscriber;
innerSubscribe(result, innerSubscriber);
}

protected _complete(): void {
Expand Down

0 comments on commit c289688

Please sign in to comment.