Skip to content

Commit

Permalink
feat(forkJoin): accept promise, resultselector as parameter of forkJoin
Browse files Browse the repository at this point in the history
closes #507
  • Loading branch information
kwonoj authored and benlesh committed Nov 10, 2015
1 parent 46ca1d7 commit 190f349
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 34 deletions.
161 changes: 151 additions & 10 deletions spec/observables/forkJoin-spec.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,155 @@
/* globals describe, it, expect */
/* globals describe, it, expect, lowerCaseO, hot, expectObservable */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.forkJoin', function () {
it('should join the last values of the provided observables into an array', function (done) {
Observable.forkJoin(Observable.of(1, 2, 3, 'a'),
Observable.of('b'),
Observable.of(1, 2, 3, 4, 'c'))
.subscribe(function (x) {
expect(x).toEqual(['a', 'b', 'c']);
}, null, done);
});
});
it('should join the last values of the provided observables into an array', function () {
var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('(b|)'),
hot('--1--2--3--|')
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
});

it('should accept lowercase-o observables', function () {
var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('(b|)'),
lowerCaseO('1', '2', '3')
);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
});

it('should accept promise', function (done) {
var e1 = Observable.forkJoin(
Observable.of(1),
Promise.resolve(2)
);

e1.subscribe(function (x) {
expect(x).toEqual([1,2]);
},
function (err) {
done.fail('should not be called');
},
done);
});

it('forkJoin n-ary parameters empty', function () {
var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('(b|)'),
hot('------------------|')
);
var expected = '------------------|';

expectObservable(e1).toBe(expected);
});

it('forkJoin n-ary parameters empty before end', function () {
var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('(b|)'),
hot('---------|')
);
var expected = '---------|';

expectObservable(e1).toBe(expected);
});

it('forkJoin empty empty', function () {
var e1 = Observable.forkJoin(
hot('--------------|'),
hot('---------|')
);
var expected = '---------|';

expectObservable(e1).toBe(expected);
});

it('forkJoin none', function () {
var e1 = Observable.forkJoin();
var expected = '|';

expectObservable(e1).toBe(expected);
});

it('forkJoin empty return', function () {
function selector(x, y) {
return x + y;
}

var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('---------|'),
selector);
var expected = '---------|';

expectObservable(e1).toBe(expected);
});

it('forkJoin return return', function () {
function selector(x, y) {
return x + y;
}

var e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('---2-----|'),
selector);
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: 'd2'});
});

it('forkJoin empty throw', function () {
var e1 = Observable.forkJoin(
hot('------#'),
hot('---------|'));
var expected = '------#';

expectObservable(e1).toBe(expected);
});

it('forkJoin empty throw', function () {
function selector(x, y) {
return x + y;
}

var e1 = Observable.forkJoin(
hot('------#'),
hot('---------|'),
selector);
var expected = '------#';

expectObservable(e1).toBe(expected);
});

it('forkJoin return throw', function () {
var e1 = Observable.forkJoin(
hot('------#'),
hot('---a-----|'));
var expected = '------#';

expectObservable(e1).toBe(expected);
});

