diff --git a/spec/Observable-spec.js b/spec/Observable-spec.js index f13cdd4e44..a4d83bf068 100644 --- a/spec/Observable-spec.js +++ b/spec/Observable-spec.js @@ -72,6 +72,21 @@ describe('Observable', function () { expect(typeof result.then).toBe('function'); }); + + it('should reject promise if nextHandler throws', function (done) { + var results = []; + Observable.of(1,2,3).forEach(function (x) { + if (x === 3) { + throw new Error('NO THREES!'); + }; + results.push(x); + }) + .then(done.fail, function (err) { + expect(err).toEqual(new Error('NO THREES!')); + expect(results).toEqual([1,2]); + }) + .then(done); + }); }); describe('subscribe', function () { diff --git a/src/Observable.ts b/src/Observable.ts index 6440bdd436..e020659644 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -11,6 +11,8 @@ import {ConnectableObservable} from './observable/ConnectableObservable'; import {Subject} from './Subject'; import {Notification} from './Notification'; import {toSubscriber} from './util/toSubscriber'; +import {tryCatch} from './util/tryCatch'; +import {errorObject} from './util/errorObject'; import {combineLatest as combineLatestStatic} from './operator/combineLatest-static'; import {concat as concatStatic} from './operator/concat-static'; @@ -136,27 +138,16 @@ export class Observable implements CoreOperators { throw new Error('no Promise impl found'); } - let nextHandler: any; + const source = this; - if (thisArg) { - nextHandler = function nextHandlerFn(value: any): void { - const { thisArg, next } = nextHandlerFn; - return next.call(thisArg, value); - }; - nextHandler.thisArg = thisArg; - nextHandler.next = next; - } else { - nextHandler = next; - } - - const promiseCallback = function promiseCallbackFn(resolve: Function, reject: Function) { - const { source, nextHandler } = promiseCallbackFn; - source.subscribe(nextHandler, reject, resolve); - }; - (promiseCallback).source = this; - (promiseCallback).nextHandler = nextHandler; - - return new PromiseCtor(promiseCallback); + return new PromiseCtor((resolve, reject) => { + source.subscribe((value: T) => { + const result: any = tryCatch(next).call(thisArg, value); + if (result === errorObject) { + reject(errorObject.e); + } + }, reject, resolve); + }); } _subscribe(subscriber: Subscriber): Subscription | Function | void {