Skip to content

Commit

Permalink
fix(expand): accept scheduler parameter
Browse files Browse the repository at this point in the history
Also moves the handling of the default value for optional parameters to
the expand function instead of the operator's ctor.

Closes #841.
  • Loading branch information
luisgabriel authored and benlesh committed Dec 8, 2015
1 parent ef83066 commit 79e9084
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 15 deletions.
26 changes: 25 additions & 1 deletion spec/operators/expand-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,28 @@ describe('Observable.prototype.expand()', function () {
done();
});
});
});

it('should work when passing undefined for the optional arguments', function () {
var values = {
a: 1,
b: 1 + 1, // a + a,
c: 2 + 2, // b + b,
d: 4 + 4, // c + c,
e: 8 + 8, // d + d
};
var e1 = hot('(a|)', values);
var e1subs = '^ ! ';
var e2shape = '---(z|) ';
var expected = 'a--b--c--d--(e|)';

var result = e1.expand(function (x) {
if (x === 16) {
return Observable.empty();
}
return cold(e2shape, { z: x + x });
}, undefined, undefined);

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export interface CoreOperators<T> {
delay?: (delay: number, scheduler?: Scheduler) => Observable<T>;
distinctUntilChanged?: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
do?: (next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) => Observable<T>;
expand?: <R>(project: (x: T, ix: number) => Observable<R>) => Observable<R>;
expand?: <R>(project: (x: T, ix: number) => Observable<R>, concurrent: number, scheduler: Scheduler) => Observable<R>;
filter?: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
finally?: (ensure: () => void, thisArg?: any) => Observable<T>;
first?: <R>(predicate?: (value: T, index: number, source: Observable<T>) => boolean,
Expand Down
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ export class Observable<T> implements CoreOperators<T> {
delay: (delay: number, scheduler?: Scheduler) => Observable<T>;
distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable<T>;
do: (next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) => Observable<T>;
expand: <R>(project: (x: T, ix: number) => Observable<R>) => Observable<R>;
expand: <R>(project: (x: T, ix: number) => Observable<R>, concurrent: number, scheduler: Scheduler) => Observable<R>;
filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
finally: (ensure: () => void, thisArg?: any) => Observable<T>;
first: <R>(predicate?: (value: T, index: number, source: Observable<T>) => boolean,
Expand Down
34 changes: 24 additions & 10 deletions src/operator/expand-support.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {Operator} from '../Operator';
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {Subscriber} from '../Subscriber';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
Expand All @@ -8,12 +9,13 @@ import {InnerSubscriber} from '../InnerSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

export class ExpandOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => Observable<any>,
private concurrent: number = Number.POSITIVE_INFINITY) {
constructor(private project: (value: T, index: number) => Observable<R>,
private concurrent: number,
private scheduler: Scheduler) {
}

call(subscriber: Subscriber<R>): Subscriber<T> {
return new ExpandSubscriber(subscriber, this.project, this.concurrent);
return new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler);
}
}

Expand All @@ -25,13 +27,18 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {

constructor(destination: Subscriber<R>,
private project: (value: T, index: number) => Observable<R>,
private concurrent: number = Number.POSITIVE_INFINITY) {
private concurrent: number,
private scheduler: Scheduler) {
super(destination);
if (concurrent < Number.POSITIVE_INFINITY) {
this.buffer = [];
}
}

private static dispatch({subscriber, result, value, index}): void {
subscriber.subscribeToProjection(result, value, index);
}

_next(value: any): void {
const destination = this.destination;

Expand All @@ -46,19 +53,26 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
let result = tryCatch(this.project)(value, index);
if (result === errorObject) {
destination.error(result.e);
} else if (!this.scheduler) {
this.subscribeToProjection(result, value, index);
} else {
if (result._isScalar) {
this._next(result.value);
} else {
this.active++;
this.add(subscribeToResult<T, R>(this, result, value, index));
}
const state = { subscriber: this, result, value, index };
this.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state));
}
} else {
this.buffer.push(value);
}
}

private subscribeToProjection(result, value: T, index: number): void {
if (result._isScalar) {
this._next(result.value);
} else {
this.active++;
this.add(subscribeToResult<T, R>(this, result, value, index));
}
}

_complete(): void {
this.hasCompleted = true;
if (this.hasCompleted && this.active === 0) {
Expand Down
8 changes: 6 additions & 2 deletions src/operator/expand.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {ExpandOperator} from './expand-support';

export function expand<T, R>(project: (value: T, index: number) => Observable<R>,
concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
return this.lift(new ExpandOperator(project, concurrent));
concurrent: number = Number.POSITIVE_INFINITY,
scheduler: Scheduler = undefined): Observable<R> {
concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent;

return this.lift(new ExpandOperator(project, concurrent, scheduler));
}

0 comments on commit 79e9084

Please sign in to comment.