Skip to content

Commit

Permalink
feat(operator): add withLatestFrom
Browse files Browse the repository at this point in the history
closes #209
  • Loading branch information
benlesh committed Aug 28, 2015
1 parent 2239313 commit 322218a
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 1 deletion.
27 changes: 27 additions & 0 deletions spec/operators/withLatestFrom-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.withLatestFrom()', function () {
it('should merge the emitted value with the latest values of the other observables', function (done) {
var a = Observable.of('a');
var b = Observable.of('b', 'c');

Observable.value('d').delay(100)
.withLatestFrom(a, b, function (x, a, b) { return [x, a, b]; })
.subscribe(function (x) {
expect(x).toEqual(['d', 'a', 'c']);
}, null, done);
});

it('should emit nothing if the other observables never emit', function (done) {
var a = Observable.of('a');
var b = Observable.never();

Observable.value('d').delay(100)
.withLatestFrom(a, b, function (x, a, b) { return [x, a, b]; })
.subscribe(function (x) {
expect('this was called').toBe(false);
}, null, done);
});
});
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ export default class Observable<T> {
static combineLatest: <T>(...observables: (Observable<any> | ((...values: Array<any>) => T)) []) => Observable<T>;
combineLatest: <R>(...observables: (Observable<any> | ((...values: Array<any>) => R)) []) => Observable<R>;
combineAll: <R>(project?: (...values: Array<any>) => R) => Observable<R>;

withLatestFrom: <R>(...observables: (Observable<any> | ((...values: Array<any>) => R)) []) => Observable<R>;
static zip: <T>(...observables: (Observable<any> | ((...values: Array<any>) => T)) []) => Observable<T>;
zip: <R>(...observables: (Observable<any> | ((...values: Array<any>) => R)) []) => Observable<R>;
zipAll: <R>(project?: (...values: Array<any>) => R) => Observable<R>;
Expand Down
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,12 @@ observableProto.distinctUntilKeyChanged = distinctUntilKeyChanged;

import {combineLatest, combineLatestProto} from './operators/combineLatest';
import combineAll from './operators/combineAll';
import withLatestFrom from './operators/withLatestFrom';

Observable.combineLatest = combineLatest;
observableProto.combineLatest = combineLatestProto;
observableProto.combineAll = combineAll;
observableProto.withLatestFrom = withLatestFrom;

import {zip, zipProto} from './operators/zip';
import zipAll from './operators/zipAll';
Expand Down
72 changes: 72 additions & 0 deletions src/operators/withLatestFrom.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export default function withLatestFrom<R>(...args: (Observable<any>|((...values: any[]) => Observable<R>))[]): Observable<R> {
const project = <((...values: any[]) => Observable<R>)>args.pop();
const observables = <Observable<any>[]>args;
return this.lift(new WithLatestFromOperator(observables, project));
}

export class WithLatestFromOperator<T, R> implements Operator<T, R> {
constructor(private observables: Observable<any>[], private project: (...values: any[]) => Observable<R>) {
}

call(observer: Observer<R>): Observer<T> {
return new WithLatestFromSubscriber<T, R>(observer, this.observables, this.project);
}
}

export class WithLatestFromSubscriber<T, R> extends Subscriber<T> {
private values: any[];
private toSet: number;

constructor(destination: Observer<T>, private observables: Observable<any>[], private project: (...values: any[]) => Observable<R>) {
super(destination);
const len = observables.length;
this.values = new Array(len);
this.toSet = len;
for (let i = 0; i < len; i++) {
this.add(observables[i].subscribe(new WithLatestInnerSubscriber(this, i)))
}
}

notifyValue(index, value) {
this.values[index] = value;
this.toSet--;
}

_next(value: T) {
if (this.toSet === 0) {
const values = this.values;
let result = tryCatch(this.project)([value, ...values]);
if (result === errorObject) {
this.destination.error(result.e);
} else {
this.destination.next(result);
}
}
}
}

export class WithLatestInnerSubscriber<T, R> extends Subscriber<T> {
constructor(private parent: WithLatestFromSubscriber<T, R>, private valueIndex: number) {
super(null)
}

_next(value: T) {
this.parent.notifyValue(this.valueIndex, value);
}

_error(err: any) {
this.parent.error(err);
}

_complete() {
// noop
}
}

0 comments on commit 322218a

Please sign in to comment.