Skip to content

Commit

Permalink
feat(onUnhandledError): configuration point added for unhandled errors (
Browse files Browse the repository at this point in the history
#5681)

- Adds new configuration setting `onUnhandledError`, which defaults to using "hostReportError" behavior.
- Adds tests that ensure it is called appropriately, and that it is always asynchronous.
- Updates internal name of empty observer to be `EMPTY_OBSERVER` throughout and types it to prevent mutations. Reduces overhead by using the `noop` function for its callbacks.
- Errors that occur during subscription setup _after_ the subscription was already closed will no longer log to `console.warn`

BREAKING CHANGE: Errors that occur during setup of an observable subscription after the subscription has emitted an error or completed will now throw in their own call stack. Before it would call `console.warn`. This is potentially breaking in edge cases for node applications, which may be configured to terminate for unhandled exceptions. In the unlikely event this affects you, you can configure the behavior to `console.warn` in the new configuration setting like so: `import { config } from 'rxjs'; config.onUnhandledError = (err) => console.warn(err);`
  • Loading branch information
benlesh authored Sep 8, 2020
1 parent 795ea6a commit 3485dd5
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 48 deletions.
1 change: 1 addition & 0 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ export declare function concat<O1 extends ObservableInput<any>, O2 extends Obser
export declare function concat<A extends ObservableInput<any>[]>(...observables: A): Observable<ObservedValueUnionFromArray<A>>;

export declare const config: {
onUnhandledError: ((err: any) => void) | null;
quietBadConfig: boolean;
Promise: PromiseConstructorLike;
useDeprecatedSynchronousErrorHandling: boolean;
Expand Down
26 changes: 14 additions & 12 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -991,19 +991,21 @@ describe('Observable.lift', () => {
);
});

it('should not swallow internal errors', () => {
const consoleStub = sinon.stub(console, 'warn');
try {
let source = new Observable<number>((observer) => observer.next(42));
for (let i = 0; i < 10000; ++i) {
let base = source;
source = new Observable<number>((observer) => base.subscribe(observer));
it('should not swallow internal errors', (done) => {
config.onUnhandledError = (err) => {
expect(err).to.equal('bad');
config.onUnhandledError = null;
done();
};

new Observable(subscriber => {
subscriber.error('test');
throw 'bad';
}).subscribe({
error: err => {
expect(err).to.equal('test');
}
source.subscribe();
expect(consoleStub).to.have.property('called', true);
} finally {
consoleStub.restore();
}
});
});

// TODO: Stop skipping this until a later refactor (probably in version 8)
Expand Down
159 changes: 159 additions & 0 deletions spec/config-spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,168 @@
/** @prettier */

import { config } from '../src/internal/config';
import { expect } from 'chai';
import { Observable } from 'rxjs';

describe('config', () => {
it('should have a Promise property that defaults to nothing', () => {
expect(config).to.have.property('Promise');
expect(config.Promise).to.be.undefined;
});

describe('onUnhandledError', () => {
afterEach(() => {
config.onUnhandledError = null;
});

it('should default to null', () => {
expect(config.onUnhandledError).to.be.null;
});

it('should call asynchronously if an error is emitted and not handled by the consumer observer', (done) => {
let called = false;
const results: any[] = [];

config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.error('bad');
});

source.subscribe({
next: value => results.push(value),
});
expect(called).to.be.false;
expect(results).to.deep.equal([1]);
});

it('should call asynchronously if an error is emitted and not handled by the consumer next callback', (done) => {
let called = false;
const results: any[] = [];

config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.error('bad');
});

source.subscribe(value => results.push(value));
expect(called).to.be.false;
expect(results).to.deep.equal([1]);
});

it('should call asynchronously if an error is emitted and not handled by the consumer in the empty case', (done) => {
let called = false;
config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable(subscriber => {
subscriber.error('bad');
});

source.subscribe();
expect(called).to.be.false;
});

it('should call asynchronously if a subscription setup errors after the subscription is closed by an error', (done) => {
let called = false;
config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable(subscriber => {
subscriber.error('handled');
throw 'bad';
});

let syncSentError: any;
source.subscribe({
error: err => {
syncSentError = err;
}
});

expect(syncSentError).to.equal('handled');
expect(called).to.be.false;
});

it('should call asynchronously if a subscription setup errors after the subscription is closed by a completion', (done) => {
let called = false;
let completed = false;

config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable(subscriber => {
subscriber.complete();
throw 'bad';
});

source.subscribe({
error: () => {
throw 'should not be called';
},
complete: () => {
completed = true;
}
});

expect(completed).to.be.true;
expect(called).to.be.false;
});

/**
* Thie test is added so people know this behavior is _intentional_. It's part of the contract of observables
* and, while I'm not sure I like it, it might start surfacing untold numbers of errors, and break
* node applications if we suddenly changed this to start throwing errors on other jobs for instances
* where users accidentally called `subscriber.error` twice. Likewise, would we report an error
* for two calls of `complete`? This is really something a build-time tool like a linter should
* capture. Not a run time error reporting event.
*/
it('should not be called if two errors are sent to the subscriber', (done) => {
let called = false;
config.onUnhandledError = () => {
called = true;
};

const source = new Observable(subscriber => {
subscriber.error('handled');
subscriber.error('swallowed');
});

let syncSentError: any;
source.subscribe({
error: err => {
syncSentError = err;
}
});

expect(syncSentError).to.equal('handled');
// This timeout would be scheduled _after_ any error timeout that might be scheduled
// (But we're not scheduling that), so this is just an artificial delay to make sure the
// behavior sticks.
setTimeout(() => {
expect(called).to.be.false;
done();
});
});
});
});
22 changes: 22 additions & 0 deletions src/internal/EMPTY_OBSERVER.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Observer } from './types';
import { config } from './config';
import { reportUnhandledError } from './util/reportUnhandledError';
import { noop } from './util/noop';

