Skip to content

Commit

Permalink
fix(ifObservable): accept promise as source
Browse files Browse the repository at this point in the history
relates to #1483
  • Loading branch information
kwonoj authored and benlesh committed Mar 28, 2016
1 parent 280b985 commit 147166e
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 24 deletions.
71 changes: 70 additions & 1 deletion spec/observables/if-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as Rx from '../../dist/cjs/Rx.KitchenSink';
import {expectObservable} from '../helpers/marble-testing';
import {it} from '../helpers/test-helper';
import {DoneSignature} from '../helpers/test-helper';

const Observable = Rx.Observable;

Expand All @@ -25,4 +25,73 @@ describe('Observable.if', () => {

expectObservable(e1).toBe(expected);
});

it('should raise error when conditional throws', () => {
const e1 = Observable.if(() => {
throw 'error';
}, Observable.of('a'));

const expected = '#';

expectObservable(e1).toBe(expected);
});

it('should accept resolved promise as thenSource', (done: DoneSignature) => {
const expected = 42;
const e1 = Observable.if(() => true, new Promise((resolve: any) => { resolve(expected); }));

e1.subscribe(x => {
expect(x).toBe(expected);
}, (x) => {
done.fail('should not be called');
}, () => {
done();
});
});

it('should accept resolved promise as elseSource', (done: DoneSignature) => {
const expected = 42;
const e1 = Observable.if(() => false,
Observable.of('a'),
new Promise((resolve: any) => { resolve(expected); }));

e1.subscribe(x => {
expect(x).toBe(expected);
done();
}, (x) => {
done.fail('should not be called');
}, () => {
done();
});
});

it('should accept rejected promise as elseSource', (done: DoneSignature) => {
const expected = 42;
const e1 = Observable.if(() => false,
Observable.of('a'),
new Promise((resolve: any, reject: any) => { reject(expected); }));

e1.subscribe(x => {
done.fail('should not be called');
}, (x) => {
expect(x).toBe(expected);
done();
}, () => {
done.fail();
});
});

it('should accept rejected promise as thenSource', (done: DoneSignature) => {
const expected = 42;
const e1 = Observable.if(() => true, new Promise((resolve: any, reject: any) => { reject(expected); }));

e1.subscribe(x => {
done.fail('should not be called');
}, (x) => {
expect(x).toBe(expected);
done();
}, () => {
done.fail();
});
});
});
58 changes: 35 additions & 23 deletions src/observable/IfObservable.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,59 @@
import {Observable} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';
/**
* We need this JSDoc comment for affecting ESDoc.
* @extends {Ignored}
* @hide true
*/
export class IfObservable<T, R> extends Observable<T> {

static create<T, R>(condition: () => boolean,
thenSource?: Observable<T>,
elseSource?: Observable<R>): Observable<T|R> {
static create<T, R>(condition: () => boolean | void,
thenSource?: SubscribableOrPromise<T> | void,
elseSource?: SubscribableOrPromise<R> | void): Observable<T|R> {
return new IfObservable(condition, thenSource, elseSource);
}

constructor(private condition: () => boolean,
private thenSource?: Observable<T>,
private elseSource?: Observable<R>) {
constructor(private condition: () => boolean | void,
private thenSource?: SubscribableOrPromise<T> | void,
private elseSource?: SubscribableOrPromise<R> | void) {
super();
}

protected _subscribe(subscriber: Subscriber<T|R>): Subscription | Function | void {

const { condition, thenSource, elseSource } = this;

let result: boolean, error: any, errorHappened = false;
return new IfSubscriber(subscriber, condition, thenSource, elseSource);
}
}

try {
result = condition();
} catch (e) {
error = e;
errorHappened = true;
}
class IfSubscriber<T, R> extends OuterSubscriber<T, T> {
constructor(destination: Subscriber<T>,
private condition: () => boolean | void,
private thenSource?: SubscribableOrPromise<T> | void,
private elseSource?: SubscribableOrPromise<R> | void) {
super(destination);
this.tryIf();
}

private tryIf(): void {
const { condition, thenSource, elseSource } = this;

if (errorHappened) {
subscriber.error(error);
} else if (result && thenSource) {
return thenSource.subscribe(subscriber);
} else if (elseSource) {
return elseSource.subscribe(subscriber);
} else {
subscriber.complete();
let result: boolean;
try {
result = <boolean>condition();
const source = result ? thenSource : elseSource;

if (source) {
this.add(subscribeToResult(this, source));
} else {
this._complete();
}
} catch (err) {
this._error(err);
}
}
}

0 comments on commit 147166e

Please sign in to comment.