Skip to content

Commit

Permalink
fix: chain interop/safe subscriber unsubscriptions correctly (#5472)
Browse files Browse the repository at this point in the history
* test: add failing interop subscriber test

* fix: chain safe subscribers to interop subscribers

Closes #5469 #5311 #2675

* refactor: remove subscribeWith

* refactor: minor change to types to clean up code.

Co-authored-by: Ben Lesh <[email protected]>
  • Loading branch information
cartant and benlesh authored Jun 9, 2020
1 parent ced2fec commit 98ad0eb
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 75 deletions.
21 changes: 20 additions & 1 deletion spec/Subscriber-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { expect } from 'chai';
import { SafeSubscriber } from 'rxjs/internal/Subscriber';
import { Subscriber } from 'rxjs';
import { Subscriber, Observable } from 'rxjs';
import { asInteropSubscriber } from './helpers/interop-helper';

/** @test {Subscriber} */
describe('Subscriber', () => {
Expand Down Expand Up @@ -97,4 +98,22 @@ describe('Subscriber', () => {

expect(argument).to.have.lengthOf(0);
});

it('should chain interop unsubscriptions', () => {
let observableUnsubscribed = false;
let subscriberUnsubscribed = false;
let subscriptionUnsubscribed = false;

const subscriber = new Subscriber<void>();
subscriber.add(() => subscriberUnsubscribed = true);

const source = new Observable<void>(() => () => observableUnsubscribed = true);
const subscription = source.subscribe(asInteropSubscriber(subscriber));
subscription.add(() => subscriptionUnsubscribed = true);
subscriber.unsubscribe();

expect(observableUnsubscribed).to.be.true;
expect(subscriberUnsubscribed).to.be.true;
expect(subscriptionUnsubscribed).to.be.true;
});
});
20 changes: 0 additions & 20 deletions spec/util/subscribeWith-spec.ts

This file was deleted.

6 changes: 3 additions & 3 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { isFunction } from './util/isFunction';
import { empty as emptyObserver } from './Observer';
import { Observer, PartialObserver } from './types';
import { Subscription } from './Subscription';
import { Subscription, isSubscription } from './Subscription';
import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
import { config } from './config';
import { hostReportError } from './util/hostReportError';
Expand Down Expand Up @@ -187,8 +187,8 @@ export class SafeSubscriber<T> extends Subscriber<T> {
complete = (<PartialObserver<T>> observerOrNext).complete;
if (observerOrNext !== emptyObserver) {
context = Object.create(observerOrNext);
if (isFunction(context.unsubscribe)) {
this.add(<() => void> context.unsubscribe.bind(context));
if (isSubscription(observerOrNext)) {
observerOrNext.add(this.unsubscribe.bind(this));
}
context.unsubscribe = this.unsubscribe.bind(this);
}
Expand Down
6 changes: 6 additions & 0 deletions src/internal/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ export class Subscription implements SubscriptionLike {
}
}

export function isSubscription(value: any): value is Subscription {
return value &&
typeof value.add === 'function' &&
typeof value.unsubscribe === 'function';
}

function flattenUnsubscriptionErrors(errors: any[]) {
return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []);
}
3 changes: 1 addition & 2 deletions src/internal/operators/finalize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
import { subscribeWith } from '../util/subscribeWith';

/**
* Returns an Observable that mirrors the source Observable, but will call a specified function when
Expand Down Expand Up @@ -68,7 +67,7 @@ class FinallyOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
const subscription = subscribeWith(source, subscriber);
const subscription = source.subscribe(subscriber);
subscription.add(this.callback);
return subscription;
}
Expand Down
3 changes: 1 addition & 2 deletions src/internal/util/subscribeToObservable.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Subscriber } from '../Subscriber';
import { observable as Symbol_observable } from '../symbol/observable';
import { subscribeWith } from './subscribeWith';

/**
* Subscribes to an object that implements Symbol.observable with the given
Expand All @@ -13,6 +12,6 @@ export const subscribeToObservable = <T>(obj: any) => (subscriber: Subscriber<T>
// Should be caught by observable subscribe function error handling.
throw new TypeError('Provided object does not correctly implement Symbol.observable');
} else {
return subscribeWith(obs, subscriber);
return obs.subscribe(subscriber);
}
};
47 changes: 0 additions & 47 deletions src/internal/util/subscribeWith.ts

This file was deleted.

0 comments on commit 98ad0eb

Please sign in to comment.