diff --git a/spec/helpers/test-helper.js b/spec/helpers/test-helper.js index 9d99c41bc4..6ff5f95f43 100644 --- a/spec/helpers/test-helper.js +++ b/spec/helpers/test-helper.js @@ -42,9 +42,6 @@ global.it = function (description, cb, timeout) { global.rxTestScheduler.flush(); }); } else { - if (description === 'should work with never and empty') { - console.log("TEST"); - } glit.apply(this, arguments); } }; diff --git a/spec/observables/merge-spec.js b/spec/observables/merge-spec.js new file mode 100644 index 0000000000..688fa0a9af --- /dev/null +++ b/spec/observables/merge-spec.js @@ -0,0 +1,102 @@ +/* globals expect, it, describe, hot, cold, expectObservable */ + +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.merge(...observables)', function() { + it('should merge cold and cold', function() { + var e1 = cold('---a-----b-----c----|'); + var e2 = cold('------x-----y-----z----|'); + var expected = '---a--x--b--y--c--z----|'; + expectObservable(Observable.merge(e1, e2)).toBe(expected); + }); + + it('should merge hot and hot', function() { + var e1 = hot('---a---^-b-----c----|'); + var e2 = hot('-----x-^----y-----z----|'); + var expected = '--b--y--c--z----|'; + expectObservable(Observable.merge(e1, e2)).toBe(expected); + }); + + it('should merge hot and cold', function(){ + var e1 = hot('---a-^---b-----c----|'); + var e2 = cold( '--x-----y-----z----|'); + var expected = '--x-b---y-c---z----|'; + expectObservable(Observable.merge(e1, e2)).toBe(expected); + }); + + it('should merge parallel emissions', function(){ + var e1 = hot('---a----b----c----|'); + var e2 = hot('---x----y----z----|'); + var expected = '---(ax)-(by)-(cz)-|'; + expectObservable(Observable.merge(e1, e2)).toBe(expected); + }); + + it('should merge empty and empty', function(){ + var e1 = Observable.empty(); + var e2 = Observable.empty(); + expectObservable(Observable.merge(e1, e2)).toBe('|'); + }); + + it('should merge never and empty', function(){ + var e1 = Observable.never(); + var e2 = Observable.empty(); + expectObservable(Observable.merge(e1, e2)).toBe('-'); + }); + + it('should merge never and never', function(){ + var e1 = Observable.never(); + var e2 = Observable.never(); + expectObservable(Observable.merge(e1, e2)).toBe('-'); + }); + + it('should merge empty and throw', function(){ + var e1 = Observable.empty(); + var e2 = Observable.throw(new Error('blah')); + expectObservable(Observable.merge(e1, e2)).toBe('#', undefined, new Error('blah')); + }); + + it('should merge hot and throw', function(){ + var e1 = hot('--a--b--c--|'); + var e2 = Observable.throw(new Error('blah')); + expectObservable(Observable.merge(e1, e2)).toBe('#', undefined, new Error('blah')); + }); + + it('should merge never and throw', function(){ + var e1 = Observable.never(); + var e2 = Observable.throw(new Error('blah')); + expectObservable(Observable.merge(e1, e2)).toBe('#', undefined, new Error('blah')); + }); + + + it('should merge empty and error', function(){ + var e1 = Observable.empty(); + var e2 = hot('-------#', undefined, new Error('blah')); + var expected = '-------#'; + expectObservable(Observable.merge(e1, e2)).toBe(expected, undefined, new Error('blah')); + }); + + it('should merge hot and error', function(){ + var e1 = hot('--a--b--c--|'); + var e2 = hot('-------#', undefined, new Error('blah')); + var expected = '--a--b-#'; + expectObservable(Observable.merge(e1, e2)).toBe(expected, undefined, new Error('blah')); + }); + + it('should merge never and error', function(){ + var e1 = Observable.never(); + var e2 = hot('-------#', undefined, new Error('blah')); + var expected = '-------#'; + expectObservable(Observable.merge(e1, e2)).toBe(expected, undefined, new Error('blah')); + }); +}); + +describe('Observable.merge(number, ...observables)', function(){ + it('should handle concurrency limits', function () { + var e1 = cold('---a---b---c---|'); + var e2 = cold('-d---e---f--|'); + var e3 = cold( '---x---y---z---|'); + var expected = '-d-a-e-b-f-c---x---y---z---|'; + expectObservable(Observable.merge(e1, e2, e3, 2)).toBe(expected); + }); +}); \ No newline at end of file diff --git a/spec/operators/expand-spec.js b/spec/operators/expand-spec.js index ef8d7ef83c..6ca103432d 100644 --- a/spec/operators/expand-spec.js +++ b/spec/operators/expand-spec.js @@ -1,29 +1,57 @@ +/* globals describe, it, expect, expectObservable, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; describe('Observable.prototype.expand()', function () { - it('should map and recursively flatten', function (done) { - var expected = [1, 2, 3, 4, 5]; - Observable.of(0).expand(function (x) { - if (x > 4) { + it('should map and recursively flatten', function() { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + } + var e1 = hot('a', values); + /* + expectation explanation: (conjunction junction?) ... + + since `cold('---(z|)')` emits `x + x` and completes on frame 30 + but the next "expanded" return value is synchronously subscribed to in + that same frame, it stacks like so: + + a + ---(b|) + ---(c|) + ---(d|) + ---(e|) (...which flattens into:) + a--b--c--d--(e|) + */ + var expected = 'a--b--c--d--(e|)'; + + expectObservable(e1.expand(function(x) { + if(x === 16) { return Observable.empty(); } - return Observable.of(x + 1); - }) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); + return cold('---(z|)', { z: x + x }); + })).toBe(expected, values); }); - it('should map and recursively flatten with ScalarObservables', function (done) { - var expected = [1, 2, 3, 4, 5]; - Observable.of(0).expand(function (x) { - if (x > 4) { + + it('should map and recursively flatten with scalars', function() { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + } + var e1 = hot('a', values); + var expected = '(abcde|)'; + + expectObservable(e1.expand(function(x) { + if(x === 16) { return Observable.empty(); } - return Observable.of(x + 1); - }) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); + return Observable.of(x + x); // scalar + })).toBe(expected, values); }); }); \ No newline at end of file diff --git a/spec/operators/merge-all-spec.js b/spec/operators/merge-all-spec.js index 16e808a57f..0861e92747 100644 --- a/spec/operators/merge-all-spec.js +++ b/spec/operators/merge-all-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, expectObservable, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; @@ -33,4 +33,12 @@ describe('mergeAll', function () { done(); }); }); + + it('should handle merging a hot observable of observables', function (){ + var x = cold( 'a---b---c---|'); + var y = cold( 'd---e---f---|'); + var e1 = hot('--x--y--|', { x: x, y: y }); + var expected = '--a--db--ec--f---|'; + expectObservable(e1.mergeAll()).toBe(expected); + }); }); \ No newline at end of file diff --git a/spec/operators/merge-spec.js b/spec/operators/merge-spec.js index dfba3d7893..89016ceebb 100644 --- a/spec/operators/merge-spec.js +++ b/spec/operators/merge-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, hot, cold, expectObservable, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; var immediateScheduler = Rx.Scheduler.immediate; @@ -23,6 +23,14 @@ describe("Observable.prototype.merge", function () { expect(val).toBe(r[i++]); }, null, done); }); + + + it('should handle merging two hot observables', function (){ + var e1 = hot('--a-----b-----c----|'); + var e2 = hot('-----d-----e-----f---|'); + var expected = '--a--d--b--e--c--f---|'; + expectObservable(e1.merge(e2, rxTestScheduler)).toBe(expected); + }); }); describe('Observable.prototype.mergeAll', function () { diff --git a/spec/operators/switchAll-spec.js b/spec/operators/switchAll-spec.js index 08d79528fd..9731cc6c7c 100644 --- a/spec/operators/switchAll-spec.js +++ b/spec/operators/switchAll-spec.js @@ -1,4 +1,4 @@ -/* expect, it, describe */ +/* expect, it, describe, expectObserable, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; @@ -26,4 +26,12 @@ describe('Observable.prototype.switchAll()', function(){ expect(x).toBe(r[i++]); }, null, done); }); + + it('should handle a hot observable of observables', function() { + var x = cold( '--a---b---c--|'); + var y = cold( '---d--e---f---|'); + var e1 = hot( '------x-------y------|', { x: x, y: y }); + var expected = '--------a---b----d--e---f---|'; + expectObservable(e1.switchAll()).toBe(expected); + }); }); \ No newline at end of file diff --git a/src/operators/concat-static.ts b/src/operators/concat-static.ts index 03d3e2db93..a341ce6d1f 100644 --- a/src/operators/concat-static.ts +++ b/src/operators/concat-static.ts @@ -3,12 +3,13 @@ import Observable from '../Observable'; import Scheduler from '../Scheduler'; import immediate from '../schedulers/immediate'; -export default function concat(...observables: any[]) : Observable { - let scheduler = immediate; - const len = observables.length; - if(typeof observables[observables.length - 1].schedule === 'function') { - scheduler = observables.pop(); - observables.push(1, scheduler); +export default function concat(...observables: (Observable|Scheduler)[]) : Observable { + let scheduler:Scheduler = immediate; + let args = observables; + const len = args.length; + if(typeof (args[observables.length - 1]).schedule === 'function') { + scheduler = args.pop(); + args.push(1, scheduler); } return merge.apply(this, observables); } \ No newline at end of file diff --git a/src/operators/concat.ts b/src/operators/concat.ts index a3cef3bd8b..e7e16e68d2 100644 --- a/src/operators/concat.ts +++ b/src/operators/concat.ts @@ -1,8 +1,12 @@ import merge from './merge-static'; import Observable from '../Observable'; +import Scheduler from '../Scheduler'; -export default function concatProto(...observables:any[]) : Observable { - observables.unshift(this); - observables.push(1); - return merge.apply(this, observables); +export default function concatProto(...observables:(Observable|Scheduler)[]) : Observable { + var args = observables; + args.unshift(this); + if(args.length > 1 && typeof args[args.length - 1].schedule === 'function') { + args.splice(args.length - 2, 0, 1); + } + return merge.apply(this, args); } \ No newline at end of file diff --git a/src/operators/concatAll.ts b/src/operators/concatAll.ts index 342c4883c1..ac57d20310 100644 --- a/src/operators/concatAll.ts +++ b/src/operators/concatAll.ts @@ -1,5 +1,5 @@ -import { MergeOperator } from './merge-support'; +import { MergeAllOperator } from './mergeAll-support'; export default function concatAll() { - return this.lift(new MergeOperator(1)); + return this.lift(new MergeAllOperator(1)); } diff --git a/src/operators/concatMapTo.ts b/src/operators/concatMapTo.ts index 77178e9fcf..c8df08a610 100644 --- a/src/operators/concatMapTo.ts +++ b/src/operators/concatMapTo.ts @@ -1,7 +1,7 @@ import Observable from '../Observable'; import { FlatMapToOperator } from './flatMapTo-support'; -export default function concatMapTo(observable: Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R) : Observable { +export default function concatMapTo(observable: Observable, + projectResult?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) : Observable { return this.lift(new FlatMapToOperator(observable, projectResult, 1)); -} +} \ No newline at end of file diff --git a/src/operators/expand.ts b/src/operators/expand.ts index b93fe997b1..ce7dfffa7f 100644 --- a/src/operators/expand.ts +++ b/src/operators/expand.ts @@ -2,69 +2,94 @@ import Operator from '../Operator'; import Observer from '../Observer'; import Observable from '../Observable'; import Subscriber from '../Subscriber'; +import Subscription from '../Subscription'; -import { MergeSubscriber, MergeInnerSubscriber } from './merge-support'; +// import { MergeAllSubscriber, MergeAllInnerSubscriber } from './mergeAll-support'; import EmptyObservable from '../observables/EmptyObservable'; import ScalarObservable from '../observables/ScalarObservable'; import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; -export default function expand(project: (x: T, ix: number) => Observable): Observable { - return this.lift(new ExpandOperator(project)); +export default function expand(project: (value: T, index: number) => Observable, + concurrent: number = Number.POSITIVE_INFINITY): Observable { + return this.lift(new ExpandOperator(project, concurrent)); } class ExpandOperator implements Operator { - - project: (x: T, ix: number) => Observable; - - constructor(project: (x: T, ix: number) => Observable) { - this.project = project; + constructor(private project: (value: T, index: number) => Observable, + private concurrent: number = Number.POSITIVE_INFINITY) { } call(subscriber: Subscriber): Subscriber { - return new ExpandSubscriber(subscriber, this.project); + return new ExpandSubscriber(subscriber, this.project, this.concurrent); } } -class ExpandSubscriber extends MergeSubscriber { - - project: (x: T, ix: number) => Observable; - - constructor(destination: Observer, - project: (x: T, ix: number) => Observable) { - super(destination, Number.POSITIVE_INFINITY); - this.project = project; - } - - _project(value, index) { - const observable = tryCatch(this.project).call(this, value, index); - if (observable === errorObject) { - this.error(errorObject.e); - return null; +class ExpandSubscriber extends Subscriber { + private index: number = 0; + private active: number = 0; + private hasCompleted: boolean = true; + private buffer: T[]; + + constructor(destination: Observer, private project: (value: T, index: number) => Observable, + private concurrent: number = Number.POSITIVE_INFINITY) { + super(destination); + if(concurrent < Number.POSITIVE_INFINITY) { + this.buffer = []; } - return observable; } - - _subscribeInner(observable, value, index) { - if(observable._isScalar) { - this.destination.next((> observable).value); - this._innerComplete(); - this._next((observable).value); - } else if(observable instanceof EmptyObservable) { - this._innerComplete(); + + _next(value: T) { + const index = this.index++; + this.destination.next(value); + if(this.active < this.concurrent) { + let result = tryCatch(this.project)(value, index); + if(result === errorObject) { + this.destination.error(result.e); + } else { + if(result._isScalar) { + this._next(result.value); + } else { + this.active++; + this.add(result.subscribe(new ExpandInnerSubscriber(this.destination, this))); + } + } } else { - return observable._subscribe(new ExpandInnerSubscriber(this.destination, this)); + this.buffer.push(value); + } + } + + _complete() { + this.hasCompleted = true; + if(this.hasCompleted && this.active === 0) { + this.destination.complete(); + } + } + + notifyComplete(innerSub: Subscription) { + const buffer = this.buffer; + this.remove(innerSub); + this.active--; + if(buffer && buffer.length > 0) { + this._next(buffer.shift()); + } + if(this.hasCompleted && this.active === 0) { + this.destination.complete(); } } } -class ExpandInnerSubscriber extends MergeInnerSubscriber { - constructor(destination: Observer, parent: ExpandSubscriber) { - super(destination, parent); +class ExpandInnerSubscriber extends Subscriber { + constructor(destination: Observer, private parent: ExpandSubscriber) { + super(destination); } + _next(value) { - this.destination.next(value); - this.parent.next(value); + this.parent._next(value); + } + + _complete() { + this.parent.notifyComplete(this); } } diff --git a/src/operators/flatMap-support.ts b/src/operators/flatMap-support.ts index df1472132c..628d27b603 100644 --- a/src/operators/flatMap-support.ts +++ b/src/operators/flatMap-support.ts @@ -1,95 +1,120 @@ -import Operator from '../Operator'; -import Observer from '../Observer'; import Observable from '../Observable'; +import Operator from '../Operator'; import Subscriber from '../Subscriber'; - -import { MergeSubscriber, MergeInnerSubscriber } from './merge-support'; -import ScalarObservable from '../observables/ScalarObservable'; - +import Subscription from '../Subscription'; +import Observer from '../Observer'; import tryCatch from '../util/tryCatch'; import { errorObject } from '../util/errorObject'; -export class FlatMapOperator implements Operator { - - project: (x: T, ix: number) => Observable; - projectResult: (x: T, y: any, ix: number, iy: number) => R; - concurrent: number; +export default function flatMap(project: (value: T, index: number) => Observable, + resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R, + concurrent: number = Number.POSITIVE_INFINITY) { + return this.lift(new FlatMapOperator(project, resultSelector, concurrent)); +} - constructor(project: (x: T, ix: number) => Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R, - concurrent: number = Number.POSITIVE_INFINITY) { - this.project = project; - this.projectResult = projectResult; - this.concurrent = concurrent; +export class FlatMapOperator implements Operator { + constructor(private project: (value: T, index: number) => Observable, + private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, + private concurrent: number = Number.POSITIVE_INFINITY) { } - - call(subscriber: Subscriber): Subscriber { - return new FlatMapSubscriber(subscriber, this.concurrent, this.project, this.projectResult); + + call(observer: Subscriber): Subscriber { + return new FlatMapSubscriber(observer, this.project, this.resultSelector, this.concurrent); } } -export class FlatMapSubscriber extends MergeSubscriber { - - project: (x: T, ix: number) => Observable; - projectResult: (x: T, y: any, ix: number, iy: number) => R; - - constructor(destination: Observer, - concurrent: number, - project: (x: T, ix: number) => Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R) { - super(destination, concurrent); - this.project = project; - this.projectResult = projectResult; +export class FlatMapSubscriber extends Subscriber { + private hasCompleted: boolean = false; + private buffer: Observable[] = []; + private active: number = 0; + protected index: number = 0; + + constructor(destination: Observer, + private project: (value: T, index: number) => Observable, + private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, + private concurrent: number = Number.POSITIVE_INFINITY) { + super(destination); } - - _project(value, index) { - const observable = tryCatch(this.project).call(this, value, index); - if (observable === errorObject) { - this.error(errorObject.e); - return null; + + _next(value: any) { + if(this.active < this.concurrent) { + const resultSelector = this.resultSelector; + const index = this.index++; + const observable = tryCatch(this.project)(value, index); + if(observable === errorObject) { + this.destination.error(observable.e); + } else { + this._innerSubscribe(observable, value, index); + } + } else { + this.buffer.push(value); } - return observable; } - - _subscribeInner(observable:Observable, value, index) { - const projectResult = this.projectResult; - if(projectResult) { - return observable._subscribe(new FlatMapInnerSubscriber(this.destination, this, value, index, projectResult)); - } else if(observable._isScalar) { - this.destination.next(( observable).value); - this._innerComplete(); + + _innerSubscribe(observable, value, index) { + const resultSelector = this.resultSelector; + if(observable._isScalar) { + if(resultSelector) { + let result = tryCatch(resultSelector)(observable.value, value, 0, index); + if(result === errorObject) { + this.destination.error(result.e); + } else { + this.destination.next(result); + } + } else { + this.destination.next(observable.value); + } } else { - return observable._subscribe(new MergeInnerSubscriber(this.destination, this)); + this.active++; + this.add(observable.subscribe(new FlatMapInnerSubscriber(this.destination, this, value, index, resultSelector))); + } + } + + _complete() { + this.hasCompleted = true; + if(this.active === 0 && this.buffer.length === 0) { + this.destination.complete(); + } + } + + notifyComplete(innerSub: Subscription) { + const buffer = this.buffer; + this.remove(innerSub); + this.active--; + if(buffer.length > 0) { + this._next(buffer.shift()); + } else if (this.active === 0 && this.hasCompleted) { + this.destination.complete(); } } } -export class FlatMapInnerSubscriber extends MergeInnerSubscriber { - - value: any; - index: number; - project: (x: T, y: any, ix: number, iy: number) => R; - count: number = 0; - - constructor(destination: Observer, - parent: FlatMapSubscriber, - value: any, - index: number, - project?: (x: T, y: any, ix: number, iy: number) => R) { - super(destination, parent); - this.value = value; - this.index = index; - this.project = project; +export class FlatMapInnerSubscriber extends Subscriber { + index: number = 0; + + constructor(destination: Observer, private parent: FlatMapSubscriber, + private outerValue: T, + private outerIndex: number, + private resultSelector?: (innerValue: T, outerValue: R, innerIndex: number, outerIndex: number) => R2) { + super(destination); } - - _next(value) { - let result = value; - const index = this.count++; - result = tryCatch(this.project).call(this, this.value, value, this.index, index); - if (result === errorObject) { - this.destination.error(errorObject.e); + + _next(value: R) { + const resultSelector = this.resultSelector; + const index = this.index++; + if(resultSelector) { + let result = tryCatch(resultSelector)(value, this.outerValue, index, this.outerIndex); + if(result === errorObject) { + this.destination.error(result.e); + } else { + this.destination.next(result); + } } else { - this.destination.next(result); + this.destination.next(value); } } -} + + _complete() { + this.parent.notifyComplete(this); + } +} \ No newline at end of file diff --git a/src/operators/flatMap.ts b/src/operators/flatMap.ts index f3c0698bed..951a0a2a65 100644 --- a/src/operators/flatMap.ts +++ b/src/operators/flatMap.ts @@ -1,8 +1,8 @@ import Observable from '../Observable'; import { FlatMapOperator } from './flatMap-support'; -export default function flatMap(project: (x: T, ix: number) => Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R, - concurrent?: number) { - return this.lift(new FlatMapOperator(project, projectResult, concurrent)); +export default function flatMap(project: (value: T, index: number) => Observable, + resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R, + concurrent: number = Number.POSITIVE_INFINITY) { + return this.lift(new FlatMapOperator(project, resultSelector, concurrent)); } \ No newline at end of file diff --git a/src/operators/flatMapTo-support.ts b/src/operators/flatMapTo-support.ts index ab5b5b630c..3170aa0900 100644 --- a/src/operators/flatMapTo-support.ts +++ b/src/operators/flatMapTo-support.ts @@ -2,42 +2,30 @@ import Operator from '../Operator'; import Observer from '../Observer'; import Observable from '../Observable'; import Subscriber from '../Subscriber'; - import { FlatMapSubscriber } from './flatMap-support'; -export class FlatMapToOperator implements Operator { - - observable: Observable; - projectResult: (x: T, y: any, ix: number, iy: number) => R - concurrent: number; - - constructor(observable: Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R, - concurrent: number = Number.POSITIVE_INFINITY) { - this.observable = observable; - this.projectResult = projectResult; - this.concurrent = concurrent; - } - - call(subscriber: Subscriber): Subscriber { - return new FlatMapToSubscriber(subscriber, this.concurrent, this.observable, this.projectResult); +export class FlatMapToOperator implements Operator { + constructor(private observable: Observable, + private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, + private concurrent: number = Number.POSITIVE_INFINITY) { + + } + + call(observer: Subscriber): Subscriber { + return new FlatMapToSubscriber(observer, this.observable, this.resultSelector, this.concurrent); } } -export class FlatMapToSubscriber extends FlatMapSubscriber { - - observable: Observable; - - constructor(destination: Observer, - concurrent: number, - observable: Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R) { - super(destination, concurrent, null, projectResult); - this.observable = observable; - } - - _project(value, index) { - return this.observable; +export class FlatMapToSubscriber extends FlatMapSubscriber { + constructor(destination: Observer, private observable: Observable, + resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, + concurrent: number = Number.POSITIVE_INFINITY) { + super(destination, null, resultSelector, concurrent); + } + + _next(value: T) { + const observable = this.observable; + const index = this.index++; + super._innerSubscribe(observable, value, index); } -} - +} \ No newline at end of file diff --git a/src/operators/flatMapTo.ts b/src/operators/flatMapTo.ts index e40b5bedf0..9472f17ea8 100644 --- a/src/operators/flatMapTo.ts +++ b/src/operators/flatMapTo.ts @@ -1,12 +1,8 @@ -import Operator from '../Operator'; -import Observer from '../Observer'; import Observable from '../Observable'; -import Subscriber from '../Subscriber'; - import { FlatMapToOperator } from './flatMapTo-support'; -export default function flatMapTo(observable: Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R, - concurrent?: number) { - return this.lift(new FlatMapToOperator(observable, projectResult, concurrent)); +export default function flatMapTo(observable: Observable, + resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, + concurrent: number = Number.POSITIVE_INFINITY) : Observable { + return this.lift(new FlatMapToOperator(observable, resultSelector, concurrent)); } \ No newline at end of file diff --git a/src/operators/merge-static.ts b/src/operators/merge-static.ts index 40c12da440..d4f6ff9202 100644 --- a/src/operators/merge-static.ts +++ b/src/operators/merge-static.ts @@ -1,7 +1,7 @@ import Scheduler from '../Scheduler'; import Observable from '../Observable'; import ArrayObservable from '../observables/ArrayObservable'; -import { MergeOperator } from './merge-support'; +import { MergeAllOperator } from './mergeAll-support'; import immediate from '../schedulers/immediate'; export default function merge(...observables: (Observable|Scheduler|number)[]): Observable { @@ -20,6 +20,6 @@ export default function merge(...observables: (Observable|Scheduler|numb if(observables.length === 1) { return >observables[0]; } - - return new ArrayObservable(observables, scheduler).lift(new MergeOperator(concurrent)); + + return new ArrayObservable(observables, scheduler).lift(new MergeAllOperator(concurrent)); } \ No newline at end of file diff --git a/src/operators/merge-support.ts b/src/operators/merge-support.ts deleted file mode 100644 index 03ee4e8756..0000000000 --- a/src/operators/merge-support.ts +++ /dev/null @@ -1,109 +0,0 @@ -import Operator from '../Operator'; -import Subscriber from '../Subscriber'; -import Observer from '../Observer'; -import Observable from '../Observable'; -import ScalarObservable from '../observables/ScalarObservable'; -import Subscription from '../Subscription'; - -export class MergeOperator implements Operator { - - concurrent: number; - - constructor(concurrent: number = Number.POSITIVE_INFINITY) { - this.concurrent = concurrent; - } - - call(subscriber: Subscriber): Subscriber { - return new MergeSubscriber(subscriber, this.concurrent); - } -} - -export class MergeSubscriber extends Subscriber { - - count: number = 0; - active: number = 0; - stopped: boolean = false; - buffer: Observable[] = []; - concurrent: number; - - constructor(destination: Observer, concurrent: number) { - super(destination); - this.concurrent = concurrent; - } - - _next(value) { - const active = this.active; - if (active < this.concurrent) { - - const index = this.count; - const observable = this._project(value, index); - - if (observable) { - this.count = index + 1; - this.active = active + 1; - this.add(this._subscribeInner(observable, value, index)); - } - } else { - this._buffer(value); - } - } - - complete() { - this.stopped = true; - if (this.active === 0 && this.buffer.length === 0) { - super.complete(); - } - } - - _unsubscribe() { - this.buffer = void 0; - } - - _project(value, index) { - return value; - } - - _buffer(value) { - this.buffer.push(value); - } - - _subscribeInner(observable:Observable, value, index): Subscription { - const destination = this.destination; - if(observable._isScalar) { - destination.next((observable).value); - this._innerComplete(); - } else { - const subscriber = new MergeInnerSubscriber(destination, this); - observable._subscribe(subscriber); - return subscriber; - } - } - - _innerComplete() { - - const buffer = this.buffer; - const active = this.active -= 1; - const stopped = this.stopped; - const pending = buffer.length; - - if (stopped && active === 0 && pending === 0) { - super.complete(); - } else if (active < this.concurrent && pending > 0) { - this._next(buffer.shift()); - } - } -} - -export class MergeInnerSubscriber extends Subscriber { - - parent: MergeSubscriber; - - constructor(destination: Observer, parent: MergeSubscriber) { - super(destination); - this.parent = parent; - } - - _complete() { - this.parent._innerComplete(); - } -} diff --git a/src/operators/merge.ts b/src/operators/merge.ts index ef7f77f62e..a598765da4 100644 --- a/src/operators/merge.ts +++ b/src/operators/merge.ts @@ -1,7 +1,8 @@ import Observable from '../Observable'; import mergeStatic from './merge-static'; +import Scheduler from '../Scheduler'; -export default function merge(...observables: (Observable|number)[]): Observable { +export default function merge(...observables: (Observable|Scheduler|number)[]): Observable { observables.unshift(this); return mergeStatic.apply(this, observables); } \ No newline at end of file diff --git a/src/operators/mergeAll-support.ts b/src/operators/mergeAll-support.ts new file mode 100644 index 0000000000..71d2111f4e --- /dev/null +++ b/src/operators/mergeAll-support.ts @@ -0,0 +1,65 @@ +import Observable from '../Observable'; +import Operator from '../Operator'; +import Subscriber from '../Subscriber'; +import Observer from '../Observer'; +import Subscription from '../Subscription'; + +export class MergeAllOperator implements Operator { + constructor(private concurrent: number) { + + } + + call(observer: Observer) { + return new MergeAllSubscriber(observer, this.concurrent); + } +} + +export class MergeAllSubscriber extends Subscriber { + private hasCompleted: boolean = false; + private buffer: Observable[] = []; + private active: number = 0; + constructor(destination: Observer, private concurrent:number) { + super(destination); + } + + _next(observable: any) { + if(this.active < this.concurrent) { + if(observable._isScalar) { + this.destination.next(observable.value); + } else { + this.active++; + this.add(observable.subscribe(new MergeAllInnerSubscriber(this.destination, this))) + } + } else { + this.buffer.push(observable); + } + } + + _complete() { + this.hasCompleted = true; + if(this.active === 0 && this.buffer.length === 0) { + this.destination.complete(); + } + } + + notifyComplete(innerSub: Subscription) { + const buffer = this.buffer; + this.remove(innerSub); + this.active--; + if(buffer.length > 0) { + this._next(buffer.shift()); + } else if (this.active === 0 && this.hasCompleted) { + this.destination.complete(); + } + } +} + +export class MergeAllInnerSubscriber extends Subscriber { + constructor(destination: Observer, private parent: MergeAllSubscriber) { + super(destination); + } + + _complete() { + this.parent.notifyComplete(this); + } +} \ No newline at end of file diff --git a/src/operators/mergeAll.ts b/src/operators/mergeAll.ts index de6ba0fafd..79290c8cb2 100644 --- a/src/operators/mergeAll.ts +++ b/src/operators/mergeAll.ts @@ -1,6 +1,10 @@ import Observable from '../Observable'; -import { MergeOperator } from './merge-support'; +import Operator from '../Operator'; +import Subscriber from '../Subscriber'; +import Observer from '../Observer'; +import Subscription from '../Subscription'; +import { MergeAllOperator } from './mergeAll-support'; -export default function mergeAll(concurrent?: any): Observable { - return this.lift(new MergeOperator(concurrent)); -} +export default function mergeAll(concurrent: number = Number.POSITIVE_INFINITY): Observable { + return this.lift(new MergeAllOperator(concurrent)); +} \ No newline at end of file diff --git a/src/operators/switchAll.ts b/src/operators/switchAll.ts index 98044e863f..f60a965c25 100644 --- a/src/operators/switchAll.ts +++ b/src/operators/switchAll.ts @@ -3,7 +3,6 @@ import Observer from '../Observer'; import Observable from '../Observable'; import Subscriber from '../Subscriber'; import Subscription from '../Subscription'; -import { MergeSubscriber } from './merge-support'; export default function switchAll(): Observable { return this.lift(new SwitchOperator()); @@ -19,31 +18,55 @@ class SwitchOperator implements Operator { } } -class SwitchSubscriber extends MergeSubscriber { - +class SwitchSubscriber extends Subscriber { + private active: number = 0; + private hasCompleted: boolean = false; innerSubscription: Subscription; - constructor(destination: Observer) { - super(destination, 1); + constructor(destination: Observer) { + super(destination); } - - _buffer(value) { - const active = this.active; - if(active > 0) { - this.active = active - 1; - const inner = this.innerSubscription; - if(inner) { - inner.unsubscribe() - this.innerSubscription = null; - } + + _next(value: any) { + this.active++; + this.unsubscribeInner(); + this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this.destination, this))); + } + + _complete() { + this.hasCompleted = true; + if(this.active === 0) { + this.destination.complete(); + } + } + + unsubscribeInner() { + const innerSubscription = this.innerSubscription; + if(innerSubscription) { + this.active--; + innerSubscription.unsubscribe(); + this.remove(innerSubscription); } - this._next(value); } + + notifyComplete() { + this.unsubscribeInner(); + if(this.hasCompleted && this.active === 0) { + this.destination.complete(); + } + } +} - _subscribeInner(observable, value, index) { - this.innerSubscription = new Subscription(); - this.innerSubscription.add(super._subscribeInner(observable, value, index)); - return this.innerSubscription; +class InnerSwitchSubscriber extends Subscriber { + constructor(destination: Observer, private parent: SwitchSubscriber) { + super(destination); + } + + _next(value: T) { + super._next(value); + } + _complete() { + this.parent.notifyComplete(); } } diff --git a/src/operators/switchLatest.ts b/src/operators/switchLatest.ts index 390c4e567f..4e2b2d3378 100644 --- a/src/operators/switchLatest.ts +++ b/src/operators/switchLatest.ts @@ -4,50 +4,30 @@ import Observable from '../Observable'; import Subscriber from '../Subscriber'; import Subscription from '../Subscription'; -import { FlatMapOperator, FlatMapSubscriber } from './flatMap-support'; +import { FlatMapSubscriber } from './flatMap-support'; -export default function switchLatest(project: (x: T, ix: number) => Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R): Observable{ - return this.lift(new SwitchLatestOperator(project, projectResult)); +export default function switchLatest(project: (value: T, index: number) => Observable, + resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2): Observable{ + return this.lift(new SwitchLatestOperator(project, resultSelector)); } -class SwitchLatestOperator extends FlatMapOperator { - - constructor(project: (x: T, ix: number) => Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R) { - super(project, projectResult, 1); +class SwitchLatestOperator implements Operator { + constructor(private project: (value: T, index: number) => Observable, + private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) { } call(subscriber: Subscriber): Subscriber { - return new SwitchLatestSubscriber(subscriber, this.project, this.projectResult); + return new SwitchLatestSubscriber(subscriber, this.project, this.resultSelector); } } -class SwitchLatestSubscriber extends FlatMapSubscriber { +class SwitchLatestSubscriber extends FlatMapSubscriber { innerSubscription: Subscription; - constructor(destination: Observer, - project: (x: T, ix: number) => Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R) { - super(destination, 1, project, projectResult); - } - - _buffer(value) { - const active = this.active; - if(active > 0) { - this.active = active - 1; - const inner = this.innerSubscription; - if(inner) { - inner.unsubscribe() - } - } - this._next(value); - } - - _subscribeInner(observable, value, index) { - this.innerSubscription = new Subscription(); - this.innerSubscription.add(super._subscribeInner(observable, value, index)); - return this.innerSubscription; + constructor(destination: Observer, + project: (value: T, index: number) => Observable, + resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) { + super(destination, project, resultSelector, 1); } } diff --git a/src/operators/switchLatestTo.ts b/src/operators/switchLatestTo.ts index 6ebd0bd085..d831b9f8fa 100644 --- a/src/operators/switchLatestTo.ts +++ b/src/operators/switchLatestTo.ts @@ -4,48 +4,30 @@ import Observable from '../Observable'; import Subscriber from '../Subscriber'; import Subscription from '../Subscription'; -import { FlatMapToOperator, FlatMapToSubscriber } from './flatMapTo-support'; +import { FlatMapToSubscriber } from './flatMapTo-support'; -export default function switchLatestTo(observable: Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R): Observable { +export default function switchLatestTo(observable: Observable, + projectResult?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2): Observable { return this.lift(new SwitchLatestToOperator(observable, projectResult)); } -class SwitchLatestToOperator extends FlatMapToOperator { - - constructor(observable: Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R) { - super(observable, projectResult, 1); +class SwitchLatestToOperator implements Operator { + constructor(private observable: Observable, + private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) { } call(subscriber: Subscriber): Subscriber { - return new SwitchLatestToSubscriber(subscriber, this.observable, this.projectResult); + return new SwitchLatestToSubscriber(subscriber, this.observable, this.resultSelector); } } -class SwitchLatestToSubscriber extends FlatMapToSubscriber { +class SwitchLatestToSubscriber extends FlatMapToSubscriber { innerSubscription: Subscription; - constructor(destination: Observer, - observable: Observable, - projectResult?: (x: T, y: any, ix: number, iy: number) => R) { - super(destination, 1, observable, projectResult); - } - - _buffer(value) { - const active = this.active; - if(active > 0) { - this.active = active - 1; - const inner = this.innerSubscription; - if(inner) { - inner.unsubscribe() - } - } - this._next(value); - } - - _subscribeInner(observable, value, index) { - return (this.innerSubscription = super._subscribeInner(observable, value, index)); + constructor(destination: Observer, + observable: Observable, + resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) { + super(destination, observable, resultSelector, 1); } } diff --git a/src/schedulers/TestScheduler.ts b/src/schedulers/TestScheduler.ts index 1db584b184..ec618ab440 100644 --- a/src/schedulers/TestScheduler.ts +++ b/src/schedulers/TestScheduler.ts @@ -4,6 +4,8 @@ import Notification from '../Notification'; import Subject from '../Subject'; export default class TestScheduler extends VirtualTimeScheduler { + private hotObservables: { setup: (scheduler: TestScheduler) => void, subject: Subject }[] = []; + constructor(public assertDeepEqual: (actual: any, expected: any) => boolean | void) { super(); } @@ -15,21 +17,26 @@ export default class TestScheduler extends VirtualTimeScheduler { let messages = TestScheduler.parseMarbles(marbles, values, error); return Observable.create(subscriber => { messages.forEach(({ notification, frame }) => { - this.schedule(() => { + subscriber.add(this.schedule(() => { notification.observe(subscriber); - }, frame); + }, frame)); }, this); }); } - createHotObservable(marbles: string, values?: any, error?: any) { + createHotObservable(marbles: string, values?: any, error?: any): Subject { let messages = TestScheduler.parseMarbles(marbles, values, error); let subject = new Subject(); - messages.forEach(({ notification, frame }) => { - this.schedule(() => { - notification.observe(subject); - }, frame); - }, this); + this.hotObservables.push({ + setup(scheduler) { + messages.forEach(({ notification, frame }) => { + scheduler.schedule(() => { + notification.observe(subject); + }, frame); + }); + }, + subject + }); return subject; } @@ -62,11 +69,17 @@ export default class TestScheduler extends VirtualTimeScheduler { }; } - flush() { + flush() { + const hotObservables = this.hotObservables; + while(hotObservables.length > 0) { + hotObservables.shift().setup(this); + } + super.flush(); const flushTests = this.flushTests.filter(test => test.ready); while (flushTests.length > 0) { var test = flushTests.shift(); + test.actual.sort((a, b) => a.frame === b.frame ? 0 : (a.frame > b.frame ? 1 : -1)); this.assertDeepEqual(test.actual, test.expected); } } diff --git a/src/schedulers/VirtualTimeScheduler.ts b/src/schedulers/VirtualTimeScheduler.ts index 153182b98f..bdc2ad37cd 100644 --- a/src/schedulers/VirtualTimeScheduler.ts +++ b/src/schedulers/VirtualTimeScheduler.ts @@ -14,17 +14,7 @@ export default class VirtualTimeScheduler implements Scheduler { return 0; } - sortActions() { - if (!this.sorted) { - ([]>this.actions).sort((a, b) => { - return a.delay === b.delay ? (a.index > b.index ? 1 : -1) : (a.delay > b.delay ? 1 : -1); - }); - this.sorted = true; - } - } - flush() { - this.sortActions(); const actions = this.actions; while (actions.length > 0) { let action = actions.shift(); @@ -33,6 +23,20 @@ export default class VirtualTimeScheduler implements Scheduler { } this.frame = 0; } + + addAction(action: Action) { + const findDelay = action.delay; + const actions = this.actions; + const len = actions.length; + const vaction = >action; + + + actions.push(action); + + actions.sort((a:VirtualAction, b:VirtualAction) => { + return (a.delay === b.delay) ? (a.index === b.index ? 0 : (a.index > b.index ? 1 : -1)) : (a.delay > b.delay ? 1 : -1); + }); + } schedule(work: (x?: any) => Subscription | void, delay: number = 0, state?: any): Subscription { this.sorted = false; @@ -59,7 +63,7 @@ class VirtualAction extends Subscription implements Action { new VirtualAction(scheduler, this.work, scheduler.index += 1); action.state = state; action.delay = scheduler.frame + delay; - scheduler.actions.push(action); + scheduler.addAction(action); return this; }