/**
* The observer used as a stub for subscriptions where the user did not
* pass any arguments to `subscribe`. Comes with the default error handling
* behavior.
*/
export const EMPTY_OBSERVER: Readonly<Observer<any>> = {
closed: true,
next: noop,
error(err: any): void {
if (config.useDeprecatedSynchronousErrorHandling) {
throw err;
} else {
reportUnhandledError(err);
}
},
complete: noop
};
5 changes: 3 additions & 2 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { toSubscriber } from './util/toSubscriber';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';
import { reportUnhandledError } from './util/reportUnhandledError';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
Expand Down Expand Up @@ -253,8 +254,8 @@ export class Observable<T> implements Subscribable<T> {
sink.error(err);
} else {
// If an error is thrown during subscribe, but our subscriber is closed, so we cannot notify via the
// subscription "error" channel, we are warning the developer of the problem here, via the console.
console.warn(err);
// subscription "error" channel, it is an unhandled error and we need to report it appropriately.
reportUnhandledError(err);
}
}
}
Expand Down
16 changes: 0 additions & 16 deletions src/internal/Observer.ts

This file was deleted.

12 changes: 6 additions & 6 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { isFunction } from './util/isFunction';
import { empty as emptyObserver } from './Observer';
import { EMPTY_OBSERVER } from './EMPTY_OBSERVER';
import { Observer, PartialObserver } from './types';
import { Subscription, isSubscription } from './Subscription';
import { config } from './config';
import { hostReportError } from './util/hostReportError';
import { reportUnhandledError } from './util/reportUnhandledError';

/**
* Implements the {@link Observer} interface and extends the
Expand Down Expand Up @@ -58,11 +58,11 @@ export class Subscriber<T> extends Subscription implements Observer<T> {

switch (arguments.length) {
case 0:
this.destination = emptyObserver;
this.destination = EMPTY_OBSERVER;
break;
case 1:
if (!destinationOrNext) {
this.destination = emptyObserver;
this.destination = EMPTY_OBSERVER;
break;
}
if (typeof destinationOrNext === 'object') {
Expand Down Expand Up @@ -165,7 +165,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
next = observerOrNext.next;
error = observerOrNext.error;
complete = observerOrNext.complete;
if (observerOrNext !== emptyObserver) {
if (observerOrNext !== EMPTY_OBSERVER) {
let context: any;
if (config.useDeprecatedNextContext) {
context = Object.create(observerOrNext);
Expand Down Expand Up @@ -224,7 +224,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
throw err;
}
} else {
hostReportError(err);
reportUnhandledError(err);
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/internal/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ let _enable_deoptimized_subscriber_creation = false;
* like what Promise contructor should used to create Promises
*/
export const config = {
/**
* A registration point for unhandled errors from RxJS. These are errors that
* cannot were not handled by consuming code in the usual subscription path. For
* example, if you have this configured, and you subscribe to an observable without
* providing an error handler, errors from that subscription will end up here. This
* will _always_ be called asynchronously on another job in the runtime. This is because
* we do not want errors thrown in this user-configured handler to interfere with the
* behavior of the library.
*/
onUnhandledError: null as ((err: any) => void) | null,

/**
* If true, console logs for deprecation warnings will not be emitted.
* @deprecated this will be removed in v8 when all deprecated settings are removed.
Expand Down
8 changes: 0 additions & 8 deletions src/internal/util/hostReportError.ts

This file was deleted.

24 changes: 24 additions & 0 deletions src/internal/util/reportUnhandledError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/** @prettier */
import { config } from '../config';

/**
* Handles an error on another job either with the user-configured {@link onUnhandledError},
* or by throwing it on that new job so it can be picked up by `window.onerror`, `process.on('error')`, etc.
*
* This should be called whenever there is an error that is out-of-band with the subscription
* or when an error hits a terminal boundary of the subscription and no error handler was provided.
*
* @param err the error to report
*/
export function reportUnhandledError(err: any) {
setTimeout(() => {
const { onUnhandledError } = config;
if (onUnhandledError) {
// Execute the user-configured error handler.
onUnhandledError(err);
} else {
// Throw so it is picked up by the runtime's uncaught error mechanism.
throw err;
}
});
}
4 changes: 2 additions & 2 deletions src/internal/util/subscribeToPromise.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Subscriber } from '../Subscriber';
import { hostReportError } from './hostReportError';
import { reportUnhandledError } from './reportUnhandledError';

export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: Subscriber<T>) => {
promise.then(
Expand All @@ -11,6 +11,6 @@ export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: S
},
(err: any) => subscriber.error(err)
)
.then(null, hostReportError);
.then(null, reportUnhandledError);
return subscriber;
};
4 changes: 2 additions & 2 deletions src/internal/util/toSubscriber.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/** @prettier */
import { Subscriber } from '../Subscriber';
import { empty as emptyObserver } from '../Observer';
import { EMPTY_OBSERVER } from '../EMPTY_OBSERVER';
import { PartialObserver, Observer } from '../types';
import { isSubscription } from '../Subscription';

Expand All @@ -20,7 +20,7 @@ export function toSubscriber<T>(
}

if (!nextOrObserver && !error && !complete) {
return new Subscriber(emptyObserver);
return new Subscriber(EMPTY_OBSERVER);
}

return new Subscriber(nextOrObserver, error, complete);
Expand Down

0 comments on commit 3485dd5

Please sign in to comment.