Skip to content

Commit

Permalink
feat(combineLatest): supports promises, iterables, lowercase-o observ…
Browse files Browse the repository at this point in the history
…ables and Observables

- refactor combineLatest to standalone from zip
- moves tests for combineAll to own file
- moves test regarding immediate scheduling to static combineLatest tests
- adds support for iterables, promises, Observables and lowercase-o observables
- adds scheduling capability to static combineLatest method
  • Loading branch information
benlesh committed Sep 23, 2015
1 parent 4c16aa6 commit ce76e4e
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 178 deletions.
16 changes: 15 additions & 1 deletion spec/observables/combineLatest-spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* globals describe, it, expect, expectObservable, hot */
/* globals describe, it, expect, hot, cold, expectObservable */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;

describe('Observable.combineLatest', function () {
it('should combineLatest the provided observables', function () {
Expand All @@ -14,4 +15,17 @@ describe('Observable.combineLatest', function () {

expectObservable(combined).toBe(expected, {u: 'ad', v: 'ae', w: 'af', x: 'bf', y: 'bg', z: 'cg'});
});

it("should combine an immediately-scheduled source with an immediately-scheduled second", function (done) {
var a = Observable.of(1, 2, 3, immediateScheduler);
var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler);
var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]];
var i = 0;
Observable.combineLatest(a, b, immediateScheduler).subscribe(function (vals) {
expect(vals).toDeepEqual(r[i++]);
}, null, function() {
expect(i).toEqual(r.length);
done();
});
});
});
31 changes: 31 additions & 0 deletions spec/operators/combineAll-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/* globals describe, it, expect, hot, cold, expectObservable */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;

describe('Observable.prototype.combineAll()', function(){
it("should combine two observables", function (done) {
var a = Observable.of(1, 2, 3);
var b = Observable.of(4, 5, 6, 7, 8);
var expected = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]];
Observable.of(a, b).combineAll().subscribe(function (vals) {
expect(vals).toEqual(expected.shift());
}, null, function() {
expect(expected.length).toBe(0);
done();
});
});

it("should combine two immediately-scheduled observables", function (done) {
var a = Observable.of(1, 2, 3, immediateScheduler);
var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler);
var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]];
var i = 0;
Observable.of(a, b, immediateScheduler).combineAll().subscribe(function (vals) {
expect(vals).toDeepEqual(r[i++]);
}, null, function() {
expect(i).toEqual(r.length);
done();
});
});
});
46 changes: 3 additions & 43 deletions spec/operators/combineLatest-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,14 @@ var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;

describe('Observable.prototype.combineLatest', function () {
it("should combine two observables", function (done) {
var a = Observable.of(1, 2, 3);
var b = Observable.of(4, 5, 6, 7, 8);
var r = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]];
var i = 0;
Observable.of(a, b).combineAll().subscribe(function (vals) {
expect(vals).toDeepEqual(r[i++]);
}, null, function() {
expect(i).toEqual(r.length);
done();
});
});

it("should combine a source with a second", function (done) {
var a = Observable.of(1, 2, 3);
var b = Observable.of(4, 5, 6, 7, 8);
var r = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]];
var i = 0;
a.combineLatest(b).subscribe(function (vals) {
expect(vals).toDeepEqual(r[i++]);
}, null, function() {
expect(i).toEqual(r.length);
done();
});
});

it("should combine two immediately-scheduled observables", function (done) {
var a = Observable.of(1, 2, 3, immediateScheduler);
var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler);
var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]];
var i = 0;
Observable.of(a, b, immediateScheduler).combineAll().subscribe(function (vals) {
expect(vals).toDeepEqual(r[i++]);
}, null, function() {
expect(i).toEqual(r.length);
done();
});
});

it("should combine an immediately-scheduled source with an immediately-scheduled second", function (done) {
var a = Observable.of(1, 2, 3, immediateScheduler);
var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler);
var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]];
var i = 0;
var expected = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]];
a.combineLatest(b).subscribe(function (vals) {
expect(vals).toDeepEqual(r[i++]);
expect(vals).toEqual(expected.shift());
}, null, function() {
expect(i).toEqual(r.length);
expect(expected.length).toEqual(0);
done();
});
});
Expand Down
17 changes: 12 additions & 5 deletions src/operators/combineLatest-static.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import Observable from '../Observable';
import ArrayObservable from '../observables/ArrayObservable';
import { CombineLatestOperator } from './combineLatest-support';
import Scheduler from '../Scheduler';

