diff --git a/docs_app/content/guide/operators.md b/docs_app/content/guide/operators.md index 8a22cdbd711..4edf064c14f 100644 --- a/docs_app/content/guide/operators.md +++ b/docs_app/content/guide/operators.md @@ -172,6 +172,7 @@ These are Observable creation operators that also have join functionality -- emi - [`partition`](/api/operators/partition) - [`pluck`](/api/operators/pluck) - [`scan`](/api/operators/scan) +- [`switchScan`](/api/operators/switchScan) - [`switchMap`](/api/operators/switchMap) - [`switchMapTo`](/api/operators/switchMapTo) - [`window`](/api/operators/window) diff --git a/docs_app/tools/decision-tree-generator/src/tree.yml b/docs_app/tools/decision-tree-generator/src/tree.yml index 0025033c45e..cadb5399832 100644 --- a/docs_app/tools/decision-tree-generator/src/tree.yml +++ b/docs_app/tools/decision-tree-generator/src/tree.yml @@ -115,6 +115,9 @@ - label: and output the computed values as a nested Observable when the source emits a value children: - label: mergeScan + - label: and output the computed values as a nested Observable when the source emits a value while unsubscribing from the previous nested Observable + children: + - label: switchScan - label: I want to wrap its messages with metadata children: - label: that describes each notification (next, error, or complete) diff --git a/spec-dtslint/operators/switchScan-spec.ts b/spec-dtslint/operators/switchScan-spec.ts new file mode 100644 index 00000000000..051550bf2fd --- /dev/null +++ b/spec-dtslint/operators/switchScan-spec.ts @@ -0,0 +1,38 @@ +import { of } from 'rxjs'; +import { switchScan } from 'rxjs/operators'; + +it('should infer correctly', () => { + const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number) => of(Boolean(v)))); // $ExpectType Observable +}); + +it('should infer correctly when using a single type', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v))); // $ExpectType Observable +}); + +it('should infer correctly when using a seed', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), 0)); // $ExpectType Observable +}); + +it('should infer correctly when using seed of a different type', () => { + const o = of(1, 2, 3).pipe(switchScan((acc: string, v: number) => of(acc + v), '0')); // $ExpectType Observable +}); + +it('should support a projector that takes an index', () => { + const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number, index) => of(Boolean(v)))); // $ExpectType Observable +}); + +it('should enforce types', () => { + const o = of(1, 2, 3).pipe(switchScan()); // $ExpectError +}); + +it('should enforce the return type to be Observable', () => { + const o = of(1, 2, 3).pipe(switchScan(p => p)); // $ExpectError +}); + +it('should enforce seed and the return type from accumulator', () => { + const o = of(1, 2, 3).pipe(switchScan(p => of(1), [])); // $ExpectError +}); + +it('should enforce seed and accumulator to have the same type', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, p) => of([...acc, p]))); // $ExpectError +}); diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts index 80fa4922c5f..f71e7dbe05a 100644 --- a/spec/operators/switchScan-spec.ts +++ b/spec/operators/switchScan-spec.ts @@ -3,29 +3,35 @@ import { concat, defer, Observable, of } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { switchScan, map, mergeMap, takeWhile } from 'rxjs/operators'; +declare function asDiagram(arg: string): Function; + /** @test {switchScan} */ describe.only('switchScan', () => { - it('should map-and-flatten each item to an Observable while passing the seed', () => { + asDiagram('switchScan(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014|, 0)') + ('should map-and-flatten each item to an Observable while passing the seed', () => { const e1 = hot('--1-----3--5-------|'); const e1subs = '^ !'; const e2 = cold('x-x-x| ', {x: 10}); const expected = '--x-x-x-y-yz-z-z---|'; const values = {x: 10, y: 40, z: 90}; - const result = e1.pipe(switchScan((acc, x) => e2.pipe(map(i => i * Number(x) + acc)), 0)); + const result = e1.pipe(switchScan((acc: number, x: string) => e2.pipe(map(i => i * Number(x) + acc)), 0)); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should pass seed even when seed is not defined', () => { - const seeds: any[] = []; - const result = of(1, 3, 5).pipe(switchScan((acc, x) => { - seeds.push(acc); - return of(10).pipe(map(v => (v * Number(x)) + ((acc || 0) as number))); - })).subscribe(); + it('should not pass seed when undefined', () => { + const seeds: number[] = []; - expect(seeds).to.deep.equal([undefined, 10, 40]); + of(1, 3, 5).pipe( + switchScan((acc, x) => { + seeds.push(acc); + return of(10).pipe(map(v => v * x + acc)); + }), + ).subscribe(); + + expect(seeds).to.deep.equal([1, 31]); }); it('should unsub inner observables', () => { @@ -39,6 +45,7 @@ describe.only('switchScan', () => { unsubbed.push(x); }; }), + null ) ).subscribe(); @@ -56,7 +63,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -72,7 +79,7 @@ describe.only('switchScan', () => { throw 'error'; } - expectObservable(e1.pipe(switchScan(project))).toBe(expected); + expectObservable(e1.pipe(switchScan(project, null))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); @@ -88,7 +95,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result, unsub).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -110,7 +117,7 @@ describe.only('switchScan', () => { const result = e1.pipe( mergeMap(x => of(x)), - switchScan((acc, value) => observableLookup[value]), + switchScan((acc, value) => observableLookup[value], null), mergeMap(x => of(x)), ); @@ -138,7 +145,7 @@ describe.only('switchScan', () => { ); of(null).pipe( - switchScan(() => synchronousObservable), + switchScan(() => synchronousObservable, null), takeWhile((x) => x != 2) // unsubscribe at the second side-effect ).subscribe(() => { /* noop */ }); @@ -156,7 +163,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -175,7 +182,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -194,7 +201,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -213,7 +220,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -232,7 +239,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -251,7 +258,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -270,7 +277,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -289,7 +296,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected, undefined, 'sad'); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -308,7 +315,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected, undefined, 'sad'); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -358,21 +365,21 @@ describe.only('switchScan', () => { const observableLookup = { x: x }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should pass index to the project function', () => { + it('should pass index to the accumulator function', () => { const indices: number[] = []; of('a', 'b', 'c', 'd').pipe( switchScan((acc, x, index) => { indices.push(index); return of(); - }), + }, null), ).subscribe(); expect(indices).to.deep.equal([0, 1, 2, 3]); diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 0572ed7af2c..04bbee3af29 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -37,6 +37,9 @@ import { ObservableInput, OperatorFunction } from '../types'; * // ...and so on for each click * ``` * + * @see {@link scan} + * @see {@link switchScan} + * * @param {function(acc: R, value: T): Observable} accumulator * The accumulator function called on each source value. * @param seed The initial accumulation value. diff --git a/src/internal/operators/scan.ts b/src/internal/operators/scan.ts index 66434c90930..689a48d9456 100644 --- a/src/internal/operators/scan.ts +++ b/src/internal/operators/scan.ts @@ -44,6 +44,7 @@ export function scan(accumulator: (acc: A|S, value: V, index: number) = * @see {@link expand} * @see {@link mergeScan} * @see {@link reduce} + * @see {@link switchScan} * * @param {function(acc: A, value: V, index: number): A} accumulator * The accumulator function called on each source value. diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts index d19fb1836d9..bb19f3dc09d 100644 --- a/src/internal/operators/switchScan.ts +++ b/src/internal/operators/switchScan.ts @@ -1,33 +1,61 @@ import { Observable } from '../Observable'; -import { ObservableInput, OperatorFunction } from '../types'; +import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction } from '../types'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { of } from '../observable/of'; import { switchMap } from './switchMap'; import { tap } from './tap'; +export function switchScan(accumulator: (acc: T, value: T, index: number) => ObservableInput, seed?: T): MonoTypeOperatorFunction; +export function switchScan(accumulator: (acc: R, value: T, index: number) => ObservableInput, seed?: R): OperatorFunction; + +/** + * Applies an accumulator function over the source Observable where the + * accumulator function itself returns an Observable, emitting values + * only from the most recently returned Observable. + * + * It's like {@link scan}, but only to most recent + * Observable returned by the accumulator is merged into the outer Observable. + * + * @see {@link scan} + * @see {@link mergeScan} + * + * @param {function(acc: R, value: T, index: number): Observable} accumulator + * The accumulator function called on each source value. + * @param {T|R} [seed] The initial accumulation value. + * @return {Observable} An observable of the accumulated values. + * @method switchScan + * @owner Observable + */ export function switchScan( - project: (acc: R, value: T, index: number) => ObservableInput, + accumulator: (acc: R, value: T, index: number) => ObservableInput, seed?: R ): OperatorFunction { - return (source: Observable) => source.lift(new SwitchScanOperator(project, seed)); + let hasSeed = false; + // The same reason as described in `scan` https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/scan.ts#L54-L58 + if (arguments.length >= 2) { + hasSeed = true; + } + + return (source: Observable) => source.lift(new SwitchScanOperator(accumulator, seed, hasSeed)); } class SwitchScanOperator implements Operator { - private acc: R; - - constructor(private project: (acc: R, value: T, index: number) => ObservableInput, - seed: R - ) { - this.acc = seed; - } + constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput, private seed: R, private hasSeed: boolean = false) { } call(subscriber: Subscriber, source: any): any { - const wrappedProject = (value: T, index: number): ObservableInput => - this.project(this.acc, value, index); - return source.pipe( - switchMap(wrappedProject), - tap((value: R) => this.acc = value), + switchMap((value: T, index: number): ObservableInput => { + if (this.hasSeed) { + return this.accumulator(this.seed, value, index); + } else { + return of(value); + } + }), + tap((value: R) => { + this.seed = value; + this.hasSeed = true; + }), ).subscribe(subscriber); } }