Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Subscription): fold ChildSubscription logic into Subscriber to prevent operators from leaking ChildSubscriptions. #2360

Merged
merged 1 commit into from
Feb 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions spec/operators/observeOn-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,21 @@ describe('Observable.prototype.observeOn', () => {
.observeOn(Rx.Scheduler.asap)
.subscribe(
x => {
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
const observeOnSubscriber = subscription._subscriptions[0];
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
expect(observeOnSubscriber._subscriptions[1].state.notification.kind)
.to.equal('N');
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value)
expect(observeOnSubscriber._subscriptions[1].state.notification.value)
.to.equal(x);
results.push(x);
},
err => done(err),
() => {
// now that the last nexted value is done, there should only be a complete notification scheduled
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
const observeOnSubscriber = subscription._subscriptions[0];
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification
// only this completion notification should remain.
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
expect(observeOnSubscriber._subscriptions[1].state.notification.kind)
.to.equal('C');
// After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this.
expect(results).to.deep.equal([1, 2, 3]);
Expand Down
42 changes: 41 additions & 1 deletion spec/operators/switch-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,44 @@ describe('Observable.prototype.switch', () => {

expect(completed).to.be.true;
});
});

it('should not leak when child completes before each switch (prevent memory leaks #2355)', () => {
let iStream: Rx.Subject<number>;
const oStreamControl = new Rx.Subject<number>();
const oStream = oStreamControl.map(() => {
return (iStream = new Rx.Subject());
});
const switcher = oStream.switch();
const result = [];
let sub = switcher.subscribe((x: number) => result.push(x));

[0, 1, 2, 3, 4].forEach((n) => {
oStreamControl.next(n); // creates inner
iStream.complete();
});
// Expect one child of switch(): The oStream
expect(
(<any>sub)._subscriptions[0]._subscriptions.length
).to.equal(1);
sub.unsubscribe();
});

it('should not leak if we switch before child completes (prevent memory leaks #2355)', () => {
const oStreamControl = new Rx.Subject<number>();
const oStream = oStreamControl.map(() => {
return (new Rx.Subject());
});
const switcher = oStream.switch();
const result = [];
let sub = switcher.subscribe((x: number) => result.push(x));

[0, 1, 2, 3, 4].forEach((n) => {
oStreamControl.next(n); // creates inner
});
// Expect two children of switch(): The oStream and the first inner
expect(
(<any>sub)._subscriptions[0]._subscriptions.length
).to.equal(2);
sub.unsubscribe();
});
});
44 changes: 28 additions & 16 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this.destination.complete();
this.unsubscribe();
}

protected _unsubscribeAndRecycle(): Subscriber<T> {
const { _parent, _parents } = this;
this._parent = null;
this._parents = null;
this.unsubscribe();
this.closed = false;
this.isStopped = false;
this._parent = _parent;
this._parents = _parents;
return this;
}
}