export default function combineLatest<T, R>(...observables: (Observable<any> | ((...values: Array<any>) => R))[]): Observable<R> {
const project = <((...ys: Array<any>) => R)> observables[observables.length - 1];
if (typeof project === "function") {
observables.pop();
export default function combineLatest<T, R>(...observables: (Observable<any> | ((...values: Array<any>) => R) | Scheduler)[]): Observable<R> {
let project, scheduler;

if(typeof (<any>observables[observables.length - 1]).schedule === 'function') {
scheduler = observables.pop();
}
return new ArrayObservable(observables).lift(new CombineLatestOperator(project));

if (typeof observables[observables.length - 1] === 'function') {
project = observables.pop();
}

return new ArrayObservable(observables, scheduler).lift(new CombineLatestOperator(project));
}
107 changes: 54 additions & 53 deletions src/operators/combineLatest-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,11 @@ import Subscriber from '../Subscriber';

import ArrayObservable from '../observables/ArrayObservable';
import EmptyObservable from '../observables/EmptyObservable';
import {ZipSubscriber, ZipInnerSubscriber} from './zip-support';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export function combineLatest<T, R>(...observables: (Observable<any> | ((...values: Array<any>) => R))[]): Observable<R> {
const project = <((...ys: Array<any>) => R)> observables[observables.length - 1];
if (typeof project === "function") {
observables.pop();
}
return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
}

export function combineLatestProto<R>(...observables: (Observable<any>|((...values: any[]) => R))[]): Observable<R> {
const project = <((...ys: Array<any>) => R)> observables[observables.length - 1];
if (typeof project === "function") {
observables.pop();
}
observables.unshift(this);
return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
}
import OuterSubscriber from '../OuterSubscriber';
import subscribeToResult from '../util/subscribeToResult';

export class CombineLatestOperator<T, R> implements Operator<T, R> {

Expand All @@ -40,51 +24,68 @@ export class CombineLatestOperator<T, R> implements Operator<T, R> {
}
}

export class CombineLatestSubscriber<T, R> extends ZipSubscriber<T, R> {

project: (...values: Array<any>) => R;
limit: number = 0;

constructor(destination: Subscriber<R>, project?: (...values: Array<any>) => R) {
super(destination, project, []);
export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
private active: number = 0;
private values: any[] = [];
private observables: any[] = [];
private toRespond: number[] = [];

constructor(destination: Subscriber<R>, private project?: (...values: Array<any>) => R) {
super(destination);
}

_subscribeInner(observable, values, index, total) {
return observable._subscribe(new CombineLatestInnerSubscriber(this.destination, this, values, index, total));
_next(observable: any) {
const toRespond = this.toRespond;
toRespond.push(toRespond.length);
this.observables.push(observable);
}

_innerComplete(innerSubscriber) {
if((this.active -= 1) === 0) {

_complete() {
const observables = this.observables;
const len = observables.length;
if(len === 0) {
this.destination.complete();
} else {
this.active = len;
for(let i = 0; i < len; i++) {
let observable = observables[i];
this.add(subscribeToResult(this, observable, observable, i));
}
}
}
}

export class CombineLatestInnerSubscriber<T, R> extends ZipInnerSubscriber<T, R> {

constructor(destination: Observer<T>, parent: ZipSubscriber<T, R>, values: any, index : number, total : number) {
super(destination, parent, values, index, total);
notifyComplete(innerSubscriber) {
if((this.active -= 1) === 0) {
this.destination.complete();
}
}

_next(x) {

const index = this.index;
const total = this.total;
const parent = this.parent;

notifyNext(value: R, observable: any, innerIndex: number, outerIndex: number) {
const values = this.values;
const valueBox = values[index];
let limit;

if(valueBox) {
valueBox[0] = x;
limit = parent.limit;
} else {
limit = parent.limit += 1;
values[index] = [x];
values[outerIndex] = value;
const toRespond = this.toRespond;

if(toRespond.length > 0) {
const found = toRespond.indexOf(outerIndex);
if(found !== -1) {
toRespond.splice(found, 1);
}
}

if(limit >= total) {
this._projectNext(values, parent.project);

if(toRespond.length === 0) {
const project = this.project;
const destination = this.destination;

if(project) {
let result = tryCatch(project).apply(this, values);
if(result === errorObject) {
destination.error(errorObject.e);
} else {
destination.next(result);
}
} else {
destination.next(values);
}
}
}
}
8 changes: 4 additions & 4 deletions src/operators/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import ArrayObservable from '../observables/ArrayObservable';
import { CombineLatestOperator } from './combineLatest-support';

export default function combineLatest<R>(...observables: (Observable<any>|((...values: any[]) => R))[]): Observable<R> {
const project = <((...ys: Array<any>) => R)> observables[observables.length - 1];
if (typeof project === "function") {
observables.pop();
}
observables.unshift(this);
let project;
if (typeof observables[observables.length - 1] === "function") {
project = observables.pop();
}
return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
}
Loading

0 comments on commit ce76e4e

Please sign in to comment.