diff --git a/spec/helpers/interop-helper-spec.ts b/spec/helpers/interop-helper-spec.ts new file mode 100644 index 0000000000..f10b5b28fb --- /dev/null +++ b/spec/helpers/interop-helper-spec.ts @@ -0,0 +1,19 @@ +import { expect } from 'chai'; +import { Observable, of, Subscriber } from 'rxjs'; +import { observable as symbolObservable } from 'rxjs/internal/symbol/observable'; +import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber'; +import { asInteropObservable, asInteropSubscriber } from './interop-helper'; + +describe('interop helper', () => { + it('should simulate interop observables', () => { + const observable = asInteropObservable(of(42)); + expect(observable).to.not.be.instanceOf(Observable); + expect(observable[symbolObservable]).to.be.a('function'); + }); + + it('should simulate interop subscribers', () => { + const subscriber = asInteropSubscriber(new Subscriber()); + expect(subscriber).to.not.be.instanceOf(Subscriber); + expect(subscriber[symbolSubscriber]).to.be.undefined; + }); +}); \ No newline at end of file diff --git a/spec/helpers/interop-helper.ts b/spec/helpers/interop-helper.ts new file mode 100644 index 0000000000..05c98abee1 --- /dev/null +++ b/spec/helpers/interop-helper.ts @@ -0,0 +1,57 @@ +import { Observable, Subscriber, Subscription } from 'rxjs'; +import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber'; + +/** + * Returns an observable that will be deemed by this package's implementation + * to be an observable that requires interop. The returned observable will fail + * the `instanceof Observable` test and will deem any `Subscriber` passed to + * its `subscribe` method to be untrusted. + */ +export function asInteropObservable(observable: Observable): Observable { + return new Proxy(observable, { + get(target: Observable, key: string | number | symbol) { + if (key === 'subscribe') { + const { subscribe } = target; + return interopSubscribe(subscribe); + } + return Reflect.get(target, key); + }, + getPrototypeOf(target: Observable) { + const { subscribe, ...rest } = Object.getPrototypeOf(target); + return { + ...rest, + subscribe: interopSubscribe(subscribe) + }; + } + }); +} + +/** + * Returns a subscriber that will be deemed by this package's implementation to + * be untrusted. The returned subscriber will fail the `instanceof Subscriber` + * test and will not include the symbol that identifies trusted subscribers. + */ +export function asInteropSubscriber(subscriber: Subscriber): Subscriber { + return new Proxy(subscriber, { + get(target: Subscriber, key: string | number | symbol) { + if (key === symbolSubscriber) { + return undefined; + } + return Reflect.get(target, key); + }, + getPrototypeOf(target: Subscriber) { + const { [symbolSubscriber]: symbol, ...rest } = Object.getPrototypeOf(target); + return rest; + } + }); +} + +function interopSubscribe(subscribe: (...args: any[]) => Subscription) { + return function (this: Observable, ...args: any[]): Subscription { + const [arg] = args; + if (arg instanceof Subscriber) { + return subscribe.call(this, asInteropSubscriber(arg)); + } + return subscribe.apply(this, args); + }; +} \ No newline at end of file diff --git a/spec/operators/catch-spec.ts b/spec/operators/catch-spec.ts index 12cb5ea77f..e1f53c917c 100644 --- a/spec/operators/catch-spec.ts +++ b/spec/operators/catch-spec.ts @@ -5,6 +5,7 @@ import { TestScheduler } from 'rxjs/testing'; import * as sinon from 'sinon'; import { createObservableInputs } from '../helpers/test-helper'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; +import { asInteropObservable } from '../helpers/interop-helper'; declare function asDiagram(arg: string): Function; @@ -121,6 +122,27 @@ describe('catchError operator', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + it('should unsubscribe from a caught cold caught interop observable when unsubscribed explicitly', () => { + const e1 = hot('-1-2-3-# '); + const e1subs = '^ ! '; + const e2 = cold( '5-6-7-8-9-|'); + const e2subs = ' ^ ! '; + const expected = '-1-2-3-5-6-7- '; + const unsub = ' ! '; + + // This test is the same as the previous test, but the observable is + // manipulated to make it look like an interop observable - an observable + // from a foreign library. Interop subscribers are treated differently: + // they are wrapped in a safe subscriber. This test ensures that + // unsubscriptions are chained all the way to the interop subscriber. + + const result = e1.pipe(catchError(() => asInteropObservable(e2))); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = concat( diff --git a/spec/operators/exhaustMap-spec.ts b/spec/operators/exhaustMap-spec.ts index 2fbcfa69da..86b835b327 100644 --- a/spec/operators/exhaustMap-spec.ts +++ b/spec/operators/exhaustMap-spec.ts @@ -2,6 +2,7 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/mar import { concat, defer, Observable, of, from } from 'rxjs'; import { exhaustMap, mergeMap, takeWhile, map } from 'rxjs/operators'; import { expect } from 'chai'; +import { asInteropObservable } from '../helpers/interop-helper'; declare function asDiagram(arg: string): Function; @@ -202,6 +203,39 @@ describe('exhaustMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => { + const x = cold( '--a--b--c--| '); + const xsubs = ' ^ ! '; + const y = cold( '--d--e--f--| '); + const ysubs: string[] = []; + const z = cold( '--g--h--i--| '); + const zsubs = ' ^ ! '; + const e1 = hot('---x---------y-----------------z-------------|'); + const e1subs = '^ ! '; + const expected = '-----a--b--c---------------------g- '; + const unsub = ' ! '; + + const observableLookup = { x: x, y: y, z: z }; + + // This test is the same as the previous test, but the observable is + // manipulated to make it look like an interop observable - an observable + // from a foreign library. Interop subscribers are treated differently: + // they are wrapped in a safe subscriber. This test ensures that + // unsubscriptions are chained all the way to the interop subscriber. + + const result = e1.pipe( + mergeMap(x => of(x)), + exhaustMap(value => asInteropObservable(observableLookup[value])), + mergeMap(x => of(x)) + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(z.subscriptions).toBe(zsubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = concat( diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index d95fe8010b..efe576c528 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { mergeMap, map } from 'rxjs/operators'; import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; +import { asInteropObservable } from '../helpers/interop-helper'; declare const type: Function; declare const asDiagram: Function; @@ -260,6 +261,36 @@ describe('mergeMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--|'); + const ysubs = ' ^ ! '; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c--d- '; + const unsub = ' ! '; + + const observableLookup = { x: x, y: y }; + + // This test manipulates the observable to make it look like an interop + // observable - an observable from a foreign library. Interop subscribers + // are treated differently: they are wrapped in a safe subscriber. This + // test ensures that unsubscriptions are chained all the way to the + // interop subscriber. + + const result = e1.pipe( + mergeMap(x => of(x)), + mergeMap(value => asInteropObservable(observableLookup[value])), + mergeMap(x => of(x)), + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + it('should mergeMap many outer to many inner, inner never completes', () => { const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; const e1 = hot('-a-------b-------c-------d-------| '); diff --git a/spec/operators/onErrorResumeNext-spec.ts b/spec/operators/onErrorResumeNext-spec.ts index b7becf9b45..e7dcd5c9a8 100644 --- a/spec/operators/onErrorResumeNext-spec.ts +++ b/spec/operators/onErrorResumeNext-spec.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { onErrorResumeNext, takeWhile } from 'rxjs/operators'; import { concat, defer, throwError, of } from 'rxjs'; +import { asInteropObservable } from '../helpers/interop-helper'; declare function asDiagram(arg: string): Function; @@ -129,6 +130,23 @@ describe('onErrorResumeNext operator', () => { expect(sideEffects).to.deep.equal([1, 2]); }); + it('should unsubscribe from an interop observble upon explicit unsubscription', () => { + const source = hot('--a--b--#'); + const next = cold( '--c--d--'); + const nextSubs = ' ^ !'; + const subs = '^ !'; + const expected = '--a--b----c--'; + + // This test manipulates the observable to make it look like an interop + // observable - an observable from a foreign library. Interop subscribers + // are treated differently: they are wrapped in a safe subscriber. This + // test ensures that unsubscriptions are chained all the way to the + // interop subscriber. + + expectObservable(source.pipe(onErrorResumeNext(asInteropObservable(next))), subs).toBe(expected); + expectSubscriptions(next.subscriptions).toBe(nextSubs); + }); + it('should work with promise', (done: MochaDone) => { const expected = [1, 2]; const source = concat(of(1), throwError('meh')); diff --git a/spec/operators/skipUntil-spec.ts b/spec/operators/skipUntil-spec.ts index 12430e8879..d730c2a018 100644 --- a/spec/operators/skipUntil-spec.ts +++ b/spec/operators/skipUntil-spec.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { concat, defer, Observable, of, Subject } from 'rxjs'; import { skipUntil, mergeMap } from 'rxjs/operators'; +import { asInteropObservable } from '../helpers/interop-helper'; declare function asDiagram(arg: string): Function; @@ -97,6 +98,31 @@ describe('skipUntil', () => { expectSubscriptions(skip.subscriptions).toBe(skipSubs); }); + it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => { + const e1 = hot('--a--b--c--d--e----|'); + const e1subs = '^ ! '; + const skip = hot('-------------x--| '); + const skipSubs = '^ ! '; + const expected = ('---------- '); + const unsub = ' ! '; + + // This test is the same as the previous test, but the observable is + // manipulated to make it look like an interop observable - an observable + // from a foreign library. Interop subscribers are treated differently: + // they are wrapped in a safe subscriber. This test ensures that + // unsubscriptions are chained all the way to the interop subscriber. + + const result = e1.pipe( + mergeMap(x => of(x)), + skipUntil(asInteropObservable(skip)), + mergeMap(x => of(x)), + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(skip.subscriptions).toBe(skipSubs); + }); + it('should skip all elements when notifier is empty', () => { const e1 = hot('--a--b--c--d--e--|'); const e1subs = '^ !'; @@ -248,7 +274,6 @@ describe('skipUntil', () => { }); it('should stop listening to a synchronous notifier after its first nexted value', () => { - // const source = hot('-^-o---o---o---o---o---o---|'); const sideEffects: number[] = []; const synchronousNotifer = concat( defer(() => { diff --git a/spec/operators/switchMap-spec.ts b/spec/operators/switchMap-spec.ts index 779b4ebf55..790fdc9dce 100644 --- a/spec/operators/switchMap-spec.ts +++ b/spec/operators/switchMap-spec.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { switchMap, mergeMap, map, takeWhile } from 'rxjs/operators'; import { concat, defer, of, Observable } from 'rxjs'; +import { asInteropObservable } from '../helpers/interop-helper'; declare function asDiagram(arg: string): Function; @@ -169,6 +170,36 @@ describe('switchMap', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--|'); + const ysubs = ' ^ ! '; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c---- '; + const unsub = ' ! '; + + const observableLookup = { x: x, y: y }; + + // This test is the same as the previous test, but the observable is + // manipulated to make it look like an interop observable - an observable + // from a foreign library. Interop subscribers are treated differently: + // they are wrapped in a safe subscriber. This test ensures that + // unsubscriptions are chained all the way to the interop subscriber. + + const result = e1.pipe( + mergeMap(x => of(x)), + switchMap(value => asInteropObservable(observableLookup[value])), + mergeMap(x => of(x)), + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = concat( diff --git a/spec/tsconfig.json b/spec/tsconfig.json index 8b4ff64231..9eb430ca82 100644 --- a/spec/tsconfig.json +++ b/spec/tsconfig.json @@ -1,3 +1,6 @@ { - "extends": "../tsconfig.json" + "extends": "../tsconfig.json", + "compilerOptions": { + "lib": ["esnext", "dom"] + } } \ No newline at end of file diff --git a/spec/util/toSubscriber-spec.ts b/spec/util/toSubscriber-spec.ts index 827ef17272..0845a175a9 100644 --- a/spec/util/toSubscriber-spec.ts +++ b/spec/util/toSubscriber-spec.ts @@ -12,7 +12,7 @@ describe('toSubscriber', () => { expect(sub2.closed).to.be.true; }); -it('should not be closed when other subscriber created with same observer instance completes', () => { + it('should not be closed when other subscriber created with same observer instance completes', () => { let observer = { next: function () { /*noop*/ } }; diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 73037a4cb5..5aeb76fdad 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -137,7 +137,13 @@ class CatchSubscriber extends OuterSubscriber { this._unsubscribeAndRecycle(); const innerSubscriber = new InnerSubscriber(this, undefined, undefined); this.add(innerSubscriber); - subscribeToResult(this, result, undefined, undefined, innerSubscriber); + const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); + // The returned subscription will usually be the subscriber that was + // passed. However, interop subscribers will be wrapped and for + // unsubscriptions to chain correctly, the wrapper needs to be added, too. + if (innerSubscription !== innerSubscriber) { + this.add(innerSubscription); + } } } } diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index acfeb5c089..3bf6ff4a41 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -122,10 +122,16 @@ class ExhaustMapSubscriber extends OuterSubscriber { } private _innerSub(result: ObservableInput, value: T, index: number): void { - const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - subscribeToResult(this, result, value, index, innerSubscriber); + const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); + // The returned subscription will usually be the subscriber that was + // passed. However, interop subscribers will be wrapped and for + // unsubscriptions to chain correctly, the wrapper needs to be added, too. + if (innerSubscription !== innerSubscriber) { + destination.add(innerSubscription); + } } protected _complete(): void { diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index cf69688740..b8360ce7a9 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -142,10 +142,16 @@ export class MergeMapSubscriber extends OuterSubscriber { } private _innerSub(ish: ObservableInput, value: T, index: number): void { - const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - subscribeToResult(this, ish, value, index, innerSubscriber); + const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber); + // The returned subscription will usually be the subscriber that was + // passed. However, interop subscribers will be wrapped and for + // unsubscriptions to chain correctly, the wrapper needs to be added, too. + if (innerSubscription !== innerSubscriber) { + destination.add(innerSubscription); + } } protected _complete(): void { diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 7d12345e84..60ff2882f9 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -103,10 +103,16 @@ export class MergeScanSubscriber extends OuterSubscriber { } private _innerSub(ish: any, value: T, index: number): void { - const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - subscribeToResult(this, ish, value, index, innerSubscriber); + const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber); + // The returned subscription will usually be the subscriber that was + // passed. However, interop subscribers will be wrapped and for + // unsubscriptions to chain correctly, the wrapper needs to be added, too. + if (innerSubscription !== innerSubscriber) { + destination.add(innerSubscription); + } } protected _complete(): void { diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index 2403770e6d..058ca8ec0c 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -162,7 +162,13 @@ class OnErrorResumeNextSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); const destination = this.destination as Subscription; destination.add(innerSubscriber); - subscribeToResult(this, next, undefined, undefined, innerSubscriber); + const innerSubscription = subscribeToResult(this, next, undefined, undefined, innerSubscriber); + // The returned subscription will usually be the subscriber that was + // passed. However, interop subscribers will be wrapped and for + // unsubscriptions to chain correctly, the wrapper needs to be added, too. + if (innerSubscription !== innerSubscriber) { + destination.add(innerSubscription); + } } else { this.destination.complete(); } diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index 3920f6f451..8310adca58 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -74,7 +74,14 @@ class SkipUntilSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, undefined, undefined); this.add(innerSubscriber); this.innerSubscription = innerSubscriber; - subscribeToResult(this, notifier, undefined, undefined, innerSubscriber); + const innerSubscription = subscribeToResult(this, notifier, undefined, undefined, innerSubscriber); + // The returned subscription will usually be the subscriber that was + // passed. However, interop subscribers will be wrapped and for + // unsubscriptions to chain correctly, the wrapper needs to be added, too. + if (innerSubscription !== innerSubscriber) { + this.add(innerSubscription); + this.innerSubscription = innerSubscription; + } } protected _next(value: T) { diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index 4c18328f99..c3c831ad88 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -133,10 +133,16 @@ class SwitchMapSubscriber extends OuterSubscriber { if (innerSubscription) { innerSubscription.unsubscribe(); } - const innerSubscriber = new InnerSubscriber(this, undefined, undefined); + const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - this.innerSubscription = subscribeToResult(this, result, value, index, innerSubscriber); + this.innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); + // The returned subscription will usually be the subscriber that was + // passed. However, interop subscribers will be wrapped and for + // unsubscriptions to chain correctly, the wrapper needs to be added, too. + if (this.innerSubscription !== innerSubscriber) { + destination.add(this.innerSubscription); + } } protected _complete(): void { diff --git a/src/internal/util/subscribeToResult.ts b/src/internal/util/subscribeToResult.ts index f350b85733..31368ce6c2 100644 --- a/src/internal/util/subscribeToResult.ts +++ b/src/internal/util/subscribeToResult.ts @@ -5,25 +5,33 @@ import { Subscriber } from '../Subscriber'; import { subscribeTo } from './subscribeTo'; import { Observable } from '../Observable'; +export function subscribeToResult( + outerSubscriber: OuterSubscriber, + result: any, + outerValue: undefined, + outerIndex: undefined, + innerSubscriber: InnerSubscriber +): Subscription | undefined; + export function subscribeToResult( outerSubscriber: OuterSubscriber, result: any, outerValue?: T, - outerIndex?: number, - destination?: Subscriber -): Subscription; + outerIndex?: number +): Subscription | undefined; + export function subscribeToResult( outerSubscriber: OuterSubscriber, result: any, outerValue?: T, outerIndex?: number, - destination: Subscriber = new InnerSubscriber(outerSubscriber, outerValue, outerIndex) -): Subscription | void { - if (destination.closed) { + innerSubscriber: Subscriber = new InnerSubscriber(outerSubscriber, outerValue, outerIndex) +): Subscription | undefined { + if (innerSubscriber.closed) { return undefined; } if (result instanceof Observable) { - return result.subscribe(destination); + return result.subscribe(innerSubscriber); } - return subscribeTo(result)(destination); + return subscribeTo(result)(innerSubscriber) as Subscription; }