diff --git a/spec/util/subscribeWith-spec.ts b/spec/util/subscribeWith-spec.ts new file mode 100644 index 0000000000..1d25ff2a6a --- /dev/null +++ b/spec/util/subscribeWith-spec.ts @@ -0,0 +1,20 @@ +import { expect } from 'chai'; +import { of, Subscriber, observable } from 'rxjs'; +import { subscribeWith } from 'rxjs/internal/util/subscribeWith'; +import { asInteropObservable, asInteropSubscriber } from '../helpers/interop-helper'; + +describe('subscribeWith', () => { + it('should return the subscriber for interop observables', () => { + const observable = asInteropObservable(of(42)); + const subscriber = new Subscriber(); + const subscription = subscribeWith(observable, subscriber); + expect(subscription).to.equal(subscriber); + }); + + it('should return the subscriber for interop subscribers', () => { + const observable = of(42); + const subscriber = asInteropSubscriber(new Subscriber()); + const subscription = subscribeWith(observable, subscriber); + expect(subscription).to.equal(subscriber); + }); +}); \ No newline at end of file diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 0ea078ab08..68d50fe771 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -153,13 +153,7 @@ class CatchSubscriber extends OuterSubscriber { this._unsubscribeAndRecycle(); const innerSubscriber = new InnerSubscriber(this, undefined, undefined!); this.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - this.add(innerSubscription); - } + subscribeToResult(this, result, undefined, undefined, innerSubscriber); } } } diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 77f2a396d1..27f209c61d 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -124,13 +124,7 @@ class ExhaustMapSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - destination.add(innerSubscription); - } + subscribeToResult(this, result, undefined, undefined, innerSubscriber); } protected _complete(): void { diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index a19f65df8f..4f49d2fc48 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -144,13 +144,7 @@ export class MergeMapSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - destination.add(innerSubscription); - } + subscribeToResult(this, ish, undefined, undefined, innerSubscriber); } protected _complete(): void { diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 0572ed7af2..d985275291 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -105,13 +105,7 @@ export class MergeScanSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - destination.add(innerSubscription); - } + subscribeToResult(this, ish, undefined, undefined, innerSubscriber); } protected _complete(): void { diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index aab2790274..b2b2027a1a 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -161,13 +161,7 @@ class OnErrorResumeNextSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, undefined, undefined!); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, next, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - destination.add(innerSubscription); - } + subscribeToResult(this, next, undefined, undefined, innerSubscriber); } else { this.destination.complete(); } diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index 28d1e363dc..12912cffa0 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -73,14 +73,7 @@ class SkipUntilSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, undefined, undefined!); this.add(innerSubscriber); this.innerSubscription = innerSubscriber; - const innerSubscription = subscribeToResult(this, notifier, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - this.add(innerSubscription); - this.innerSubscription = innerSubscription; - } + subscribeToResult(this, notifier, undefined, undefined, innerSubscriber); } protected _next(value: T) { diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index c460787200..72c8f3ddbc 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -136,12 +136,6 @@ class SwitchMapSubscriber extends OuterSubscriber { const destination = this.destination as Subscription; destination.add(innerSubscriber); this.innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (this.innerSubscription !== innerSubscriber) { - destination.add(this.innerSubscription); - } } protected _complete(): void { diff --git a/src/internal/util/subscribeToObservable.ts b/src/internal/util/subscribeToObservable.ts index 36b7cb283f..5cf469aecc 100644 --- a/src/internal/util/subscribeToObservable.ts +++ b/src/internal/util/subscribeToObservable.ts @@ -1,5 +1,6 @@ import { Subscriber } from '../Subscriber'; import { observable as Symbol_observable } from '../symbol/observable'; +import { subscribeWith } from './subscribeWith'; /** * Subscribes to an object that implements Symbol.observable with the given @@ -12,6 +13,6 @@ export const subscribeToObservable = (obj: any) => (subscriber: Subscriber // Should be caught by observable subscribe function error handling. throw new TypeError('Provided object does not correctly implement Symbol.observable'); } else { - return obs.subscribe(subscriber); + return subscribeWith(obs, subscriber); } }; diff --git a/src/internal/util/subscribeWith.ts b/src/internal/util/subscribeWith.ts new file mode 100644 index 0000000000..a149581cb6 --- /dev/null +++ b/src/internal/util/subscribeWith.ts @@ -0,0 +1,47 @@ +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; + +/** + * Subscribes a subscriber to an observable and ensures the subscriber is + * returned as the subscription. + * + * When an instance of a subscriber from within this package is passed to the + * subscribe method of an observable from within this package, the returned + * subscription is the subscriber instance. + * + * Operator implementations depend upon this behaviour, so it's important + * that interop subscribers and observables behave in a similar manner. If + * they do not, unsubscription chains can be broken. + * + * This function ensures that if the subscription returned from the subscribe + * call is not the subscriber itself, the subscription is added to the + * subscriber and the subscriber is returned. Doing so will ensure that the + * unsubscription chain is not broken. + * + * This function needs to be used wherever an interop observable or + * subscriber could be encountered. There are two such places: + * - within `subscribeToObservable`; and + * - within the `call` method of each operator's `Operator` class. + * + * Within `subscribeToObservable` the observables are almost always going to + * be interop - as they're obtained via the `Symbol.observable` property. + * + * Within the `call` method, the operator's subscriber will be interop - + * relative to the source observable - if the operator is imported from a + * package that uses a different version of RxJS. + * + * @param observable the observable to subscribe to + * @param subscriber the subscriber to be subscribed + * @returns the passed-in subscriber (as the subscription) + */ +export function subscribeWith( + observable: Observable, + subscriber: Subscriber +): Subscription { + const subscription = observable.subscribe(subscriber); + if (subscription !== subscriber) { + subscriber.add(subscription); + } + return subscriber; +} \ No newline at end of file