Skip to content

Commit

Permalink
fix(subscriber): unsubscribe parents on error/complete
Browse files Browse the repository at this point in the history
  • Loading branch information
mpodlasin committed Jul 28, 2018
1 parent ad00e16 commit ad8131b
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export class Observable<T> implements Subscribable<T> {
if (operator) {
operator.call(sink, this.source);
} else {
sink.add(
sink._addParentTeardownLogic(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
Expand Down
21 changes: 19 additions & 2 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { isFunction } from './util/isFunction';
import { empty as emptyObserver } from './Observer';
import { Observer, PartialObserver } from './types';
import { Observer, PartialObserver, TeardownLogic } from './types';
import { Subscription } from './Subscription';
import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
import { config } from './config';
Expand Down Expand Up @@ -47,6 +47,8 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
protected isStopped: boolean = false;
protected destination: PartialObserver<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)

private _parentSubscription: Subscription | null = null;

/**
* @param {Observer|function(value: T): void} [destinationOrNext] A partially
* defined Observer or a `next` callback function.
Expand Down Expand Up @@ -76,7 +78,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
const trustedSubscriber = destinationOrNext[rxSubscriberSymbol]() as Subscriber<any>;
this.syncErrorThrowable = trustedSubscriber.syncErrorThrowable;
this.destination = trustedSubscriber;
trustedSubscriber.add(this);
trustedSubscriber._addParentTeardownLogic(this);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
Expand Down Expand Up @@ -114,6 +116,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
if (!this.isStopped) {
this.isStopped = true;
this._error(err);
this._unsubscribeParentSubscription();
}
}

Expand All @@ -127,6 +130,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
if (!this.isStopped) {
this.isStopped = true;
this._complete();
this._unsubscribeParentSubscription();
}
}

Expand All @@ -152,6 +156,18 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this.unsubscribe();
}

/** @deprecated This is an internal implementation detail, do not use. */
_addParentTeardownLogic(parentTeardownLogic: TeardownLogic) {
this._parentSubscription = this.add(parentTeardownLogic);
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribeParentSubscription() {
if (this._parentSubscription !== null) {
this._parentSubscription.unsubscribe();
}
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribeAndRecycle(): Subscriber<T> {
const { _parent, _parents } = this;
Expand All @@ -162,6 +178,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this.isStopped = false;
this._parent = _parent;
this._parents = _parents;
this._parentSubscription = null;
return this;
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/internal/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,8 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
if (!this._socket) {
this._connectSocket();
}
let subscription = new Subscription();
subscription.add(this._output.subscribe(subscriber));
subscription.add(() => {
this._output.subscribe(subscriber);
subscriber.add(() => {
const { _socket } = this;
if (this._output.observers.length === 0) {
if (_socket && _socket.readyState === 1) {
Expand All @@ -287,7 +286,7 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
this._resetState();
}
});
return subscription;
return subscriber;
}

unsubscribe() {
Expand Down
5 changes: 3 additions & 2 deletions src/internal/testing/ColdObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ export class ColdObservable<T> extends Observable<T> implements SubscriptionLogg
super(function (this: Observable<T>, subscriber: Subscriber<any>) {
const observable: ColdObservable<T> = this as any;
const index = observable.logSubscribedFrame();
subscriber.add(new Subscription(() => {
const subscription = new Subscription();
subscription.add(new Subscription(() => {
observable.logUnsubscribedFrame(index);
}));
observable.scheduleMessages(subscriber);
return subscriber;
return subscription;
});
this.scheduler = scheduler;
}
Expand Down
6 changes: 4 additions & 2 deletions src/internal/testing/HotObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ export class HotObservable<T> extends Subject<T> implements SubscriptionLoggable
_subscribe(subscriber: Subscriber<any>): Subscription {
const subject: HotObservable<T> = this;
const index = subject.logSubscribedFrame();
subscriber.add(new Subscription(() => {
const subscription = new Subscription();
subscription.add(new Subscription(() => {
subject.logUnsubscribedFrame(index);
}));
return super._subscribe(subscriber);
subscription.add(super._subscribe(subscriber));
return subscription;
}

setup() {
Expand Down

0 comments on commit ad8131b

Please sign in to comment.