Skip to content

Commit

Permalink
fix(SafeSubscriber): Propagate disposal to terminal Subscriber
Browse files Browse the repository at this point in the history
Ensure the internal SafeSubscriber is disposed when an Observable is subscribed with a Subscriber or Subscriber-like object.

Fixes ReactiveX#2675
  • Loading branch information
trxcllnt committed Feb 13, 2020
1 parent f7c433b commit 68a06c7
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
39 changes: 39 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,45 @@ describe('Observable', () => {
console.log = _log;
});
});

it('should fully propogate disposal when subscribed with a Subscriber', () => {
let observableUnsubscribed = false;
let subscriberUnsubscribed = false;
let subscriptionUnsubscribed = false;

const subscriber = new Subscriber<any>();
// verify unsubscribe is called on the Subscriber
subscriber.add(() => subscriberUnsubscribed = true);
const source = new Observable(_ => () => observableUnsubscribed = true);
const subscription = source.subscribe(subscriber);
// verify unsubscribe is called on children of the returned Subscription
subscription.add(() => subscriptionUnsubscribed = true);

subscription.unsubscribe();

expect(observableUnsubscribed).to.be.true;
expect(subscriberUnsubscribed).to.be.true;
expect(subscriptionUnsubscribed).to.be.true;
});

it('should fully propogate disposal when subscribed with a Subscriber-like', () => {
let observableUnsubscribed = false;
let subscriberUnsubscribed = false;
let subscriptionUnsubscribed = false;

// verify unsubscribe is called on the "Subscriber" (quack-quack)
const subscriber = { unsubscribe: () => subscriberUnsubscribed = true };
const source = new Observable(_ => () => observableUnsubscribed = true);
const subscription = source.subscribe(<any> subscriber as Subscriber<any>);
// verify unsubscribe is called on children of the returned Subscription
subscription.add(() => subscriptionUnsubscribed = true);

subscription.unsubscribe();

expect(observableUnsubscribed).to.be.true;
expect(subscriberUnsubscribed).to.be.true;
expect(subscriptionUnsubscribed).to.be.true;
});
});

describe('pipe', () => {
Expand Down
1 change: 1 addition & 0 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
this.add(<SafeSubscriber<T>> this.destination);
}
break;
}
Expand Down
7 changes: 5 additions & 2 deletions src/internal/util/toSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ export function toSubscriber<T>(
}
}

if (!nextOrObserver && !error && !complete) {
return new Subscriber(emptyObserver);
if (!error && !complete) {
if (!nextOrObserver) {
return new Subscriber(emptyObserver);
}
return new Subscriber(nextOrObserver);
}

return new Subscriber(nextOrObserver, error, complete);
Expand Down

0 comments on commit 68a06c7

Please sign in to comment.