Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Subscriber): don't leak destination #6116

Merged
merged 4 commits into from
Mar 15, 2021

Conversation

cartant
Copy link
Collaborator

@cartant cartant commented Mar 12, 2021

Description:

This PR adds a failing test that shows that subscriptions - i.e. subscribers - returned from subscribe leak their destinations - i.e. their observers or callbacks.

The PR changes Subscriber#unsubscribe to null the destination after super.unsubscribe is called.

This change broke three operators - bufferWhen was one of them - because if unsubscription was effected within onNext and if an error was thrown from within onNext (after the unsubscription) it'd be caught in the OperatorSubscriber and an attempt would be made to call this.destination.error, but this.destination would be null.

Rather than capture this.destination before the try/catch around onNext, onComplete, and onError, I just used the destination that was passed in to the OperatorSubscriber constructor - AFAICT, we can be sure that it's a Subscriber, because OperatorSubscriber is an internal, impementation detail.

Related issue (if exists): Nope, but there is a discussion: #6115

@cartant cartant requested a review from benlesh March 12, 2021 11:53
Copy link
Member

@benlesh benlesh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is okay... I'd just want to see if this works okay when you chain together a few operators. We altered OperatorSubscriber, but there's no new tests for operators related to that, so it has me a little concerned.

@cartant
Copy link
Collaborator Author

cartant commented Mar 12, 2021

but there's no new tests for operators related to that

I'm not sure what additional tests you might be looking for. There are no explicit tests for OperatorSubscriber, AFAICT; it's tested via the operator tests themselves. And regarding operators being chained, pretty much every operator has a don't-break-subscription-chains test with something like this in it - i.e. several operators chained together:

const source = e1.pipe(
map((x) => x),
mergeMap(() => x),
map((x) => x)
);

Regarding the OperatorSubscriber change, these are the tests that fail without it. And they all fail because the subscribers to the selector-returned observables unsubscribe themselves and if a error is thrown from within the selector and is caught in OperatorSubscriber, this.destination will be null, etc.:

it('should propagate error thrown from closingSelector', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const subs = ' ^--------------! ';
const closings = [
cold(' ---------------s--| '),
cold(' ----------(s|) '),
cold(' -------------(s|)')
];
const closeSubs0 = ' ^--------------! ';
const expected = ' ---------------(x#) ';
const values = { x: ['b', 'c', 'd'] };
let i = 0;
const result = e1.pipe(
bufferWhen(() => {
if (i === 1) {
throw 'error';
}
return closings[i++];
})
);
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(subs);
expectSubscriptions(closings[0].subscriptions).toBe(closeSubs0);
});
});

it('should work with selector throws', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('---1-^-2---4----| ');
const asubs = ' ^-------! ';
const b = hot('---1-^--3----5----|');
const bsubs = ' ^-------! ';
const expected = ' ---x----# ';
const selector = function (x: string, y: string) {
if (y === '5') {
throw new Error('too bad');
} else {
return x + y;
}
};
const observable = of(a, b).pipe(zipAll(selector));
expectObservable(observable).toBe(expected, { x: '23' }, new Error('too bad'));
expectSubscriptions(a.subscriptions).toBe(asubs);
expectSubscriptions(b.subscriptions).toBe(bsubs);
});
});

it('should work with non-empty observable and non-empty iterable selector that throws', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('---^--1--2--3--|');
const asubs = ' ^-----!';
const b = [4, 5, 6];
const expected = '---x--#';
const selector = function (x: string, y: number) {
if (y === 5) {
throw new Error('too bad');
} else {
return x + y;
}
};
expectObservable(of(a, b).pipe(zipAll(selector))).toBe(expected, { x: '14' }, new Error('too bad'));
expectSubscriptions(a.subscriptions).toBe(asubs);
});
});
});

So I'm not sure what else needs to be tested.

@benlesh
Copy link
Member

benlesh commented Mar 15, 2021

That makes sense, thanks @cartant

@benlesh benlesh merged commit 5bba36c into ReactiveX:master Mar 15, 2021
@cartant cartant deleted the cartant/subscriber-null-destination branch March 19, 2021 07:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants