Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(subscribeToObservable): use subscribeWith util #5333

Merged
merged 4 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions spec/util/subscribeWith-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { expect } from 'chai';
import { of, Subscriber, observable } from 'rxjs';
import { subscribeWith } from 'rxjs/internal/util/subscribeWith';
import { asInteropObservable, asInteropSubscriber } from '../helpers/interop-helper';

describe('subscribeWith', () => {
it('should return the subscriber for interop observables', () => {
const observable = asInteropObservable(of(42));
const subscriber = new Subscriber<number>();
const subscription = subscribeWith(observable, subscriber);
expect(subscription).to.equal(subscriber);
});

it('should return the subscriber for interop subscribers', () => {
const observable = of(42);
const subscriber = asInteropSubscriber(new Subscriber<number>());
const subscription = subscribeWith(observable, subscriber);
expect(subscription).to.equal(subscriber);
});
});
8 changes: 1 addition & 7 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,7 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, T | R> {
this._unsubscribeAndRecycle();
const innerSubscriber = new InnerSubscriber(this, undefined, undefined!);
this.add(innerSubscriber);
const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
this.add(innerSubscription);
}
subscribeToResult(this, result, undefined, undefined, innerSubscriber);
}
}
}
8 changes: 1 addition & 7 deletions src/internal/operators/exhaustMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,7 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, result, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
subscribeToResult<T, R>(this, result, undefined, undefined, innerSubscriber);
}

protected _complete(): void {
Expand Down
8 changes: 1 addition & 7 deletions src/internal/operators/mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,7 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
}

protected _complete(): void {
Expand Down
8 changes: 1 addition & 7 deletions src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,7 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
}

protected _complete(): void {
Expand Down
8 changes: 1 addition & 7 deletions src/internal/operators/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,7 @@ class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined!);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
const innerSubscription = subscribeToResult(this, next, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
subscribeToResult(this, next, undefined, undefined, innerSubscriber);
} else {
this.destination.complete();
}
Expand Down
9 changes: 1 addition & 8 deletions src/internal/operators/skipUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,7 @@ class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined!);
this.add(innerSubscriber);
this.innerSubscription = innerSubscriber;
const innerSubscription = subscribeToResult(this, notifier, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (innerSubscription !== innerSubscriber) {
this.add(innerSubscription);
this.innerSubscription = innerSubscription;
}
subscribeToResult(this, notifier, undefined, undefined, innerSubscriber);
}

protected _next(value: T) {
Expand Down
6 changes: 0 additions & 6 deletions src/internal/operators/switchMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,6 @@ class SwitchMapSubscriber<T, R> extends OuterSubscriber<T, R> {
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
this.innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
if (this.innerSubscription !== innerSubscriber) {
destination.add(this.innerSubscription);
}
}

protected _complete(): void {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/util/subscribeToObservable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Subscriber } from '../Subscriber';
import { observable as Symbol_observable } from '../symbol/observable';
import { subscribeWith } from './subscribeWith';

/**
* Subscribes to an object that implements Symbol.observable with the given
Expand All @@ -12,6 +13,6 @@ export const subscribeToObservable = <T>(obj: any) => (subscriber: Subscriber<T>
// Should be caught by observable subscribe function error handling.
throw new TypeError('Provided object does not correctly implement Symbol.observable');
} else {
return obs.subscribe(subscriber);
return subscribeWith(obs, subscriber);
}
};
47 changes: 47 additions & 0 deletions src/internal/util/subscribeWith.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';

/**
* Subscribes a subscriber to an observable and ensures the subscriber is
* returned as the subscription.
*
* When an instance of a subscriber from within this package is passed to the
* subscribe method of an observable from within this package, the returned
* subscription is the subscriber instance.
*
* Operator implementations depend upon this behaviour, so it's important
* that interop subscribers and observables behave in a similar manner. If
* they do not, unsubscription chains can be broken.
*
* This function ensures that if the subscription returned from the subscribe
* call is not the subscriber itself, the subscription is added to the
* subscriber and the subscriber is returned. Doing so will ensure that the
* unsubscription chain is not broken.
*
* This function needs to be used wherever an interop observable or
* subscriber could be encountered. There are two such places:
* - within `subscribeToObservable`; and
* - within the `call` method of each operator's `Operator` class.
*
* Within `subscribeToObservable` the observables are almost always going to
* be interop - as they're obtained via the `Symbol.observable` property.
*
* Within the `call` method, the operator's subscriber will be interop -
* relative to the source observable - if the operator is imported from a
* package that uses a different version of RxJS.
*
* @param observable the observable to subscribe to
* @param subscriber the subscriber to be subscribed
* @returns the passed-in subscriber (as the subscription)
*/
export function subscribeWith<T>(
observable: Observable<T>,
subscriber: Subscriber<T>
): Subscription {
const subscription = observable.subscribe(subscriber);
if (subscription !== subscriber) {
subscriber.add(subscription);
}
return subscriber;
}