it('forkJoin return throw', function () {
function selector(x, y) {
return x + y;
}

var e1 = Observable.forkJoin(
hot('------#'),
hot('-------b-|'),
selector);
var expected = '------#';

expectObservable(e1).toBe(expected);
});
});
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ export class Observable<T> implements CoreOperators<T> {
static concat: <T>(...observables: Array<Observable<any> | Scheduler>) => Observable<T>;
static defer: <T>(observableFactory: () => Observable<T>) => Observable<T>;
static empty: <T>(scheduler?: Scheduler) => Observable<T>;
static forkJoin: <T>(...observables: Observable<any>[]) => Observable<T>;
static forkJoin: (...sources: Array<Observable<any> | Promise<any> | ((...values: Array<any>) => any)>) => Observable<any>;
static from: <T>(iterable: any, scheduler?: Scheduler) => Observable<T>;
static fromArray: <T>(array: T[], scheduler?: Scheduler) => Observable<T>;
static fromEvent: <T>(element: any, eventName: string, selector?: (...args: Array<any>) => T) => Observable<T>;
Expand Down
84 changes: 65 additions & 19 deletions src/observables/ForkJoinObservable.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,96 @@
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {PromiseObservable} from './PromiseObservable';
import {EmptyObservable} from './EmptyObservable';
import {isPromise} from '../util/isPromise';

export class ForkJoinObservable<T> extends Observable<T> {
constructor(private observables: Observable<any>[]) {
super();
constructor(private sources: Array<Observable<any> |
Promise<any> |
((...values: Array<any>) => any)>) {
super();
}

static create(...sources: Array<Observable<any> |
Promise<any> |
((...values: Array<any>) => any)>)
: Observable<any> {
if (sources === null || sources.length === 0) {
return new EmptyObservable();
}
return new ForkJoinObservable(sources);
}

static create<R>(...observables: Observable<any>[]): Observable<R> {
return new ForkJoinObservable(observables);
private getResultSelector(): (...values: Array<any>) => any {
const sources = this.sources;

let resultSelector = sources[sources.length - 1];
if (typeof resultSelector !== 'function') {
return null;
}
this.sources.pop();
return <(...values: Array<any>) => any>resultSelector;
}

_subscribe(subscriber: Subscriber<any>) {
const observables = this.observables;
const len = observables.length;
let context = { complete: 0, total: len, values: emptyArray(len) };
let resultSelector = this.getResultSelector();
const sources = this.sources;
const len = sources.length;

const context = { completed: 0, total: len, values: emptyArray(len), selector: resultSelector };
for (let i = 0; i < len; i++) {
observables[i].subscribe(new AllSubscriber(subscriber, this, i, context));
let source = sources[i];
if (isPromise(source)) {
source = new PromiseObservable(<Promise<any>>source);
}
(<Observable<any>>source).subscribe(new AllSubscriber(subscriber, i, context));
}
}
}

class AllSubscriber<T> extends Subscriber<T> {
private _value: T;
private _value: any = null;

constructor(destination: Subscriber<T>,
private parent: ForkJoinObservable<T>,
constructor(destination: Subscriber<any>,
private index: number,
private context: { complete: number, total: number, values: any[] }) {
private context: { completed: number,
total: number,
values: any[],
selector: (...values: Array<any>) => any }) {
super(destination);
}

_next(value: T) {
_next(value: any): void {
this._value = value;
}

_complete() {
_complete(): void {
const destination = this.destination;

if (this._value == null) {
destination.complete();
}

const context = this.context;
context.completed++;
context.values[this.index] = this._value;
if (context.values.every(hasValue)) {
this.destination.next(context.values);
this.destination.complete();
const values = context.values;

if (context.completed !== values.length) {
return;
}

if (values.every(hasValue)) {
let value = context.selector ? context.selector.apply(this, values) :
values;
destination.next(value);
}

destination.complete();
}
}

function hasValue(x) {
function hasValue(x: any): boolean {
return x !== null;
}

Expand All @@ -54,4 +100,4 @@ function emptyArray(len: number): any[] {
arr.push(null);
}
return arr;
}
}
2 changes: 1 addition & 1 deletion src/observables/PromiseObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class PromiseObservable<T> extends Observable<T> {
return new PromiseObservable(promise, scheduler);
}

constructor(private promise: Promise<T>, public scheduler: Scheduler) {
constructor(private promise: Promise<T>, public scheduler: Scheduler = immediate) {
super();
}

Expand Down
6 changes: 3 additions & 3 deletions src/operators/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

import {tryCatch} from '../util/tryCatch';
import {isPromise} from '../util/isPromise';
import {errorObject} from '../util/errorObject';

export function debounce<T>(durationSelector: (value: T) => Observable<any> | Promise<any>): Observable<T> {
Expand Down Expand Up @@ -41,8 +42,7 @@ class DebounceSubscriber<T> extends Subscriber<T> {
if (debounce === errorObject) {
destination.error(errorObject.e);
} else {
if (typeof debounce.subscribe !== 'function'
&& typeof debounce.then === 'function') {
if (isPromise(debounce)) {
debounce = PromiseObservable.create(debounce);
}

Expand Down Expand Up @@ -102,4 +102,4 @@ class DurationSelectorSubscriber<T> extends Subscriber<T> {
_complete() {
this.debounceNext();
}
}
}
3 changes: 3 additions & 0 deletions src/util/isPromise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function isPromise(value: any): boolean {
return value && typeof value.subscribe !== 'function' && typeof value.then === 'function';
}

0 comments on commit 190f349

Please sign in to comment.