Skip to content

Commit

Permalink
refactor(switchScan): Make switchScan behavior more close to scan()
Browse files Browse the repository at this point in the history
  • Loading branch information
martinsik committed Apr 8, 2020
1 parent e4bf3b3 commit 78dec24
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs_app/content/guide/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ These are Observable creation operators that also have join functionality -- emi
- [`partition`](/api/operators/partition)
- [`pluck`](/api/operators/pluck)
- [`scan`](/api/operators/scan)
- [`switchScan`](/api/operators/switchScan)
- [`switchMap`](/api/operators/switchMap)
- [`switchMapTo`](/api/operators/switchMapTo)
- [`window`](/api/operators/window)
Expand Down
3 changes: 3 additions & 0 deletions docs_app/tools/decision-tree-generator/src/tree.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@
- label: and output the computed values as a nested Observable when the source emits a value
children:
- label: mergeScan
- label: and output the computed values as a nested Observable when the source emits a value while unsubscribing from the previous nested Observable
children:
- label: switchScan
- label: I want to wrap its messages with metadata
children:
- label: that describes each notification (next, error, or complete)
Expand Down
38 changes: 38 additions & 0 deletions spec-dtslint/operators/switchScan-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { of } from 'rxjs';
import { switchScan } from 'rxjs/operators';

it('should infer correctly', () => {
const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number) => of(Boolean(v)))); // $ExpectType Observable<boolean>
});

it('should infer correctly when using a single type', () => {
const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v))); // $ExpectType Observable<number>
});

it('should infer correctly when using a seed', () => {
const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), 0)); // $ExpectType Observable<number>
});

it('should infer correctly when using seed of a different type', () => {
const o = of(1, 2, 3).pipe(switchScan((acc: string, v: number) => of(acc + v), '0')); // $ExpectType Observable<string>
});

it('should support a projector that takes an index', () => {
const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number, index) => of(Boolean(v)))); // $ExpectType Observable<boolean>
});

it('should enforce types', () => {
const o = of(1, 2, 3).pipe(switchScan()); // $ExpectError
});

it('should enforce the return type to be Observable', () => {
const o = of(1, 2, 3).pipe(switchScan(p => p)); // $ExpectError
});

it('should enforce seed and the return type from accumulator', () => {
const o = of(1, 2, 3).pipe(switchScan(p => of(1), [])); // $ExpectError
});

it('should enforce seed and accumulator to have the same type', () => {
const o = of(1, 2, 3).pipe(switchScan((acc, p) => of([...acc, p]))); // $ExpectError
});
59 changes: 33 additions & 26 deletions spec/operators/switchScan-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,35 @@ import { concat, defer, Observable, of } from 'rxjs';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { switchScan, map, mergeMap, takeWhile } from 'rxjs/operators';

declare function asDiagram(arg: string): Function;

