Skip to content

Commit

Permalink
fix(do): fix do operator to invoke observer message handlers in the r…
Browse files Browse the repository at this point in the history
…ight context.
  • Loading branch information
trxcllnt committed Feb 10, 2016
1 parent 4c1ac01 commit 67a2f25
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 53 deletions.
27 changes: 27 additions & 0 deletions spec/operators/do-spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold */
var Rx = require('../../dist/cjs/Rx');
var Subject = Rx.Subject;
var Observable = Rx.Observable;

describe('Observable.prototype.do()', function () {
Expand Down Expand Up @@ -57,6 +58,32 @@ describe('Observable.prototype.do()', function () {
expect(results).toEqual(expected);
});

it('should handle everything with a Subject', function () {
var expected = [1,2,3];
var results = [];
var completeCalled = false;
var subject = new Subject();

subject.subscribe({
next: function (x) {
results.push(x);
},
error: function (err) {
throw 'should not be called';
},
complete: function () {
completeCalled = true;
}
});

Observable.of(1,2,3)
.do(subject)
.subscribe();

expect(completeCalled).toBe(true);
expect(results).toEqual(expected);
});

it('should handle an error with a callback', function () {
var errored = false;
Observable.throw('bad').do(null, function (err) {
Expand Down
93 changes: 40 additions & 53 deletions src/operator/do.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {Subscriber} from '../Subscriber';

import {noop} from '../util/noop';
import {Observable} from '../Observable';
import {PartialObserver} from '../Observer';

/**
* Returns a mirrored Observable of the source Observable, but modified so that the provided Observer is called
Expand All @@ -14,77 +12,66 @@ import {Observable} from '../Observable';
* @param {function} [complete] callback for the completion of the source.
* @reurns {Observable} a mirrored Observable with the specified Observer or callback attached for each item.
*/
export function _do<T>(nextOrObserver?: Observer<T> | ((x: T) => void), error?: (e: any) => void, complete?: () => void): Observable<T> {
let next: (x: T) => void;
if (nextOrObserver && typeof nextOrObserver === 'object') {
next = (<Observer<T>>nextOrObserver).next;
error = (<Observer<T>>nextOrObserver).error;
complete = (<Observer<T>>nextOrObserver).complete;
} else {
next = <(x: T) => void>nextOrObserver;
}
return this.lift(new DoOperator(next || noop, error || noop, complete || noop));
export function _do<T>(nextOrObserver?: PartialObserver<T> | ((x: T) => void),
error?: (e: any) => void,
complete?: () => void): Observable<T> {
return this.lift(new DoOperator(nextOrObserver, error, complete));
}

class DoOperator<T> implements Operator<T, T> {

next: (x: T) => void;
error: (e: any) => void;
complete: () => void;

constructor(next: (x: T) => void, error: (e: any) => void, complete: () => void) {
this.next = next;
this.error = error;
this.complete = complete;
constructor(private nextOrObserver?: PartialObserver<T> | ((x: T) => void),
private error?: (e: any) => void,
private complete?: () => void) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new DoSubscriber(subscriber, this.next, this.error, this.complete);
return new DoSubscriber(subscriber, this.nextOrObserver, this.error, this.complete);
}
}

class DoSubscriber<T> extends Subscriber<T> {

private __next: (x: T) => void;
private __error: (e: any) => void;
private __complete: () => void;
private safeSubscriber: Subscriber<T>;

constructor(destination: Subscriber<T>, next: (x: T) => void, error: (e: any) => void, complete: () => void) {
constructor(destination: Subscriber<T>,
nextOrObserver?: PartialObserver<T> | ((x: T) => void),
error?: (e: any) => void,
complete?: () => void) {
super(destination);
this.__next = next;
this.__error = error;
this.__complete = complete;

const safeSubscriber = new Subscriber<T>(nextOrObserver, error, complete);
safeSubscriber.syncErrorThrowable = true;
this.add(safeSubscriber);
this.safeSubscriber = safeSubscriber;
}

// NOTE: important, all try catch blocks below are there for performance
// reasons. tryCatcher approach does not benefit this operator.
protected _next(value: T) {
try {
this.__next(value);
} catch (err) {
this.destination.error(err);
return;
protected _next(value: T): void {
const { safeSubscriber } = this;
safeSubscriber.next(value);
if (safeSubscriber.syncErrorThrown) {
this.destination.error(safeSubscriber.syncErrorValue);
} else {
this.destination.next(value);
}
this.destination.next(value);
}

protected _error(err: any) {
try {
this.__error(err);
} catch (err) {
protected _error(err: any): void {
const { safeSubscriber } = this;
safeSubscriber.error(err);
if (safeSubscriber.syncErrorThrown) {
this.destination.error(safeSubscriber.syncErrorValue);
} else {
this.destination.error(err);
return;
}
this.destination.error(err);
}

protected _complete() {
try {
this.__complete();
} catch (err) {
this.destination.error(err);
return;
protected _complete(): void {
const { safeSubscriber } = this;
safeSubscriber.complete();
if (safeSubscriber.syncErrorThrown) {
this.destination.error(safeSubscriber.syncErrorValue);
} else {
this.destination.complete();
}
this.destination.complete();
}
}

0 comments on commit 67a2f25

Please sign in to comment.