Skip to content

Commit

Permalink
feat(combineLatest): support for observable dictionaries (#5022) (#5363)
Browse files Browse the repository at this point in the history
  • Loading branch information
rraziel authored Jul 14, 2020
1 parent afbbfbf commit f5278aa
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 10 deletions.
4 changes: 4 additions & 0 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ export declare function combineLatest<O extends ObservableInput<any>, R>(array:
export declare function combineLatest<O extends ObservableInput<any>>(...observables: Array<O | SchedulerLike>): Observable<any[]>;
export declare function combineLatest<O extends ObservableInput<any>, R>(...observables: Array<O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike>): Observable<R>;
export declare function combineLatest<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R) | SchedulerLike>): Observable<R>;
export declare function combineLatest(sourcesObject: {}): Observable<never>;
export declare function combineLatest<T, K extends keyof T>(sourcesObject: T): Observable<{
[K in keyof T]: ObservedValueOf<T[K]>;
}>;

export interface CompleteNotification {
kind: 'C';
Expand Down
10 changes: 10 additions & 0 deletions spec-dtslint/observables/combineLatest-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,13 @@ it('should accept 6 params and a result selector', () => {
it('should accept 7 or more params and a result selector', () => {
const o = combineLatest([a$, b$, c$, d$, e$, f$, g$, g$, g$], (a: any, b: any, c: any, d: any, e: any, f: any, g1: any, g2: any, g3: any) => new A()); // $ExpectType Observable<A>
});

describe('combineLatest({})', () => {
it('should properly type empty objects', () => {
const res = combineLatest({}); // $ExpectType Observable<never>
});

it('should work for the simple case', () => {
const res = combineLatest({ foo: a$, bar: b$, baz: c$ }); // $ExpectType Observable<{ foo: A; bar: B; baz: C; }>
});
});
16 changes: 15 additions & 1 deletion spec/observables/combineLatest-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { queueScheduler as rxQueueScheduler, combineLatest, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import { map, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -66,6 +66,20 @@ describe('static combineLatest', () => {
});
});

it('should accept a dictionary of observables', () => {
rxTestScheduler.run(({ hot, expectObservable }) => {
const firstSource = hot('----a----b----c----|');
const secondSource = hot('--d--e--f--g--|');
const expected = ' ----uv--wx-y--z----|';

const combined = combineLatest({a: firstSource, b: secondSource}).pipe(
map(({a, b}) => '' + a + b)
);

expectObservable(combined).toBe(expected, {u: 'ad', v: 'ae', w: 'af', x: 'bf', y: 'bg', z: 'cg'});
});
});

it('should work with two nevers', () => {
rxTestScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const e1 = cold(' -');
Expand Down
53 changes: 44 additions & 9 deletions src/internal/observable/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Subscriber } from '../Subscriber';
import { OuterSubscriber } from '../OuterSubscriber';
import { Operator } from '../Operator';
import { InnerSubscriber } from '../InnerSubscriber';
import { isObject } from '../util/isObject';
import { subscribeToResult } from '../util/subscribeToResult';
import { fromArray } from './fromArray';
import { lift } from '../util/lift';
Expand Down Expand Up @@ -100,6 +101,11 @@ export function combineLatest<O extends ObservableInput<any>, R>(...observables:

/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R) | SchedulerLike>): Observable<R>;

// combineLatest({})
export function combineLatest(sourcesObject: {}): Observable<never>;
export function combineLatest<T, K extends keyof T>(sourcesObject: T): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;

/* tslint:enable:max-line-length */

/**
Expand Down Expand Up @@ -160,7 +166,24 @@ export function combineLatest<R>(...observables: Array<ObservableInput<any> | ((
* // [1, 1] after 1.5s
* // [2, 1] after 2s
* ```
* ### Combine a dictionary of Observables
* ```ts
* import { combineLatest, of } from 'rxjs';
* import { delay, startWith } from 'rxjs/operators';
*
* const observables = {
* a: of(1).pipe(delay(1000), startWith(0)),
* b: of(5).pipe(delay(5000), startWith(0)),
* c: of(10).pipe(delay(10000), startWith(0))
* };
* const combined = combineLatest(observables);
* combined.subscribe(value => console.log(value));
* // Logs
* // {a: 0, b: 0, c: 0} immediately
* // {a: 1, b: 0, c: 0} after 1s
* // {a: 1, b: 5, c: 0} after 5s
* // {a: 1, b: 5, c: 10} after 10s
* ```
* ### Combine an array of Observables
* ```ts
* import { combineLatest, of } from 'rxjs';
Expand Down Expand Up @@ -219,6 +242,7 @@ export function combineLatest<O extends ObservableInput<any>, R>(
): Observable<R> {
let resultSelector: ((...values: Array<any>) => R) | undefined = undefined;
let scheduler: SchedulerLike | undefined = undefined;
let keys: Array<string> | undefined = undefined;

if (isScheduler(observables[observables.length - 1])) {
scheduler = observables.pop() as SchedulerLike;
Expand All @@ -228,21 +252,30 @@ export function combineLatest<O extends ObservableInput<any>, R>(
resultSelector = observables.pop() as (...values: Array<any>) => R;
}

// if the first and only other argument besides the resultSelector is an array
// assume it's been called with `combineLatest([obs1, obs2, obs3], resultSelector)`
if (observables.length === 1 && isArray(observables[0])) {
observables = observables[0] as any;
if (observables.length === 1) {
const first = observables[0] as any;
if (isArray(first)) {
// if the first and only other argument besides the resultSelector is an array
// assume it's been called with `combineLatest([obs1, obs2, obs3], resultSelector)`
observables = first;
}
// if the first and only argument is an object, assume it's been called with
// `combineLatest({})`
if (isObject(first) && Object.getPrototypeOf(first) === Object.prototype) {
keys = Object.keys(first);
observables = keys.map(key => first[key]);
}
}

return lift(fromArray(observables, scheduler), new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector));
return lift(fromArray(observables, scheduler), new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector, keys));
}

export class CombineLatestOperator<T, R> implements Operator<T, R> {
constructor(private resultSelector?: (...values: Array<any>) => R) {
constructor(private resultSelector?: (...values: Array<any>) => R, private keys?: Array<string>) {
}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new CombineLatestSubscriber(subscriber, this.resultSelector));
return source.subscribe(new CombineLatestSubscriber(subscriber, this.resultSelector, this.keys));
}
}

Expand All @@ -257,7 +290,7 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
private observables: any[] = [];
private toRespond: number | undefined;

constructor(destination: Subscriber<R>, private resultSelector?: (...values: Array<any>) => R) {
constructor(destination: Subscriber<R>, private resultSelector?: (...values: Array<any>) => R, private keys?: Array<string>) {
super(destination);
}

Expand Down Expand Up @@ -301,7 +334,9 @@ export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
if (this.resultSelector) {
this._tryResultSelector(values);
} else {
this.destination.next(values.slice());
this.destination.next(this.keys ?
this.keys.reduce((result, key, i) => ((result as any)[key] = values[i], result), {}) :
values.slice());
}
}
}
Expand Down

0 comments on commit f5278aa

Please sign in to comment.