Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: chain interop/safe subscriber unsubscriptions correctly #5472

Merged
merged 4 commits into from
Jun 9, 2020

Conversation

cartant
Copy link
Collaborator

@cartant cartant commented Jun 5, 2020

Description:

This PR fixes a problem in the core of SafeSubscriber and, with the fix, the subscribeWith util function that was added in #5333 is no longer required.

The problem is that SafeSubscriber behaves in a manner that is completely different to the known-to-be-safe Subscriber. Specifically, unsubscription is not chained correctly.

In Subscriber, the source subscription (a Subscriber is a Subscription) is added to the destination subscription. This is done so that when the subscriber/subscription associated with an operator has its unsubscribe method called, it unsubscribes from its source subscriptions.

Prior to this change, the implementation of SafeSubscriber had this the wrong way around: it added a call to the observer's unsubscribe method to the SafeSubscriber:

this.add(<() => void> context.unsubscribe.bind(context));

This is not correct. The destination could be a partial observer - with next, error and complete methods, but it could also be an interop Subscriber - i.e. a Subscriber that fails the instanceof test because it's from a different module, etc.

Chaining unsubscriptions from the source to the destination is incorrect. When a source completes and is automatically unsubscribed, it should not effect the unsubscription of its destination. For example, here:

of(42).switchMap(() => interval(1e3)).subscribe(value => console.log(value));

the source emits 42, completes and the subscriber associated with the switchMap operator is automatically unsubscribed from the source:

protected _complete(): void {
const {innerSubscription} = this;
if (!innerSubscription || innerSubscription.closed) {
super._complete();
}
this.unsubscribe();
}

However, that subscriber's destination is not unsubscribed. If unsubscription were to be chained from the source, the callback passed to subscribe would be unsubscribed, too, and the flattening would be ineffectual.

In fact, switchMap's destination is never unsubscribed from within the switchMap operator:

protected _complete(): void {
this.destination.complete();
this.unsubscribe();
}

The unsubscription of a destination isn't effected by the unsubscription of a source. Rx does not work like that: unsubscribe is not a notification.

Unsubscription does not chain like a next, complete or error notification. Unsubscription is chained in the other direction: when unsubscribe is called on a subscription, that subscription unsubscribes from its sources. It does not - and must not - call unsubscribe on its destination.

With this change, #5469 is fixed and subscribeWith is no longer required. This PR includes a failing test for the aforementioned issue and removes subscribeWith. All tests pass - including those that were added along with the subscribeWith util.

This also means that the change in PR #5311 should not be applied - it will not only break subjects it will also break interop subscribers - and its related issue #2675 should be closed. unsubscribe is not a notification and it should not be an expectation that an unsubscribe method can be added to a partial observer to be called when a source subscription is unsubscribed. To get something like an unsubscribe notification, callers need to use the finalize operator.

Related issue (if exists): #5469 #5311 #2675

@cartant cartant requested a review from benlesh June 9, 2020 05:04
@cartant cartant marked this pull request as ready for review June 9, 2020 05:04
Copy link
Member

@trxcllnt trxcllnt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this looks right. IIRC the reason SafeSubscriber did this was a request that subscribing with naked Objects would call an unsubscribe() function, but as you point out a better way to do this is with the finalize() operator:

// some expensive resource
socket = new WebSocket();
sub = source.subscribe({
  unsubscribe() { socket.close(); }
});
// later
sub.unsubscribe();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants