Skip to content

Commit

Permalink
fix(Subscriber): adds unsubscription when errors are thrown from user…
Browse files Browse the repository at this point in the history
…-land handlers.

- slightly improves perf by improving shape of SafeSubscriber so JIT can optimize call patterns.

related #1186
  • Loading branch information
trxcllnt authored and benlesh committed Jan 26, 2016
1 parent 04c42b3 commit dc67d21
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 47 deletions.
122 changes: 112 additions & 10 deletions spec/Observable-spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* globals describe, it, expect */
var Rx = require('../dist/cjs/Rx');
var Promise = require('promise');
var Subscriber = Rx.Subscriber;
var Observable = Rx.Observable;

function expectFullObserver(val) {
Expand Down Expand Up @@ -118,6 +119,15 @@ describe('Observable', function () {
expect(mutatedByComplete).toBe(true);
});

it('should work when subscribe is called with no arguments', function () {
var source = new Observable(function (subscriber) {
subscriber.next('foo');
subscriber.complete();
});

source.subscribe();
});

it('should return a Subscription that calls the unsubscribe function returned by the subscriber', function () {
var unsubscribeCalled = false;

Expand All @@ -136,36 +146,128 @@ describe('Observable', function () {
expect(unsubscribeCalled).toBe(true);
});

it('should not run unsubscription logic when an error is thrown sending messages synchronously', function () {
var messageError = false;
var messageErrorValue = false;
var unsubscribeCalled = false;

var sub;
var source = new Observable(function (observer) {
observer.next('boo!');
return function () {
unsubscribeCalled = true;
};
});

try {
sub = source.subscribe(function (x) { throw x; });
} catch (e) {
messageError = true;
messageErrorValue = e;
}

expect(sub).toBe(undefined);
expect(unsubscribeCalled).toBe(false);
expect(messageError).toBe(true);
expect(messageErrorValue).toBe('boo!');
});

it('should dispose of the subscriber when an error is thrown sending messages synchronously', function () {
var messageError = false;
var messageErrorValue = false;
var unsubscribeCalled = false;

var sub;
var subscriber = new Subscriber(function (x) { throw x; });
var source = new Observable(function (observer) {
observer.next('boo!');
return function () {
unsubscribeCalled = true;
};
});

try {
sub = source.subscribe(subscriber);
} catch (e) {
messageError = true;
messageErrorValue = e;
}

expect(sub).toBe(undefined);
expect(subscriber.isUnsubscribed).toBe(true);
expect(unsubscribeCalled).toBe(false);
expect(messageError).toBe(true);
expect(messageErrorValue).toBe('boo!');
});

describe('when called with an anonymous observer', function () {
it('should accept an anonymous observer with just a next function', function () {
Observable.of(1).subscribe({
it('should accept an anonymous observer with just a next function and call the next function in the context of the anonymous observer', function () {
var o = {
next: function next(x) {
expect(this).toBe(o);
expect(x).toBe(1);
}
});
};
Observable.of(1).subscribe(o);
});

it('should accept an anonymous observer with just an error function', function () {
Observable.throw('bad').subscribe({
it('should accept an anonymous observer with just an error function and call the error function in the context of the anonymous observer', function () {
var o = {
error: function error(err) {
expect(this).toBe(o);
expect(err).toBe('bad');
}
});
};
Observable.throw('bad').subscribe(o);
});

it('should accept an anonymous observer with just a complete function', function (done) {
Observable.empty().subscribe({
it('should accept an anonymous observer with just a complete function and call the complete function in the context of the anonymous observer', function (done) {
var o = {
complete: function complete() {
expect(this).toBe(o);
done();
}
});
};
Observable.empty().subscribe(o);
});

it('should accept an anonymous observer with no functions at all', function () {
expect(function testEmptyObject() {
Observable.empty().subscribe({});
}).not.toThrow();
});

it('should not run unsubscription logic when an error is thrown sending messages synchronously to an anonymous observer', function () {
var messageError = false;
var messageErrorValue = false;
var unsubscribeCalled = false;

var o = {
next: function next(x) {
expect(this).toBe(o);
throw x;
}
};
var sub;
var source = new Observable(function (observer) {
observer.next('boo!');
return function () {
unsubscribeCalled = true;
};
});

try {
sub = source.subscribe(o);
} catch (e) {
messageError = true;
messageErrorValue = e;
}

expect(sub).toBe(undefined);
expect(unsubscribeCalled).toBe(false);
expect(messageError).toBe(true);
expect(messageErrorValue).toBe('boo!');
});
});
});
});
Expand All @@ -188,4 +290,4 @@ describe('Observable.create', function () {
result.subscribe(function () { });
expect(called).toBe(true);
});
});
});
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export class Observable<T> implements CoreOperators<T> {
const subscriber = toSubscriber(observerOrNext, error, complete);

if (operator) {
subscriber.add(this._subscribe(this.operator.call(subscriber)));
subscriber.add(this._subscribe(operator.call(subscriber)));
} else {
subscriber.add(this._subscribe(subscriber));
}
Expand Down
89 changes: 67 additions & 22 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {noop} from './util/noop';
import {throwError} from './util/throwError';
import {tryOrThrowError} from './util/tryOrThrowError';
import {isFunction} from './util/isFunction';
import {tryCatch} from './util/tryCatch';
import {errorObject} from './util/errorObject';

import {Observer} from './Observer';
import {Subscription} from './Subscription';
Expand All @@ -12,26 +12,38 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
static create<T>(next?: (x?: T) => void,
error?: (e?: any) => void,
complete?: () => void): Subscriber<T> {
return new SafeSubscriber<T>(next, error, complete);
return new Subscriber(next, error, complete);
}

protected isStopped: boolean = false;
protected destination: Observer<any>;

constructor(destination: Observer<any> = emptyObserver) {
constructor(destinationOrNext?: Observer<any> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
super();

this.destination = destination;

if (!destination ||
(destination instanceof Subscriber) ||
(destination === emptyObserver)) {
return;
switch (arguments.length) {
case 0:
this.destination = emptyObserver;
break;
case 1:
if (!destinationOrNext) {
this.destination = emptyObserver;
break;
}
if (typeof destinationOrNext === 'object') {
if (destinationOrNext instanceof Subscriber) {
this.destination = (<Observer<any>> destinationOrNext);
} else {
this.destination = new SafeSubscriber<T>(this, <Observer<any>> destinationOrNext);
}
break;
}
default:
this.destination = new SafeSubscriber<T>(this, <((value: T) => void)> destinationOrNext, error, complete);
break;
}

if (typeof destination.next !== 'function') { destination.next = noop; }
if (typeof destination.error !== 'function') { destination.error = throwError; }
if (typeof destination.complete !== 'function') { destination.complete = noop; }
}

next(value?: T): void {
Expand Down Expand Up @@ -83,25 +95,48 @@ export class Subscriber<T> extends Subscription implements Observer<T> {

class SafeSubscriber<T> extends Subscriber<T> {

constructor(next?: (x?: T) => void,
private _context: any;

constructor(private _parent: Subscriber<T>,
observerOrNext?: Observer<T> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
super();
this._next = (typeof next === 'function') && tryOrThrowError(next) || null;
this._error = (typeof error === 'function') && tryOrThrowError(error) || throwError;
this._complete = (typeof complete === 'function') && tryOrThrowError(complete) || null;

let next: ((value: T) => void);
let context: any = this;

if (isFunction(observerOrNext)) {
next = (<((value: T) => void)> observerOrNext);
} else if (observerOrNext) {
context = observerOrNext;
next = (<Observer<T>> observerOrNext).next;
error = (<Observer<T>> observerOrNext).error;
complete = (<Observer<T>> observerOrNext).complete;
}

this._context = context;
this._next = next;
this._error = error;
this._complete = complete;
}

next(value?: T): void {
if (!this.isStopped && this._next) {
this._next(value);
if (tryCatch(this._next).call(this._context, value) === errorObject) {
this.unsubscribe();
throw errorObject.e;
}
}
}

error(err?: any): void {
if (!this.isStopped) {
if (this._error) {
this._error(err);
if (tryCatch(this._error).call(this._context, err) === errorObject) {
this.unsubscribe();
throw errorObject.e;
}
}
this.unsubscribe();
}
Expand All @@ -110,9 +145,19 @@ class SafeSubscriber<T> extends Subscriber<T> {
complete(): void {
if (!this.isStopped) {
if (this._complete) {
this._complete();
if (tryCatch(this._complete).call(this._context) === errorObject) {
this.unsubscribe();
throw errorObject.e;
}
}
this.unsubscribe();
}
}

protected _unsubscribe(): void {
const { _parent } = this;
this._context = null;
this._parent = null;
_parent.unsubscribe();
}
}
4 changes: 1 addition & 3 deletions src/util/toSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ export function toSubscriber<T>(
return (<Subscriber<T>> next);
} else if (typeof next[rxSubscriber] === 'function') {
return next[rxSubscriber]();
} else {
return new Subscriber(<Observer<T>> next);
}
}

return Subscriber.create(<((value: T) => void)> next, error, complete);
return new Subscriber(next, error, complete);
}
11 changes: 0 additions & 11 deletions src/util/tryOrThrowError.ts

This file was deleted.

0 comments on commit dc67d21

Please sign in to comment.