/**
Expand All @@ -155,7 +167,7 @@ class SafeSubscriber<T> extends Subscriber<T> {

private _context: any;

constructor(private _parent: Subscriber<T>,
constructor(private _parentSubscriber: Subscriber<T>,
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
Expand Down Expand Up @@ -185,46 +197,46 @@ class SafeSubscriber<T> extends Subscriber<T> {

next(value?: T): void {
if (!this.isStopped && this._next) {
const { _parent } = this;
if (!_parent.syncErrorThrowable) {
const { _parentSubscriber } = this;
if (!_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._next, value);
} else if (this.__tryOrSetError(_parent, this._next, value)) {
} else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
this.unsubscribe();
}
}
}

error(err?: any): void {
if (!this.isStopped) {
const { _parent } = this;
const { _parentSubscriber } = this;
if (this._error) {
if (!_parent.syncErrorThrowable) {
if (!_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._error, err);
this.unsubscribe();
} else {
this.__tryOrSetError(_parent, this._error, err);
this.__tryOrSetError(_parentSubscriber, this._error, err);
this.unsubscribe();
}
} else if (!_parent.syncErrorThrowable) {
} else if (!_parentSubscriber.syncErrorThrowable) {
this.unsubscribe();
throw err;
} else {
_parent.syncErrorValue = err;
_parent.syncErrorThrown = true;
_parentSubscriber.syncErrorValue = err;
_parentSubscriber.syncErrorThrown = true;
this.unsubscribe();
}
}
}

complete(): void {
if (!this.isStopped) {
const { _parent } = this;
const { _parentSubscriber } = this;
if (this._complete) {
if (!_parent.syncErrorThrowable) {
if (!_parentSubscriber.syncErrorThrowable) {
this.__tryOrUnsub(this._complete);
this.unsubscribe();
} else {
this.__tryOrSetError(_parent, this._complete);
this.__tryOrSetError(_parentSubscriber, this._complete);
this.unsubscribe();
}
} else {
Expand Down Expand Up @@ -254,9 +266,9 @@ class SafeSubscriber<T> extends Subscriber<T> {
}

protected _unsubscribe(): void {
const { _parent } = this;
const { _parentSubscriber } = this;
this._context = null;
this._parent = null;
_parent.unsubscribe();
this._parentSubscriber = null;
_parentSubscriber.unsubscribe();
}
}
94 changes: 56 additions & 38 deletions src/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ export class Subscription implements ISubscription {
*/
public closed: boolean = false;

private _subscriptions: ISubscription[];
protected _parent: Subscription = null;
protected _parents: Subscription[] = null;
private _subscriptions: ISubscription[] = null;

/**
* @param {function(): void} [unsubscribe] A function describing how to
Expand All @@ -66,11 +68,26 @@ export class Subscription implements ISubscription {
return;
}

this.closed = true;

const { _unsubscribe, _subscriptions } = (<any> this);
let { _parent, _parents, _unsubscribe, _subscriptions } = (<any> this);

(<any> this)._subscriptions = null;
this.closed = true;
this._parent = null;
this._parents = null;
// null out _subscriptions first so any child subscriptions that attempt
// to remove themselves from this subscription will noop
this._subscriptions = null;

let index = -1;
let len = _parents ? _parents.length : 0;

// if this._parent is null, then so is this._parents, and we
// don't have to remove ourselves from any parent subscriptions.
while (_parent) {
_parent.remove(this);
// if this._parents is null or index >= len,
// then _parent is set to null, and the loop exits
_parent = ++index < len && _parents[index] || null;
}

if (isFunction(_unsubscribe)) {
let trial = tryCatch(_unsubscribe).call(this);
Expand All @@ -85,8 +102,8 @@ export class Subscription implements ISubscription {

if (isArray(_subscriptions)) {

let index = -1;
const len = _subscriptions.length;
index = -1;
len = _subscriptions.length;

while (++index < len) {
const sub = _subscriptions[index];
Expand Down Expand Up @@ -138,27 +155,33 @@ export class Subscription implements ISubscription {
return this;
}

let sub = (<Subscription> teardown);
let subscription = (<Subscription> teardown);

switch (typeof teardown) {
case 'function':
sub = new Subscription(<(() => void) > teardown);
subscription = new Subscription(<(() => void) > teardown);
case 'object':
if (sub.closed || typeof sub.unsubscribe !== 'function') {
return sub;
if (subscription.closed || typeof subscription.unsubscribe !== 'function') {
return subscription;
} else if (this.closed) {
sub.unsubscribe();
return sub;
subscription.unsubscribe();
return subscription;
} else if (typeof subscription._addParent !== 'function' /* quack quack */) {
const tmp = subscription;
subscription = new Subscription();
subscription._subscriptions = [tmp];
}
break;
default:
throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.');
}

