From 67a2f254a2957ed721a1b9e6919c9ee9eeb6437e Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Wed, 10 Feb 2016 14:47:52 -0800 Subject: [PATCH] fix(do): fix do operator to invoke observer message handlers in the right context. --- spec/operators/do-spec.js | 27 ++++++++++++ src/operator/do.ts | 93 +++++++++++++++++---------------------- 2 files changed, 67 insertions(+), 53 deletions(-) diff --git a/spec/operators/do-spec.js b/spec/operators/do-spec.js index 5edbeab23f..84e85f6c97 100644 --- a/spec/operators/do-spec.js +++ b/spec/operators/do-spec.js @@ -1,5 +1,6 @@ /* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold */ var Rx = require('../../dist/cjs/Rx'); +var Subject = Rx.Subject; var Observable = Rx.Observable; describe('Observable.prototype.do()', function () { @@ -57,6 +58,32 @@ describe('Observable.prototype.do()', function () { expect(results).toEqual(expected); }); + it('should handle everything with a Subject', function () { + var expected = [1,2,3]; + var results = []; + var completeCalled = false; + var subject = new Subject(); + + subject.subscribe({ + next: function (x) { + results.push(x); + }, + error: function (err) { + throw 'should not be called'; + }, + complete: function () { + completeCalled = true; + } + }); + + Observable.of(1,2,3) + .do(subject) + .subscribe(); + + expect(completeCalled).toBe(true); + expect(results).toEqual(expected); + }); + it('should handle an error with a callback', function () { var errored = false; Observable.throw('bad').do(null, function (err) { diff --git a/src/operator/do.ts b/src/operator/do.ts index 73e1ac47f4..f06a09c705 100644 --- a/src/operator/do.ts +++ b/src/operator/do.ts @@ -1,9 +1,7 @@ import {Operator} from '../Operator'; -import {Observer} from '../Observer'; import {Subscriber} from '../Subscriber'; - -import {noop} from '../util/noop'; import {Observable} from '../Observable'; +import {PartialObserver} from '../Observer'; /** * Returns a mirrored Observable of the source Observable, but modified so that the provided Observer is called @@ -14,77 +12,66 @@ import {Observable} from '../Observable'; * @param {function} [complete] callback for the completion of the source. * @reurns {Observable} a mirrored Observable with the specified Observer or callback attached for each item. */ -export function _do(nextOrObserver?: Observer | ((x: T) => void), error?: (e: any) => void, complete?: () => void): Observable { - let next: (x: T) => void; - if (nextOrObserver && typeof nextOrObserver === 'object') { - next = (>nextOrObserver).next; - error = (>nextOrObserver).error; - complete = (>nextOrObserver).complete; - } else { - next = <(x: T) => void>nextOrObserver; - } - return this.lift(new DoOperator(next || noop, error || noop, complete || noop)); +export function _do(nextOrObserver?: PartialObserver | ((x: T) => void), + error?: (e: any) => void, + complete?: () => void): Observable { + return this.lift(new DoOperator(nextOrObserver, error, complete)); } class DoOperator implements Operator { - - next: (x: T) => void; - error: (e: any) => void; - complete: () => void; - - constructor(next: (x: T) => void, error: (e: any) => void, complete: () => void) { - this.next = next; - this.error = error; - this.complete = complete; + constructor(private nextOrObserver?: PartialObserver | ((x: T) => void), + private error?: (e: any) => void, + private complete?: () => void) { } - call(subscriber: Subscriber): Subscriber { - return new DoSubscriber(subscriber, this.next, this.error, this.complete); + return new DoSubscriber(subscriber, this.nextOrObserver, this.error, this.complete); } } class DoSubscriber extends Subscriber { - private __next: (x: T) => void; - private __error: (e: any) => void; - private __complete: () => void; + private safeSubscriber: Subscriber; - constructor(destination: Subscriber, next: (x: T) => void, error: (e: any) => void, complete: () => void) { + constructor(destination: Subscriber, + nextOrObserver?: PartialObserver | ((x: T) => void), + error?: (e: any) => void, + complete?: () => void) { super(destination); - this.__next = next; - this.__error = error; - this.__complete = complete; + + const safeSubscriber = new Subscriber(nextOrObserver, error, complete); + safeSubscriber.syncErrorThrowable = true; + this.add(safeSubscriber); + this.safeSubscriber = safeSubscriber; } - // NOTE: important, all try catch blocks below are there for performance - // reasons. tryCatcher approach does not benefit this operator. - protected _next(value: T) { - try { - this.__next(value); - } catch (err) { - this.destination.error(err); - return; + protected _next(value: T): void { + const { safeSubscriber } = this; + safeSubscriber.next(value); + if (safeSubscriber.syncErrorThrown) { + this.destination.error(safeSubscriber.syncErrorValue); + } else { + this.destination.next(value); } - this.destination.next(value); } - protected _error(err: any) { - try { - this.__error(err); - } catch (err) { + protected _error(err: any): void { + const { safeSubscriber } = this; + safeSubscriber.error(err); + if (safeSubscriber.syncErrorThrown) { + this.destination.error(safeSubscriber.syncErrorValue); + } else { this.destination.error(err); - return; } - this.destination.error(err); } - protected _complete() { - try { - this.__complete(); - } catch (err) { - this.destination.error(err); - return; + protected _complete(): void { + const { safeSubscriber } = this; + safeSubscriber.complete(); + if (safeSubscriber.syncErrorThrown) { + this.destination.error(safeSubscriber.syncErrorValue); + } else { + this.destination.complete(); } - this.destination.complete(); } } +