Skip to content

Commit

Permalink
fix(expand): fix expand operator to match Rx3
Browse files Browse the repository at this point in the history
- adds virtual time tests for expand
- refactors expand to be its own operator, reducing inheritence
  • Loading branch information
benlesh committed Sep 15, 2015
1 parent b9b2ba5 commit 67f9623
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 57 deletions.
64 changes: 46 additions & 18 deletions spec/operators/expand-spec.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,57 @@
/* globals describe, it, expect, expectObservable, hot, cold */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.expand()', function () {
it('should map and recursively flatten', function (done) {
var expected = [1, 2, 3, 4, 5];
Observable.of(0).expand(function (x) {
if (x > 4) {
it('should map and recursively flatten', function() {
var values = {
a: 1,
b: 1 + 1, // a + a,
c: 2 + 2, // b + b,
d: 4 + 4, // c + c,
e: 8 + 8, // d + d
}
var e1 = hot('a', values);
/*
expectation explanation: (conjunction junction?) ...
since `cold('---(z|)')` emits `x + x` and completes on frame 30
but the next "expanded" return value is synchronously subscribed to in
that same frame, it stacks like so:
a
---(b|)
---(c|)
---(d|)
---(e|) (...which flattens into:)
a--b--c--d--(e|)
*/
var expected = 'a--b--c--d--(e|)';

expectObservable(e1.expand(function(x) {
if(x === 16) {
return Observable.empty();
}
return Observable.of(x + 1);
})
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
return cold('---(z|)', { z: x + x });
})).toBe(expected, values);
});
it('should map and recursively flatten with ScalarObservables', function (done) {
var expected = [1, 2, 3, 4, 5];
Observable.of(0).expand(function (x) {
if (x > 4) {

it('should map and recursively flatten with scalars', function() {
var values = {
a: 1,
b: 1 + 1, // a + a,
c: 2 + 2, // b + b,
d: 4 + 4, // c + c,
e: 8 + 8, // d + d
}
var e1 = hot('a', values);
var expected = '(abcde|)';

expectObservable(e1.expand(function(x) {
if(x === 16) {
return Observable.empty();
}
return Observable.of(x + 1);
})
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
return Observable.of(x + x); // scalar
})).toBe(expected, values);
});
});
105 changes: 66 additions & 39 deletions src/operators/expand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,96 @@ import Operator from '../Operator';
import Observer from '../Observer';
import Observable from '../Observable';
import Subscriber from '../Subscriber';
import Subscription from '../Subscription';

import { MergeSubscriber, MergeInnerSubscriber } from './merge-support';
// import { MergeAllSubscriber, MergeAllInnerSubscriber } from './mergeAll-support';
import EmptyObservable from '../observables/EmptyObservable';
import ScalarObservable from '../observables/ScalarObservable';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export default function expand<T>(project: (x: T, ix: number) => Observable<any>): Observable<any> {
return this.lift(new ExpandOperator(project));
export default function expand<T, R>(project: (value: T, index: number) => Observable<R>,
concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
return this.lift(new ExpandOperator(project, concurrent));
}

class ExpandOperator<T, R> implements Operator<T, R> {

project: (x: T, ix: number) => Observable<any>;

constructor(project: (x: T, ix: number) => Observable<any>) {
this.project = project;
constructor(private project: (value: T, index: number) => Observable<any>,
private concurrent: number = Number.POSITIVE_INFINITY) {
}

call(subscriber: Subscriber<R>): Subscriber<T> {
return new ExpandSubscriber(subscriber, this.project);
return new ExpandSubscriber(subscriber, this.project, this.concurrent);
}
}

class ExpandSubscriber<T, R> extends MergeSubscriber<T, R> {

project: (x: T, ix: number) => Observable<any>;

constructor(destination: Observer<R>,
project: (x: T, ix: number) => Observable<any>) {
super(destination, Number.POSITIVE_INFINITY);
this.project = project;
}

_project(value, index) {
const observable = tryCatch(this.project).call(this, value, index);
if (observable === errorObject) {
this.error(errorObject.e);
return null;
class ExpandSubscriber<T, R> extends Subscriber<T> {
private index: number = 0;
private active: number = 0;
private hasCompleted: boolean = true;
private buffer: T[];

constructor(destination: Observer<T>, private project: (value: T, index: number) => Observable<R>,
private concurrent: number = Number.POSITIVE_INFINITY) {
super(destination);
if(concurrent < Number.POSITIVE_INFINITY) {
this.buffer = [];
}
return observable;
}

_subscribeInner(observable, value, index) {
if(observable._isScalar) {
this.destination.next((<ScalarObservable<T>> observable).value);
this._innerComplete();
this._next((<any>observable).value);
} else if(observable instanceof EmptyObservable) {
this._innerComplete();

_next(value: T) {
const index = this.index++;
this.destination.next(value);
if(this.active < this.concurrent) {
let result = tryCatch(this.project)(value, index);
if(result === errorObject) {
this.destination.error(result.e);
} else {
if(result._isScalar) {
this._next(result.value);
} else {
let innerSub = new Subscription();
this.active++;
innerSub.add(result.subscribe(new ExpandInnerSubscriber(this.destination, this, innerSub)));
this.add(innerSub);
}
}
} else {
return observable._subscribe(new ExpandInnerSubscriber(this.destination, this));
this.buffer.push(value);
}
}

_complete() {
this.hasCompleted = true;
if(this.hasCompleted && this.active === 0) {
this.destination.complete();
}
}

notifyComplete(innerSub: Subscription<T>) {
const buffer = this.buffer;
this.remove(innerSub);
this.active--;
if(buffer && buffer.length > 0) {
this._next(buffer.shift());
}
if(this.hasCompleted && this.active === 0) {
this.destination.complete();
}
}
}

class ExpandInnerSubscriber<T, R> extends MergeInnerSubscriber<T, R> {
constructor(destination: Observer<T>, parent: ExpandSubscriber<T, R>) {
super(destination, parent);
class ExpandInnerSubscriber<T, R> extends Subscriber<T> {
constructor(destination: Observer<T>, private parent: ExpandSubscriber<T, R>, private innerSub: Subscription<T>) {
super(destination);
}

_next(value) {
this.destination.next(value);
this.parent.next(value);
this.parent._next(value);
}

_complete() {
this.parent.notifyComplete(this.innerSub);
}
}

0 comments on commit 67f9623

Please sign in to comment.