Skip to content

Commit

Permalink
refactor(forkJoinObservable): use subscribeToResult for internal beha…
Browse files Browse the repository at this point in the history
…vior

relates to ReactiveX#1483
  • Loading branch information
kwonoj committed Mar 25, 2016
1 parent e66b2d8 commit 5e53b11
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 47 deletions.
11 changes: 11 additions & 0 deletions spec/observables/forkJoin-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ describe('Observable.forkJoin', () => {
expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
});

it('should accept empty lowercase-o observables', () => {
const e1 = Observable.forkJoin(
hot('--a--b--c--d--|'),
hot('(b|)'),
lowerCaseO()
);
const expected = '|';

expectObservable(e1).toBe(expected);
});

it('should accept promise', (done: DoneSignature) => {
const e1 = Observable.forkJoin(
Observable.of(1),
Expand Down
111 changes: 64 additions & 47 deletions src/observable/ForkJoinObservable.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import {Observable} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscriber} from '../Subscriber';
import {PromiseObservable} from './PromiseObservable';
import {Subscription} from '../Subscription';
import {EmptyObservable} from './EmptyObservable';
import {isPromise} from '../util/isPromise';
import {isArray} from '../util/isArray';

import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';

/**
* We need this JSDoc comment for affecting ESDoc.
* @extends {Ignored}
* @hide true
*/
export class ForkJoinObservable<T> extends Observable<T> {
constructor(private sources: Array<Observable<any> | Promise<any>>,
constructor(private sources: Array<SubscribableOrPromise<any>>,
private resultSelector?: (...values: Array<any>) => T) {
super();
}
Expand All @@ -23,8 +26,8 @@ export class ForkJoinObservable<T> extends Observable<T> {
* @name forkJoin
* @owner Observable
*/
static create<T>(...sources: Array<Observable<any> | Promise<any> |
Array<Observable<any>> |
static create<T>(...sources: Array<SubscribableOrPromise<any> |
Array<SubscribableOrPromise<any>> |
((...values: Array<any>) => any)>): Observable<T> {
if (sources === null || arguments.length === 0) {
return new EmptyObservable<T>();
Expand All @@ -38,83 +41,97 @@ export class ForkJoinObservable<T> extends Observable<T> {
// if the first and only other argument besides the resultSelector is an array
// assume it's been called with `forkJoin([obs1, obs2, obs3], resultSelector)`
if (sources.length === 1 && isArray(sources[0])) {
sources = <Array<Observable<any>>>sources[0];
sources = <Array<SubscribableOrPromise<any>>>sources[0];
}

if (sources.length === 0) {
return new EmptyObservable<T>();
}

return new ForkJoinObservable(<Array<Observable<any> | Promise<any>>>sources, resultSelector);
return new ForkJoinObservable(<Array<SubscribableOrPromise<any>>>sources, resultSelector);
}

protected _subscribe(subscriber: Subscriber<any>) {
const sources = this.sources;
const len = sources.length;
protected _subscribe(subscriber: Subscriber<any>): Subscription {
return new ForkJoinSubscriber(subscriber, this.sources, this.resultSelector);
}
}

const context = { completed: 0,
interface ForkJoinContext {
completed: number;
total: number;
values: Array<any>;
haveValues: Array<boolean>;
selector: Function;
}

class ForkJoinSubscriber<T> extends OuterSubscriber<T, T> {
private context: ForkJoinContext = null;

constructor(destination: Subscriber<T>,
private sources: Array<SubscribableOrPromise<any>>,
resultSelector?: (...values: Array<any>) => T) {
super(destination);

const len = sources.length;
this.context = { completed: 0,
total: len,
values: new Array(len),
haveValues: new Array(len),
selector: this.resultSelector };
selector: resultSelector };

for (let i = 0; i < len; i++) {
let source = sources[i];
if (isPromise(source)) {
source = new PromiseObservable(<Promise<any>>source);
}
subscriber.add((<Observable<any>>source)
.subscribe(new AllSubscriber(subscriber, i, context)));
}
this.tryForkJoin();
}
}

class AllSubscriber<T> extends Subscriber<T> {
notifyNext(outerValue: any, innerValue: T,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, T>): void {
const context = this.context;

constructor(destination: Subscriber<any>,
private index: number,
private context: { completed: number,
total: number,
values: any[],
haveValues: any[],
selector: (...values: Array<any>) => any }) {
super(destination);
context.values[outerIndex] = innerValue;
context.haveValues[outerIndex] = true;
}

protected _next(value: T): void {
const context = this.context;
const index = this.index;

context.values[index] = value;
context.haveValues[index] = true;
notifyComplete(innerSub: InnerSubscriber<T, T>): void {
const outerIndex = (<any>innerSub).outerIndex;
this.tryComplete(outerIndex);
}

protected _complete(): void {
private tryComplete(index: number): void {
const destination = this.destination;
const context = this.context;

if (!context.haveValues[this.index]) {
context.completed++;

if (!context.haveValues[index]) {
destination.complete();
}

context.completed++;

const values = context.values;

if (context.completed !== values.length) {
return;
}

if (context.haveValues.every(hasValue)) {
if (context.haveValues.every(x => x === true)) {
const value = context.selector ? context.selector.apply(this, values) :
values;
destination.next(value);
}

destination.complete();
}
}

function hasValue(x: any): boolean {
return x === true;
}
private tryForkJoin(): void {
const sources = this.sources;
const len = sources.length;

for (let i = 0; i < len; i++) {
const source = sources[i];
const innerSubscription = subscribeToResult(this, source, null, i);

if (innerSubscription) {
(<any> innerSubscription).outerIndex = i;
this.add(innerSubscription);
}
}
}
}

0 comments on commit 5e53b11

Please sign in to comment.