diff --git a/spec/observables/combineLatest-spec.js b/spec/observables/combineLatest-spec.js index cb182209b3..456b6c4241 100644 --- a/spec/observables/combineLatest-spec.js +++ b/spec/observables/combineLatest-spec.js @@ -1,6 +1,7 @@ -/* globals describe, it, expect, expectObservable, hot */ +/* globals describe, it, expect, hot, cold, expectObservable */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; +var immediateScheduler = Rx.Scheduler.immediate; describe('Observable.combineLatest', function () { it('should combineLatest the provided observables', function () { @@ -14,4 +15,17 @@ describe('Observable.combineLatest', function () { expectObservable(combined).toBe(expected, {u: 'ad', v: 'ae', w: 'af', x: 'bf', y: 'bg', z: 'cg'}); }); + + it("should combine an immediately-scheduled source with an immediately-scheduled second", function (done) { + var a = Observable.of(1, 2, 3, immediateScheduler); + var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler); + var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]]; + var i = 0; + Observable.combineLatest(a, b, immediateScheduler).subscribe(function (vals) { + expect(vals).toDeepEqual(r[i++]); + }, null, function() { + expect(i).toEqual(r.length); + done(); + }); + }); }); \ No newline at end of file diff --git a/spec/operators/combineAll-spec.js b/spec/operators/combineAll-spec.js new file mode 100644 index 0000000000..89a37bc8c4 --- /dev/null +++ b/spec/operators/combineAll-spec.js @@ -0,0 +1,31 @@ +/* globals describe, it, expect, hot, cold, expectObservable */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var immediateScheduler = Rx.Scheduler.immediate; + +describe('Observable.prototype.combineAll()', function(){ + it("should combine two observables", function (done) { + var a = Observable.of(1, 2, 3); + var b = Observable.of(4, 5, 6, 7, 8); + var expected = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]]; + Observable.of(a, b).combineAll().subscribe(function (vals) { + expect(vals).toEqual(expected.shift()); + }, null, function() { + expect(expected.length).toBe(0); + done(); + }); + }); + + it("should combine two immediately-scheduled observables", function (done) { + var a = Observable.of(1, 2, 3, immediateScheduler); + var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler); + var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]]; + var i = 0; + Observable.of(a, b, immediateScheduler).combineAll().subscribe(function (vals) { + expect(vals).toDeepEqual(r[i++]); + }, null, function() { + expect(i).toEqual(r.length); + done(); + }); + }); +}); \ No newline at end of file diff --git a/spec/operators/combineLatest-spec.js b/spec/operators/combineLatest-spec.js index 923d63e220..bb21f075b7 100644 --- a/spec/operators/combineLatest-spec.js +++ b/spec/operators/combineLatest-spec.js @@ -4,54 +4,14 @@ var Observable = Rx.Observable; var immediateScheduler = Rx.Scheduler.immediate; describe('Observable.prototype.combineLatest', function () { - it("should combine two observables", function (done) { - var a = Observable.of(1, 2, 3); - var b = Observable.of(4, 5, 6, 7, 8); - var r = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]]; - var i = 0; - Observable.of(a, b).combineAll().subscribe(function (vals) { - expect(vals).toDeepEqual(r[i++]); - }, null, function() { - expect(i).toEqual(r.length); - done(); - }); - }); - it("should combine a source with a second", function (done) { var a = Observable.of(1, 2, 3); var b = Observable.of(4, 5, 6, 7, 8); - var r = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]]; - var i = 0; - a.combineLatest(b).subscribe(function (vals) { - expect(vals).toDeepEqual(r[i++]); - }, null, function() { - expect(i).toEqual(r.length); - done(); - }); - }); - - it("should combine two immediately-scheduled observables", function (done) { - var a = Observable.of(1, 2, 3, immediateScheduler); - var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler); - var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]]; - var i = 0; - Observable.of(a, b, immediateScheduler).combineAll().subscribe(function (vals) { - expect(vals).toDeepEqual(r[i++]); - }, null, function() { - expect(i).toEqual(r.length); - done(); - }); - }); - - it("should combine an immediately-scheduled source with an immediately-scheduled second", function (done) { - var a = Observable.of(1, 2, 3, immediateScheduler); - var b = Observable.of(4, 5, 6, 7, 8, immediateScheduler); - var r = [[1, 4], [2, 4], [2, 5], [3, 5], [3, 6], [3, 7], [3, 8]]; - var i = 0; + var expected = [[3, 4], [3, 5], [3, 6], [3, 7], [3, 8]]; a.combineLatest(b).subscribe(function (vals) { - expect(vals).toDeepEqual(r[i++]); + expect(vals).toEqual(expected.shift()); }, null, function() { - expect(i).toEqual(r.length); + expect(expected.length).toEqual(0); done(); }); }); diff --git a/src/operators/combineLatest-static.ts b/src/operators/combineLatest-static.ts index d39e78e564..58d9b1a259 100644 --- a/src/operators/combineLatest-static.ts +++ b/src/operators/combineLatest-static.ts @@ -1,11 +1,18 @@ import Observable from '../Observable'; import ArrayObservable from '../observables/ArrayObservable'; import { CombineLatestOperator } from './combineLatest-support'; +import Scheduler from '../Scheduler'; -export default function combineLatest(...observables: (Observable | ((...values: Array) => R))[]): Observable { - const project = <((...ys: Array) => R)> observables[observables.length - 1]; - if (typeof project === "function") { - observables.pop(); +export default function combineLatest(...observables: (Observable | ((...values: Array) => R) | Scheduler)[]): Observable { + let project, scheduler; + + if(typeof (observables[observables.length - 1]).schedule === 'function') { + scheduler = observables.pop(); } - return new ArrayObservable(observables).lift(new CombineLatestOperator(project)); + + if (typeof observables[observables.length - 1] === 'function') { + project = observables.pop(); + } + + return new ArrayObservable(observables, scheduler).lift(new CombineLatestOperator(project)); } \ No newline at end of file diff --git a/src/operators/combineLatest-support.ts b/src/operators/combineLatest-support.ts index 415e4583e3..7bea1ec941 100644 --- a/src/operators/combineLatest-support.ts +++ b/src/operators/combineLatest-support.ts @@ -5,27 +5,11 @@ import Subscriber from '../Subscriber'; import ArrayObservable from '../observables/ArrayObservable'; import EmptyObservable from '../observables/EmptyObservable'; -import {ZipSubscriber, ZipInnerSubscriber} from './zip-support'; import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; - -export function combineLatest(...observables: (Observable | ((...values: Array) => R))[]): Observable { - const project = <((...ys: Array) => R)> observables[observables.length - 1]; - if (typeof project === "function") { - observables.pop(); - } - return new ArrayObservable(observables).lift(new CombineLatestOperator(project)); -} - -export function combineLatestProto(...observables: (Observable|((...values: any[]) => R))[]): Observable { - const project = <((...ys: Array) => R)> observables[observables.length - 1]; - if (typeof project === "function") { - observables.pop(); - } - observables.unshift(this); - return new ArrayObservable(observables).lift(new CombineLatestOperator(project)); -} +import OuterSubscriber from '../OuterSubscriber'; +import subscribeToResult from '../util/subscribeToResult'; export class CombineLatestOperator implements Operator { @@ -40,51 +24,68 @@ export class CombineLatestOperator implements Operator { } } -export class CombineLatestSubscriber extends ZipSubscriber { - - project: (...values: Array) => R; - limit: number = 0; - - constructor(destination: Subscriber, project?: (...values: Array) => R) { - super(destination, project, []); +export class CombineLatestSubscriber extends OuterSubscriber { + private active: number = 0; + private values: any[] = []; + private observables: any[] = []; + private toRespond: number[] = []; + + constructor(destination: Subscriber, private project?: (...values: Array) => R) { + super(destination); } - _subscribeInner(observable, values, index, total) { - return observable._subscribe(new CombineLatestInnerSubscriber(this.destination, this, values, index, total)); + _next(observable: any) { + const toRespond = this.toRespond; + toRespond.push(toRespond.length); + this.observables.push(observable); } - - _innerComplete(innerSubscriber) { - if((this.active -= 1) === 0) { + + _complete() { + const observables = this.observables; + const len = observables.length; + if(len === 0) { this.destination.complete(); + } else { + this.active = len; + for(let i = 0; i < len; i++) { + let observable = observables[i]; + this.add(subscribeToResult(this, observable, observable, i)); + } } } -} - -export class CombineLatestInnerSubscriber extends ZipInnerSubscriber { - constructor(destination: Observer, parent: ZipSubscriber, values: any, index : number, total : number) { - super(destination, parent, values, index, total); + notifyComplete(innerSubscriber) { + if((this.active -= 1) === 0) { + this.destination.complete(); + } } - - _next(x) { - - const index = this.index; - const total = this.total; - const parent = this.parent; + + notifyNext(value: R, observable: any, innerIndex: number, outerIndex: number) { const values = this.values; - const valueBox = values[index]; - let limit; - - if(valueBox) { - valueBox[0] = x; - limit = parent.limit; - } else { - limit = parent.limit += 1; - values[index] = [x]; + values[outerIndex] = value; + const toRespond = this.toRespond; + + if(toRespond.length > 0) { + const found = toRespond.indexOf(outerIndex); + if(found !== -1) { + toRespond.splice(found, 1); + } } - - if(limit >= total) { - this._projectNext(values, parent.project); + + if(toRespond.length === 0) { + const project = this.project; + const destination = this.destination; + + if(project) { + let result = tryCatch(project).apply(this, values); + if(result === errorObject) { + destination.error(errorObject.e); + } else { + destination.next(result); + } + } else { + destination.next(values); + } } } } diff --git a/src/operators/combineLatest.ts b/src/operators/combineLatest.ts index ac26ad04a8..54c9d3ba0d 100644 --- a/src/operators/combineLatest.ts +++ b/src/operators/combineLatest.ts @@ -3,10 +3,10 @@ import ArrayObservable from '../observables/ArrayObservable'; import { CombineLatestOperator } from './combineLatest-support'; export default function combineLatest(...observables: (Observable|((...values: any[]) => R))[]): Observable { - const project = <((...ys: Array) => R)> observables[observables.length - 1]; - if (typeof project === "function") { - observables.pop(); - } observables.unshift(this); + let project; + if (typeof observables[observables.length - 1] === "function") { + project = observables.pop(); + } return new ArrayObservable(observables).lift(new CombineLatestOperator(project)); } \ No newline at end of file diff --git a/src/operators/zip-support.ts b/src/operators/zip-support.ts index 4faa14e208..b93216a942 100644 --- a/src/operators/zip-support.ts +++ b/src/operators/zip-support.ts @@ -8,6 +8,8 @@ import ArrayObservable from '../observables/ArrayObservable'; import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; +import OuterSubscriber from '../OuterSubscriber'; +import subscribeToResult from '../util/subscribeToResult'; export class ZipOperator implements Operator { @@ -22,14 +24,15 @@ export class ZipOperator implements Operator { } } -export class ZipSubscriber extends Subscriber { +export class ZipSubscriber extends OuterSubscriber { values: any; active: number = 0; observables: Observable[] = []; project: (...values: Array) => R; limit: number = Number.POSITIVE_INFINITY; - + buffers: any[][] = []; + constructor(destination: Subscriber, project?: (...values: Array) => R, values: any = Object.create(null)) { @@ -39,11 +42,11 @@ export class ZipSubscriber extends Subscriber { } _next(observable) { + this.buffers.push([]); this.observables.push(observable); } _complete() { - const values = this.values; const observables = this.observables; @@ -53,15 +56,44 @@ export class ZipSubscriber extends Subscriber { this.active = len; while(++index < len) { - this.add(this._subscribeInner(observables[index], values, index, len)); + const observable = observables[index]; + this.add(subscribeToResult(this, observable, observable, index)); } } - _subscribeInner(observable, values, index, total) { - return observable._subscribe(new ZipInnerSubscriber(this.destination, this, values, index, total)); + notifyNext(value: R, observable: T, index: number, observableIndex: number) { + const buffers = this.buffers; + buffers[observableIndex].push(value); + + const len = buffers.length; + for (let i = 0; i < len; i++) { + let buffer = buffers[i]; + if(buffer.length === 0) { + return; + } + } + + const outbound = []; + const destination = this.destination; + const project = this.project; + + for(let i = 0; i < len; i++) { + outbound.push(buffers[i].shift()); + } + + if(project) { + let result = tryCatch(project)(outbound); + if(result === errorObject){ + destination.error(errorObject.e); + } else { + destination.next(result); + } + } else { + destination.next(outbound); + } } - _innerComplete(innerSubscriber) { + notifyComplete(innerSubscriber) { if((this.active -= 1) === 0) { this.destination.complete(); } else { @@ -76,68 +108,4 @@ function arrayInitialize(length) { arr[i] = null; } return arr; -} - -export class ZipInnerSubscriber extends Subscriber { - - parent: ZipSubscriber; - values: any; - index: number; - total: number; - events: number = 0; - - constructor(destination: Observer, parent: ZipSubscriber, values: any, index : number, total : number) { - super(destination); - this.parent = parent; - this.values = values; - this.index = index; - this.total = total; - } - - _next(x) { - - const parent = this.parent; - const events = this.events; - const total = this.total; - const limit = parent.limit; - - if (events >= limit) { - this.destination.complete(); - return; - } - - const index = this.index; - const values = this.values; - const zipped = values[events] || (values[events] = arrayInitialize(total)); - - zipped[index] = [x]; - - if (zipped.every(hasValue)) { - this._projectNext(zipped, parent.project); - values[events] = undefined; - } - - this.events = events + 1; - } - - _projectNext(values: Array, project?: (...xs: Array) => R) { - if(project && typeof project === "function") { - const result = tryCatch(project).apply(null, values.map(mapValue)); - if(result === errorObject) { - this.destination.error(errorObject.e); - return; - } else { - this.destination.next(result); - } - } else { - this.destination.next(values.map(mapValue)); - } - } - - _complete() { - this.parent._innerComplete(this); - } -} - -export function mapValue(xs) { return xs[0]; } -export function hasValue(xs) { return xs && xs.length === 1; } +} \ No newline at end of file