const childSub = new ChildSubscription(sub, this);
this._subscriptions = this._subscriptions || [];
this._subscriptions.push(childSub);
return childSub;
const subscriptions = this._subscriptions || (this._subscriptions = []);

subscriptions.push(subscription);
subscription._addParent(this);

return subscription;
}

/**
Expand All @@ -168,37 +191,32 @@ export class Subscription implements ISubscription {
* @return {void}
*/
remove(subscription: Subscription): void {

// HACK: This might be redundant because of the logic in `add()`
if (subscription == null || (
subscription === this) || (
subscription === Subscription.EMPTY)) {
return;
}

const subscriptions = (<any> this)._subscriptions;

const subscriptions = this._subscriptions;
if (subscriptions) {
const subscriptionIndex = subscriptions.indexOf(subscription);
if (subscriptionIndex !== -1) {
subscriptions.splice(subscriptionIndex, 1);
}
}
}
}

export class ChildSubscription extends Subscription {
constructor(private _innerSub: ISubscription, private _parent: Subscription) {
super();
}

_unsubscribe() {
const { _innerSub, _parent } = this;
_parent.remove(this);
_innerSub.unsubscribe();
private _addParent(parent: Subscription) {
let { _parent, _parents } = this;
if (!_parent || _parent === parent) {
// If we don't have a parent, or the new parent is the same as the
// current parent, then set this._parent to the new parent.
this._parent = parent;
} else if (!_parents) {
// If there's already one parent, but not multiple, allocate an Array to
// store the rest of the parent Subscriptions.
this._parents = [parent];
} else if (_parents.indexOf(parent) === -1) {
// Only add the new parent to the _parents list if it's not already there.
_parents.push(parent);
}
}
}

function flattenUnsubscriptionErrors(errors: any[]) {
return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []);
}
}
4 changes: 1 addition & 3 deletions src/operator/catch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, R> {
super.error(err2);
return;
}
this.unsubscribe();
this.closed = false;
this.isStopped = false;
this._unsubscribeAndRecycle();
this.add(subscribeToResult(this, result));
}
}
Expand Down
19 changes: 8 additions & 11 deletions src/operator/observeOn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Operator } from '../Operator';
import { PartialObserver } from '../Observer';
import { Subscriber } from '../Subscriber';
import { Notification } from '../Notification';
import { TeardownLogic, Subscription } from '../Subscription';
import { TeardownLogic } from '../Subscription';
import { Action } from '../scheduler/Action';

/**
Expand Down Expand Up @@ -36,11 +36,9 @@ export class ObserveOnOperator<T> implements Operator<T, T> {
*/
export class ObserveOnSubscriber<T> extends Subscriber<T> {
static dispatch(this: Action<ObserveOnMessage>, arg: ObserveOnMessage) {
const { notification, destination, subscription } = arg;
const { notification, destination } = arg;
notification.observe(destination);
if (subscription) {
subscription.unsubscribe();
}
this.unsubscribe();
}

constructor(destination: Subscriber<T>,
Expand All @@ -50,10 +48,11 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

private scheduleMessage(notification: Notification<any>): void {
const message = new ObserveOnMessage(notification, this.destination);
message.subscription = this.add(
this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message)
);
this.add(this.scheduler.schedule(
ObserveOnSubscriber.dispatch,
this.delay,
new ObserveOnMessage(notification, this.destination)
));
}

protected _next(value: T): void {
Expand All @@ -70,8 +69,6 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
}

export class ObserveOnMessage {
public subscription: Subscription;

constructor(public notification: Notification<any>,
public destination: PartialObserver<any>) {
}
Expand Down
5 changes: 1 addition & 4 deletions src/operator/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ class RepeatSubscriber<T> extends Subscriber<T> {
} else if (count > -1) {
this.count = count - 1;
}
this.unsubscribe();
this.isStopped = false;
this.closed = false;
source.subscribe(this);
source.subscribe(this._unsubscribeAndRecycle());
}
}
}
Loading