From 0f8c1c600e030b3c144b6e824642fdcecef205c5 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sun, 1 Mar 2020 12:24:37 +1000 Subject: [PATCH 1/4] refactor(subscribeToObservable): #5051 fix --- src/internal/operators/catchError.ts | 8 +------- src/internal/operators/exhaustMap.ts | 8 +------- src/internal/operators/mergeMap.ts | 8 +------- src/internal/operators/mergeScan.ts | 8 +------- src/internal/operators/onErrorResumeNext.ts | 8 +------- src/internal/operators/skipUntil.ts | 9 +-------- src/internal/operators/switchMap.ts | 6 ------ src/internal/util/subscribeToObservable.ts | 12 +++++++++++- 8 files changed, 17 insertions(+), 50 deletions(-) diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 0ea078ab08..68d50fe771 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -153,13 +153,7 @@ class CatchSubscriber extends OuterSubscriber { this._unsubscribeAndRecycle(); const innerSubscriber = new InnerSubscriber(this, undefined, undefined!); this.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - this.add(innerSubscription); - } + subscribeToResult(this, result, undefined, undefined, innerSubscriber); } } } diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 77f2a396d1..27f209c61d 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -124,13 +124,7 @@ class ExhaustMapSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - destination.add(innerSubscription); - } + subscribeToResult(this, result, undefined, undefined, innerSubscriber); } protected _complete(): void { diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index a19f65df8f..4f49d2fc48 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -144,13 +144,7 @@ export class MergeMapSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - destination.add(innerSubscription); - } + subscribeToResult(this, ish, undefined, undefined, innerSubscriber); } protected _complete(): void { diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 0572ed7af2..d985275291 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -105,13 +105,7 @@ export class MergeScanSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, value, index); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - destination.add(innerSubscription); - } + subscribeToResult(this, ish, undefined, undefined, innerSubscriber); } protected _complete(): void { diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index aab2790274..b2b2027a1a 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -161,13 +161,7 @@ class OnErrorResumeNextSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, undefined, undefined!); const destination = this.destination as Subscription; destination.add(innerSubscriber); - const innerSubscription = subscribeToResult(this, next, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - destination.add(innerSubscription); - } + subscribeToResult(this, next, undefined, undefined, innerSubscriber); } else { this.destination.complete(); } diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index 28d1e363dc..12912cffa0 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -73,14 +73,7 @@ class SkipUntilSubscriber extends OuterSubscriber { const innerSubscriber = new InnerSubscriber(this, undefined, undefined!); this.add(innerSubscriber); this.innerSubscription = innerSubscriber; - const innerSubscription = subscribeToResult(this, notifier, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (innerSubscription !== innerSubscriber) { - this.add(innerSubscription); - this.innerSubscription = innerSubscription; - } + subscribeToResult(this, notifier, undefined, undefined, innerSubscriber); } protected _next(value: T) { diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index c460787200..72c8f3ddbc 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -136,12 +136,6 @@ class SwitchMapSubscriber extends OuterSubscriber { const destination = this.destination as Subscription; destination.add(innerSubscriber); this.innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber); - // The returned subscription will usually be the subscriber that was - // passed. However, interop subscribers will be wrapped and for - // unsubscriptions to chain correctly, the wrapper needs to be added, too. - if (this.innerSubscription !== innerSubscriber) { - destination.add(this.innerSubscription); - } } protected _complete(): void { diff --git a/src/internal/util/subscribeToObservable.ts b/src/internal/util/subscribeToObservable.ts index 36b7cb283f..1134ad0bc7 100644 --- a/src/internal/util/subscribeToObservable.ts +++ b/src/internal/util/subscribeToObservable.ts @@ -12,6 +12,16 @@ export const subscribeToObservable = (obj: any) => (subscriber: Subscriber // Should be caught by observable subscribe function error handling. throw new TypeError('Provided object does not correctly implement Symbol.observable'); } else { - return obs.subscribe(subscriber); + const subscription = obs.subscribe(subscriber); + // With observables internal to this package, the returned subscription + // will be the subscriber that was passed to the subscribe call. Ensure + // that the interop behaviour is the same - that the passed-in subscriber + // is returned from this function - and if the received subscription is not + // the subscriber that was passed in, add it to ensure that unsubscription + // is correctly chained. + if (subscription !== subscriber) { + subscriber.add(subscription); + } + return subscriber; } }; From 95b1fc18d2336492dbb1adb27489eb633409cdd5 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sun, 1 Mar 2020 16:01:16 +1000 Subject: [PATCH 2/4] refactor: add subscribeWith --- src/internal/util/subscribeToObservable.ts | 13 ++------ src/internal/util/subscribeWith.ts | 39 ++++++++++++++++++++++ 2 files changed, 41 insertions(+), 11 deletions(-) create mode 100644 src/internal/util/subscribeWith.ts diff --git a/src/internal/util/subscribeToObservable.ts b/src/internal/util/subscribeToObservable.ts index 1134ad0bc7..5cf469aecc 100644 --- a/src/internal/util/subscribeToObservable.ts +++ b/src/internal/util/subscribeToObservable.ts @@ -1,5 +1,6 @@ import { Subscriber } from '../Subscriber'; import { observable as Symbol_observable } from '../symbol/observable'; +import { subscribeWith } from './subscribeWith'; /** * Subscribes to an object that implements Symbol.observable with the given @@ -12,16 +13,6 @@ export const subscribeToObservable = (obj: any) => (subscriber: Subscriber // Should be caught by observable subscribe function error handling. throw new TypeError('Provided object does not correctly implement Symbol.observable'); } else { - const subscription = obs.subscribe(subscriber); - // With observables internal to this package, the returned subscription - // will be the subscriber that was passed to the subscribe call. Ensure - // that the interop behaviour is the same - that the passed-in subscriber - // is returned from this function - and if the received subscription is not - // the subscriber that was passed in, add it to ensure that unsubscription - // is correctly chained. - if (subscription !== subscriber) { - subscriber.add(subscription); - } - return subscriber; + return subscribeWith(obs, subscriber); } }; diff --git a/src/internal/util/subscribeWith.ts b/src/internal/util/subscribeWith.ts new file mode 100644 index 0000000000..debf37880b --- /dev/null +++ b/src/internal/util/subscribeWith.ts @@ -0,0 +1,39 @@ +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; + +export function subscribeWith( + observable: Observable, + subscriber: Subscriber +): Subscription { + // When an instance of a subscriber from within this package is passed to the + // subscribe method of an observable from within this package, the returned + // subscription is the subscriber instance. + // + // Operator implementations depend upon this behaviour, so it's important + // that interop subscribers and observables behave in a similar manner. If + // they do not, unsubscription chains can be broken. + // + // This function ensures that if the subscription returned from the subscribe + // call is not the subscriber itself, the subscription is added to the + // subscriber and the subscriber is returned. Doing so will ensure that the + // unsubscription chain is not broken. + // + // This function needs to be used wherever an interop observable or + // subscriber could be encountered. There are two such places: + // + // - within `subscribeToObservable`; and + // - within the `call` method of each operator's `Operator` class. + // + // Within `subscribeToObservable` the observables are almost always going to + // be interop - as they're obtained via the `Symbol.observable` property. + // + // Within the `call` method, the operator's subscriber will be interop - + // relative to the source observable - if the operator is imported from a + // package that uses a different version of RxJS. + const subscription = observable.subscribe(subscriber); + if (subscription !== subscriber) { + subscriber.add(subscription); + } + return subscriber; +} \ No newline at end of file From d239e66f641fd81a688a0736b20cafbf29f9a5de Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sun, 1 Mar 2020 16:10:24 +1000 Subject: [PATCH 3/4] test: add subscribeWith tests --- spec/util/subscribeWith-spec.ts | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 spec/util/subscribeWith-spec.ts diff --git a/spec/util/subscribeWith-spec.ts b/spec/util/subscribeWith-spec.ts new file mode 100644 index 0000000000..1d25ff2a6a --- /dev/null +++ b/spec/util/subscribeWith-spec.ts @@ -0,0 +1,20 @@ +import { expect } from 'chai'; +import { of, Subscriber, observable } from 'rxjs'; +import { subscribeWith } from 'rxjs/internal/util/subscribeWith'; +import { asInteropObservable, asInteropSubscriber } from '../helpers/interop-helper'; + +describe('subscribeWith', () => { + it('should return the subscriber for interop observables', () => { + const observable = asInteropObservable(of(42)); + const subscriber = new Subscriber(); + const subscription = subscribeWith(observable, subscriber); + expect(subscription).to.equal(subscriber); + }); + + it('should return the subscriber for interop subscribers', () => { + const observable = of(42); + const subscriber = asInteropSubscriber(new Subscriber()); + const subscription = subscribeWith(observable, subscriber); + expect(subscription).to.equal(subscriber); + }); +}); \ No newline at end of file From ba08e584dcefdf4119f6a8f4fa6b9a83cbc8b1d4 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Mon, 2 Mar 2020 12:19:27 +1000 Subject: [PATCH 4/4] chore: move comment into TSDoc --- src/internal/util/subscribeWith.ts | 58 +++++++++++++++++------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/src/internal/util/subscribeWith.ts b/src/internal/util/subscribeWith.ts index debf37880b..a149581cb6 100644 --- a/src/internal/util/subscribeWith.ts +++ b/src/internal/util/subscribeWith.ts @@ -2,35 +2,43 @@ import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; +/** + * Subscribes a subscriber to an observable and ensures the subscriber is + * returned as the subscription. + * + * When an instance of a subscriber from within this package is passed to the + * subscribe method of an observable from within this package, the returned + * subscription is the subscriber instance. + * + * Operator implementations depend upon this behaviour, so it's important + * that interop subscribers and observables behave in a similar manner. If + * they do not, unsubscription chains can be broken. + * + * This function ensures that if the subscription returned from the subscribe + * call is not the subscriber itself, the subscription is added to the + * subscriber and the subscriber is returned. Doing so will ensure that the + * unsubscription chain is not broken. + * + * This function needs to be used wherever an interop observable or + * subscriber could be encountered. There are two such places: + * - within `subscribeToObservable`; and + * - within the `call` method of each operator's `Operator` class. + * + * Within `subscribeToObservable` the observables are almost always going to + * be interop - as they're obtained via the `Symbol.observable` property. + * + * Within the `call` method, the operator's subscriber will be interop - + * relative to the source observable - if the operator is imported from a + * package that uses a different version of RxJS. + * + * @param observable the observable to subscribe to + * @param subscriber the subscriber to be subscribed + * @returns the passed-in subscriber (as the subscription) + */ export function subscribeWith( observable: Observable, subscriber: Subscriber ): Subscription { - // When an instance of a subscriber from within this package is passed to the - // subscribe method of an observable from within this package, the returned - // subscription is the subscriber instance. - // - // Operator implementations depend upon this behaviour, so it's important - // that interop subscribers and observables behave in a similar manner. If - // they do not, unsubscription chains can be broken. - // - // This function ensures that if the subscription returned from the subscribe - // call is not the subscriber itself, the subscription is added to the - // subscriber and the subscriber is returned. Doing so will ensure that the - // unsubscription chain is not broken. - // - // This function needs to be used wherever an interop observable or - // subscriber could be encountered. There are two such places: - // - // - within `subscribeToObservable`; and - // - within the `call` method of each operator's `Operator` class. - // - // Within `subscribeToObservable` the observables are almost always going to - // be interop - as they're obtained via the `Symbol.observable` property. - // - // Within the `call` method, the operator's subscriber will be interop - - // relative to the source observable - if the operator is imported from a - // package that uses a different version of RxJS. const subscription = observable.subscribe(subscriber); if (subscription !== subscriber) { subscriber.add(subscription);