/** @test {switchScan} */
describe.only('switchScan', () => {
it('should map-and-flatten each item to an Observable while passing the seed', () => {
asDiagram('switchScan(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014|, 0)')
('should map-and-flatten each item to an Observable while passing the seed', () => {
const e1 = hot('--1-----3--5-------|');
const e1subs = '^ !';
const e2 = cold('x-x-x| ', {x: 10});
const expected = '--x-x-x-y-yz-z-z---|';
const values = {x: 10, y: 40, z: 90};

const result = e1.pipe(switchScan((acc, x) => e2.pipe(map(i => i * Number(x) + acc)), 0));
const result = e1.pipe(switchScan((acc: number, x: string) => e2.pipe(map(i => i * Number(x) + acc)), 0));

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should pass seed even when seed is not defined', () => {
const seeds: any[] = [];
const result = of(1, 3, 5).pipe(switchScan((acc, x) => {
seeds.push(acc);
return of(10).pipe(map(v => (v * Number(x)) + ((acc || 0) as number)));
})).subscribe();
it('should not pass seed when undefined', () => {
const seeds: number[] = [];

expect(seeds).to.deep.equal([undefined, 10, 40]);
of(1, 3, 5).pipe(
switchScan((acc, x) => {
seeds.push(acc);
return of(10).pipe(map(v => v * x + acc));
}),
).subscribe();

expect(seeds).to.deep.equal([1, 31]);
});

it('should unsub inner observables', () => {
Expand All @@ -39,6 +45,7 @@ describe.only('switchScan', () => {
unsubbed.push(x);
};
}),
null
)
).subscribe();

Expand All @@ -56,7 +63,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -72,7 +79,7 @@ describe.only('switchScan', () => {
throw 'error';
}

expectObservable(e1.pipe(switchScan(project))).toBe(expected);
expectObservable(e1.pipe(switchScan(project, null))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

Expand All @@ -88,7 +95,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -110,7 +117,7 @@ describe.only('switchScan', () => {

const result = e1.pipe(
mergeMap(x => of(x)),
switchScan((acc, value) => observableLookup[value]),
switchScan((acc, value) => observableLookup[value], null),
mergeMap(x => of(x)),
);

Expand Down Expand Up @@ -138,7 +145,7 @@ describe.only('switchScan', () => {
);

of(null).pipe(
switchScan(() => synchronousObservable),
switchScan(() => synchronousObservable, null),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

Expand All @@ -156,7 +163,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -175,7 +182,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -194,7 +201,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -213,7 +220,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -232,7 +239,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -251,7 +258,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -270,7 +277,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -289,7 +296,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected, undefined, 'sad');
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand All @@ -308,7 +315,7 @@ describe.only('switchScan', () => {

const observableLookup = { x: x, y: y };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected, undefined, 'sad');
expectSubscriptions(x.subscriptions).toBe(xsubs);
Expand Down Expand Up @@ -358,21 +365,21 @@ describe.only('switchScan', () => {

const observableLookup = { x: x };

const result = e1.pipe(switchScan((acc, value) => observableLookup[value]));
const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null));

expectObservable(result).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should pass index to the project function', () => {
it('should pass index to the accumulator function', () => {
const indices: number[] = [];

of('a', 'b', 'c', 'd').pipe(
switchScan((acc, x, index) => {
indices.push(index);
return of();
}),
}, null),
).subscribe();

expect(indices).to.deep.equal([0, 1, 2, 3]);
Expand Down
3 changes: 3 additions & 0 deletions src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ import { ObservableInput, OperatorFunction } from '../types';
* // ...and so on for each click
* ```
*
* @see {@link scan}
* @see {@link switchScan}
*
* @param {function(acc: R, value: T): Observable<R>} accumulator
* The accumulator function called on each source value.
* @param seed The initial accumulation value.
Expand Down
1 change: 1 addition & 0 deletions src/internal/operators/scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export function scan<V, A, S>(accumulator: (acc: A|S, value: V, index: number) =
* @see {@link expand}
* @see {@link mergeScan}
* @see {@link reduce}
* @see {@link switchScan}
*
* @param {function(acc: A, value: V, index: number): A} accumulator
* The accumulator function called on each source value.
Expand Down
58 changes: 43 additions & 15 deletions src/internal/operators/switchScan.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,61 @@
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction } from '../types';
import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction } from '../types';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { of } from '../observable/of';
import { switchMap } from './switchMap';
import { tap } from './tap';

export function switchScan<T>(accumulator: (acc: T, value: T, index: number) => ObservableInput<T>, seed?: T): MonoTypeOperatorFunction<T>;
export function switchScan<T, R>(accumulator: (acc: R, value: T, index: number) => ObservableInput<R>, seed?: R): OperatorFunction<T, R>;

/**
* Applies an accumulator function over the source Observable where the
* accumulator function itself returns an Observable, emitting values
* only from the most recently returned Observable.
*
* <span class="informal">It's like {@link scan}, but only to most recent
* Observable returned by the accumulator is merged into the outer Observable.</span>
*
* @see {@link scan}
* @see {@link mergeScan}
*
* @param {function(acc: R, value: T, index: number): Observable<R>} accumulator
* The accumulator function called on each source value.
* @param {T|R} [seed] The initial accumulation value.
* @return {Observable<R>} An observable of the accumulated values.
* @method switchScan
* @owner Observable
*/
export function switchScan<T, R>(
project: (acc: R, value: T, index: number) => ObservableInput<R>,
accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
seed?: R
): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift(new SwitchScanOperator(project, seed));
let hasSeed = false;
// The same reason as described in `scan` https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/scan.ts#L54-L58
if (arguments.length >= 2) {
hasSeed = true;
}

return (source: Observable<T>) => source.lift(new SwitchScanOperator(accumulator, seed, hasSeed));
}

class SwitchScanOperator<T, R> implements Operator<T, R> {
private acc: R;

constructor(private project: (acc: R, value: T, index: number) => ObservableInput<R>,
seed: R
) {
this.acc = seed;
}
constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>, private seed: R, private hasSeed: boolean = false) { }

call(subscriber: Subscriber<R>, source: any): any {
const wrappedProject = (value: T, index: number): ObservableInput<R> =>
this.project(this.acc, value, index);

return source.pipe(
switchMap(wrappedProject),
tap((value: R) => this.acc = value),
switchMap((value: T, index: number): ObservableInput<R> => {
if (this.hasSeed) {
return this.accumulator(this.seed, value, index);
} else {
return of(value);
}
}),
tap((value: R) => {
this.seed = value;
this.hasSeed = true;
}),
).subscribe(subscriber);
}
}

0 comments on commit 78dec24

Please sign in to comment.