From 57fdbee2b65c2aab7da60fc08a19fa454de67da5 Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Sun, 11 Nov 2018 16:54:20 +0100 Subject: [PATCH 01/12] feat(switchScan): add switchScan operator --- spec/operators/index-spec.ts | 1 + spec/operators/switchScan-spec.ts | 380 +++++++++++++++++++++++++++ src/internal/operators/switchScan.ts | 33 +++ src/operators/index.ts | 1 + 4 files changed, 415 insertions(+) create mode 100644 spec/operators/switchScan-spec.ts create mode 100644 src/internal/operators/switchScan.ts diff --git a/spec/operators/index-spec.ts b/spec/operators/index-spec.ts index 227b361985..ee89bdb6e9 100644 --- a/spec/operators/index-spec.ts +++ b/spec/operators/index-spec.ts @@ -77,6 +77,7 @@ describe('operators/index', () => { expect(index.skipWhile).to.exist; expect(index.startWith).to.exist; expect(index.switchAll).to.exist; + expect(index.switchScan).to.exist; expect(index.switchMap).to.exist; expect(index.switchMapTo).to.exist; expect(index.take).to.exist; diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts new file mode 100644 index 0000000000..80fa4922c5 --- /dev/null +++ b/spec/operators/switchScan-spec.ts @@ -0,0 +1,380 @@ +import { expect } from 'chai'; +import { concat, defer, Observable, of } from 'rxjs'; +import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; +import { switchScan, map, mergeMap, takeWhile } from 'rxjs/operators'; + +/** @test {switchScan} */ +describe.only('switchScan', () => { + it('should map-and-flatten each item to an Observable while passing the seed', () => { + const e1 = hot('--1-----3--5-------|'); + const e1subs = '^ !'; + const e2 = cold('x-x-x| ', {x: 10}); + const expected = '--x-x-x-y-yz-z-z---|'; + const values = {x: 10, y: 40, z: 90}; + + const result = e1.pipe(switchScan((acc, x) => e2.pipe(map(i => i * Number(x) + acc)), 0)); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should pass seed even when seed is not defined', () => { + const seeds: any[] = []; + const result = of(1, 3, 5).pipe(switchScan((acc, x) => { + seeds.push(acc); + return of(10).pipe(map(v => (v * Number(x)) + ((acc || 0) as number))); + })).subscribe(); + + expect(seeds).to.deep.equal([undefined, 10, 40]); + }); + + it('should unsub inner observables', () => { + const unsubbed: string[] = []; + + of('a', 'b').pipe( + switchScan((acc, x) => + new Observable((subscriber) => { + subscriber.complete(); + return () => { + unsubbed.push(x); + }; + }), + ) + ).subscribe(); + + expect(unsubbed).to.deep.equal(['a', 'b']); + }); + + it('should switch inner cold observables', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--|'); + const ysubs = ' ^ !'; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c----f---g---h---i--|'; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should raise error when projection throws', () => { + const e1 = hot('-------x-----y---|'); + const e1subs = '^ ! '; + const expected = '-------# '; + function project(): any[] { + throw 'error'; + } + + expectObservable(e1.pipe(switchScan(project))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner cold observables, outer is unsubscribed early', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--|'); + const ysubs = ' ^ ! '; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const unsub = ' ! '; + const expected = '-----------a--b--c---- '; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not break unsubscription chains when result is unsubscribed explicitly', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--|'); + const ysubs = ' ^ ! '; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c---- '; + const unsub = ' ! '; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe( + mergeMap(x => of(x)), + switchScan((acc, value) => observableLookup[value]), + mergeMap(x => of(x)), + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + + of(null).pipe( + switchScan(() => synchronousObservable), + takeWhile((x) => x != 2) // unsubscribe at the second side-effect + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([1, 2]); + }); + + it('should switch inner cold observables, inner never completes', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--'); + const ysubs = ' ^ '; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c----f---g---h---i--'; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle a synchronous switch to the second inner observable', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' (^!) '; + const y = cold( '---f---g---h---i--| '); + const ysubs = ' ^ ! '; + const e1 = hot('---------(xy)----------------|'); + const e1subs = '^ !'; + const expected = '------------f---g---h---i----|'; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner cold observables, one inner throws', () => { + const x = cold( '--a--b--#--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--'); + const ysubs: string[] = []; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--# '; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner hot observables', () => { + const x = hot('-----a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = hot('--p-o-o-p-------------f---g---h---i--|'); + const ysubs = ' ^ !'; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------c--d--e----f---g---h---i--|'; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner empty and empty', () => { + const x = cold('|'); + const y = cold('|'); + const xsubs = ' (^!) '; + const ysubs = ' (^!) '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ !'; + const expected = '-----------------------------|'; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner empty and never', () => { + const x = cold('|'); + const y = cold('-'); + const xsubs = ' (^!) '; + const ysubs = ' ^ '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ !'; + const expected = '------------------------------'; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner never and empty', () => { + const x = cold('-'); + const y = cold('|'); + const xsubs = ' ^ ! '; + const ysubs = ' (^!) '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ !'; + const expected = '-----------------------------|'; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner never and throw', () => { + const x = cold('-'); + const y = cold('#', null, 'sad'); + const xsubs = ' ^ ! '; + const ysubs = ' (^!) '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ ! '; + const expected = '-------------------# '; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected, undefined, 'sad'); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner empty and throw', () => { + const x = cold('|'); + const y = cold('#', null, 'sad'); + const xsubs = ' (^!) '; + const ysubs = ' (^!) '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ ! '; + const expected = '-------------------# '; + + const observableLookup = { x: x, y: y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected, undefined, 'sad'); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle outer empty', () => { + const e1 = cold('|'); + const e1subs = '(^!)'; + const expected = '|'; + + const result = e1.pipe(switchScan((acc, value) => of(value))); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle outer never', () => { + const e1 = cold('-'); + const e1subs = '^'; + const expected = '-'; + + const result = e1.pipe(switchScan((acc, value) => of(value))); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle outer throw', () => { + const e1 = cold('#'); + const e1subs = '(^!)'; + const expected = '#'; + + const result = e1.pipe(switchScan((acc, value) => of(value))); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle outer error', () => { + const x = cold( '--a--b--c--d--e--|'); + const xsubs = ' ^ ! '; + const e1 = hot('---------x---------# '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c-# '; + + const observableLookup = { x: x }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should pass index to the project function', () => { + const indices: number[] = []; + + of('a', 'b', 'c', 'd').pipe( + switchScan((acc, x, index) => { + indices.push(index); + return of(); + }), + ).subscribe(); + + expect(indices).to.deep.equal([0, 1, 2, 3]); + }); +}); diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts new file mode 100644 index 0000000000..d19fb1836d --- /dev/null +++ b/src/internal/operators/switchScan.ts @@ -0,0 +1,33 @@ +import { Observable } from '../Observable'; +import { ObservableInput, OperatorFunction } from '../types'; +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { switchMap } from './switchMap'; +import { tap } from './tap'; + +export function switchScan( + project: (acc: R, value: T, index: number) => ObservableInput, + seed?: R +): OperatorFunction { + return (source: Observable) => source.lift(new SwitchScanOperator(project, seed)); +} + +class SwitchScanOperator implements Operator { + private acc: R; + + constructor(private project: (acc: R, value: T, index: number) => ObservableInput, + seed: R + ) { + this.acc = seed; + } + + call(subscriber: Subscriber, source: any): any { + const wrappedProject = (value: T, index: number): ObservableInput => + this.project(this.acc, value, index); + + return source.pipe( + switchMap(wrappedProject), + tap((value: R) => this.acc = value), + ).subscribe(subscriber); + } +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 9ae9a424a3..34d069e1d1 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -80,6 +80,7 @@ export { skipWhile } from '../internal/operators/skipWhile'; export { startWith } from '../internal/operators/startWith'; export { subscribeOn } from '../internal/operators/subscribeOn'; export { switchAll } from '../internal/operators/switchAll'; +export { switchScan } from '../internal/operators/switchScan'; export { switchMap } from '../internal/operators/switchMap'; export { switchMapTo } from '../internal/operators/switchMapTo'; export { take } from '../internal/operators/take'; From 7231761939ff8fa9c814d35d37212cdb7385f08c Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Sun, 30 Dec 2018 15:44:36 +0100 Subject: [PATCH 02/12] refactor(switchScan): Make switchScan behavior more close to `scan()` Closes #2931 --- docs_app/content/guide/operators.md | 1 + .../decision-tree-generator/src/tree.yml | 3 + spec-dtslint/operators/switchScan-spec.ts | 38 ++++++++++++ spec/operators/switchScan-spec.ts | 59 +++++++++++-------- src/internal/operators/mergeScan.ts | 3 + src/internal/operators/scan.ts | 1 + src/internal/operators/switchScan.ts | 58 +++++++++++++----- 7 files changed, 122 insertions(+), 41 deletions(-) create mode 100644 spec-dtslint/operators/switchScan-spec.ts diff --git a/docs_app/content/guide/operators.md b/docs_app/content/guide/operators.md index 7b30e280e7..1d3cf1f1db 100644 --- a/docs_app/content/guide/operators.md +++ b/docs_app/content/guide/operators.md @@ -174,6 +174,7 @@ These are Observable creation operators that also have join functionality -- emi - [`partition`](/api/operators/partition) - [`pluck`](/api/operators/pluck) - [`scan`](/api/operators/scan) +- [`switchScan`](/api/operators/switchScan) - [`switchMap`](/api/operators/switchMap) - [`switchMapTo`](/api/operators/switchMapTo) - [`window`](/api/operators/window) diff --git a/docs_app/tools/decision-tree-generator/src/tree.yml b/docs_app/tools/decision-tree-generator/src/tree.yml index 0025033c45..cadb539983 100644 --- a/docs_app/tools/decision-tree-generator/src/tree.yml +++ b/docs_app/tools/decision-tree-generator/src/tree.yml @@ -115,6 +115,9 @@ - label: and output the computed values as a nested Observable when the source emits a value children: - label: mergeScan + - label: and output the computed values as a nested Observable when the source emits a value while unsubscribing from the previous nested Observable + children: + - label: switchScan - label: I want to wrap its messages with metadata children: - label: that describes each notification (next, error, or complete) diff --git a/spec-dtslint/operators/switchScan-spec.ts b/spec-dtslint/operators/switchScan-spec.ts new file mode 100644 index 0000000000..051550bf2f --- /dev/null +++ b/spec-dtslint/operators/switchScan-spec.ts @@ -0,0 +1,38 @@ +import { of } from 'rxjs'; +import { switchScan } from 'rxjs/operators'; + +it('should infer correctly', () => { + const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number) => of(Boolean(v)))); // $ExpectType Observable +}); + +it('should infer correctly when using a single type', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v))); // $ExpectType Observable +}); + +it('should infer correctly when using a seed', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), 0)); // $ExpectType Observable +}); + +it('should infer correctly when using seed of a different type', () => { + const o = of(1, 2, 3).pipe(switchScan((acc: string, v: number) => of(acc + v), '0')); // $ExpectType Observable +}); + +it('should support a projector that takes an index', () => { + const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number, index) => of(Boolean(v)))); // $ExpectType Observable +}); + +it('should enforce types', () => { + const o = of(1, 2, 3).pipe(switchScan()); // $ExpectError +}); + +it('should enforce the return type to be Observable', () => { + const o = of(1, 2, 3).pipe(switchScan(p => p)); // $ExpectError +}); + +it('should enforce seed and the return type from accumulator', () => { + const o = of(1, 2, 3).pipe(switchScan(p => of(1), [])); // $ExpectError +}); + +it('should enforce seed and accumulator to have the same type', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, p) => of([...acc, p]))); // $ExpectError +}); diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts index 80fa4922c5..f71e7dbe05 100644 --- a/spec/operators/switchScan-spec.ts +++ b/spec/operators/switchScan-spec.ts @@ -3,29 +3,35 @@ import { concat, defer, Observable, of } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { switchScan, map, mergeMap, takeWhile } from 'rxjs/operators'; +declare function asDiagram(arg: string): Function; + /** @test {switchScan} */ describe.only('switchScan', () => { - it('should map-and-flatten each item to an Observable while passing the seed', () => { + asDiagram('switchScan(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014|, 0)') + ('should map-and-flatten each item to an Observable while passing the seed', () => { const e1 = hot('--1-----3--5-------|'); const e1subs = '^ !'; const e2 = cold('x-x-x| ', {x: 10}); const expected = '--x-x-x-y-yz-z-z---|'; const values = {x: 10, y: 40, z: 90}; - const result = e1.pipe(switchScan((acc, x) => e2.pipe(map(i => i * Number(x) + acc)), 0)); + const result = e1.pipe(switchScan((acc: number, x: string) => e2.pipe(map(i => i * Number(x) + acc)), 0)); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should pass seed even when seed is not defined', () => { - const seeds: any[] = []; - const result = of(1, 3, 5).pipe(switchScan((acc, x) => { - seeds.push(acc); - return of(10).pipe(map(v => (v * Number(x)) + ((acc || 0) as number))); - })).subscribe(); + it('should not pass seed when undefined', () => { + const seeds: number[] = []; - expect(seeds).to.deep.equal([undefined, 10, 40]); + of(1, 3, 5).pipe( + switchScan((acc, x) => { + seeds.push(acc); + return of(10).pipe(map(v => v * x + acc)); + }), + ).subscribe(); + + expect(seeds).to.deep.equal([1, 31]); }); it('should unsub inner observables', () => { @@ -39,6 +45,7 @@ describe.only('switchScan', () => { unsubbed.push(x); }; }), + null ) ).subscribe(); @@ -56,7 +63,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -72,7 +79,7 @@ describe.only('switchScan', () => { throw 'error'; } - expectObservable(e1.pipe(switchScan(project))).toBe(expected); + expectObservable(e1.pipe(switchScan(project, null))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); @@ -88,7 +95,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result, unsub).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -110,7 +117,7 @@ describe.only('switchScan', () => { const result = e1.pipe( mergeMap(x => of(x)), - switchScan((acc, value) => observableLookup[value]), + switchScan((acc, value) => observableLookup[value], null), mergeMap(x => of(x)), ); @@ -138,7 +145,7 @@ describe.only('switchScan', () => { ); of(null).pipe( - switchScan(() => synchronousObservable), + switchScan(() => synchronousObservable, null), takeWhile((x) => x != 2) // unsubscribe at the second side-effect ).subscribe(() => { /* noop */ }); @@ -156,7 +163,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -175,7 +182,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -194,7 +201,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -213,7 +220,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -232,7 +239,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -251,7 +258,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -270,7 +277,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -289,7 +296,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected, undefined, 'sad'); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -308,7 +315,7 @@ describe.only('switchScan', () => { const observableLookup = { x: x, y: y }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected, undefined, 'sad'); expectSubscriptions(x.subscriptions).toBe(xsubs); @@ -358,21 +365,21 @@ describe.only('switchScan', () => { const observableLookup = { x: x }; - const result = e1.pipe(switchScan((acc, value) => observableLookup[value])); + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); expectObservable(result).toBe(expected); expectSubscriptions(x.subscriptions).toBe(xsubs); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should pass index to the project function', () => { + it('should pass index to the accumulator function', () => { const indices: number[] = []; of('a', 'b', 'c', 'd').pipe( switchScan((acc, x, index) => { indices.push(index); return of(); - }), + }, null), ).subscribe(); expect(indices).to.deep.equal([0, 1, 2, 3]); diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 881f0849c1..e507a758a5 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -33,6 +33,9 @@ import { mergeInternals } from './mergeInternals'; * // ...and so on for each click * ``` * + * @see {@link scan} + * @see {@link switchScan} + * * @param {function(acc: R, value: T): Observable} accumulator * The accumulator function called on each source value. * @param seed The initial accumulation value. diff --git a/src/internal/operators/scan.ts b/src/internal/operators/scan.ts index f60efeb588..f7bf83f761 100644 --- a/src/internal/operators/scan.ts +++ b/src/internal/operators/scan.ts @@ -79,6 +79,7 @@ export function scan(accumulator: (acc: A | S, value: V, index: number) * @see {@link expand} * @see {@link mergeScan} * @see {@link reduce} + * @see {@link switchScan} * * @param accumulator A "reducer function". This will be called for each value after an initial state is * acquired. diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts index d19fb1836d..bb19f3dc09 100644 --- a/src/internal/operators/switchScan.ts +++ b/src/internal/operators/switchScan.ts @@ -1,33 +1,61 @@ import { Observable } from '../Observable'; -import { ObservableInput, OperatorFunction } from '../types'; +import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction } from '../types'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { of } from '../observable/of'; import { switchMap } from './switchMap'; import { tap } from './tap'; +export function switchScan(accumulator: (acc: T, value: T, index: number) => ObservableInput, seed?: T): MonoTypeOperatorFunction; +export function switchScan(accumulator: (acc: R, value: T, index: number) => ObservableInput, seed?: R): OperatorFunction; + +/** + * Applies an accumulator function over the source Observable where the + * accumulator function itself returns an Observable, emitting values + * only from the most recently returned Observable. + * + * It's like {@link scan}, but only to most recent + * Observable returned by the accumulator is merged into the outer Observable. + * + * @see {@link scan} + * @see {@link mergeScan} + * + * @param {function(acc: R, value: T, index: number): Observable} accumulator + * The accumulator function called on each source value. + * @param {T|R} [seed] The initial accumulation value. + * @return {Observable} An observable of the accumulated values. + * @method switchScan + * @owner Observable + */ export function switchScan( - project: (acc: R, value: T, index: number) => ObservableInput, + accumulator: (acc: R, value: T, index: number) => ObservableInput, seed?: R ): OperatorFunction { - return (source: Observable) => source.lift(new SwitchScanOperator(project, seed)); + let hasSeed = false; + // The same reason as described in `scan` https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/scan.ts#L54-L58 + if (arguments.length >= 2) { + hasSeed = true; + } + + return (source: Observable) => source.lift(new SwitchScanOperator(accumulator, seed, hasSeed)); } class SwitchScanOperator implements Operator { - private acc: R; - - constructor(private project: (acc: R, value: T, index: number) => ObservableInput, - seed: R - ) { - this.acc = seed; - } + constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput, private seed: R, private hasSeed: boolean = false) { } call(subscriber: Subscriber, source: any): any { - const wrappedProject = (value: T, index: number): ObservableInput => - this.project(this.acc, value, index); - return source.pipe( - switchMap(wrappedProject), - tap((value: R) => this.acc = value), + switchMap((value: T, index: number): ObservableInput => { + if (this.hasSeed) { + return this.accumulator(this.seed, value, index); + } else { + return of(value); + } + }), + tap((value: R) => { + this.seed = value; + this.hasSeed = true; + }), ).subscribe(subscriber); } } From 9a0f0f9d3886535e38ca1da4efa3fb45706a094d Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Sun, 30 Dec 2018 15:47:36 +0100 Subject: [PATCH 03/12] docs(switchScan): add link switchScan marble diagram into its docblock --- src/internal/operators/switchScan.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts index bb19f3dc09..8bb68edf34 100644 --- a/src/internal/operators/switchScan.ts +++ b/src/internal/operators/switchScan.ts @@ -17,6 +17,8 @@ export function switchScan(accumulator: (acc: R, value: T, index: number) * It's like {@link scan}, but only to most recent * Observable returned by the accumulator is merged into the outer Observable. * + * ![](switchScan.png) + * * @see {@link scan} * @see {@link mergeScan} * From 1ce7029a1682d05eb91bd2a472a95e0e069f079e Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Sun, 30 Dec 2018 21:16:00 +0100 Subject: [PATCH 04/12] test(switchScan): revert running only switchScan spec --- spec/operators/switchScan-spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts index f71e7dbe05..baf008108c 100644 --- a/spec/operators/switchScan-spec.ts +++ b/spec/operators/switchScan-spec.ts @@ -6,7 +6,7 @@ import { switchScan, map, mergeMap, takeWhile } from 'rxjs/operators'; declare function asDiagram(arg: string): Function; /** @test {switchScan} */ -describe.only('switchScan', () => { +describe('switchScan', () => { asDiagram('switchScan(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014|, 0)') ('should map-and-flatten each item to an Observable while passing the seed', () => { const e1 = hot('--1-----3--5-------|'); From 355c6f406b32998c83035312a59f0992f0e9a1a6 Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Mon, 4 Feb 2019 17:47:02 +0100 Subject: [PATCH 05/12] fix(switchScan): fix overriding the same seed for multiple observers --- spec/operators/switchScan-spec.ts | 14 ++++++++++++++ src/internal/operators/switchScan.ts | 11 +++++++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts index baf008108c..6770751a25 100644 --- a/spec/operators/switchScan-spec.ts +++ b/spec/operators/switchScan-spec.ts @@ -372,6 +372,20 @@ describe('switchScan', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + it('should create a new seed for each subscriber', () => { + const seeds: string[] = []; + const observer = (value: string) => seeds.push(value); + + const source = of('a', 'b').pipe( + switchScan((acc, x) => of(x + 'x')) + ); + + source.subscribe(observer); + source.subscribe(observer); + + expect(seeds).to.deep.equal(['a', 'bx', 'a', 'bx']); + }); + it('should pass index to the accumulator function', () => { const indices: number[] = []; diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts index 8bb68edf34..1a830fad8b 100644 --- a/src/internal/operators/switchScan.ts +++ b/src/internal/operators/switchScan.ts @@ -46,17 +46,20 @@ class SwitchScanOperator implements Operator { constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput, private seed: R, private hasSeed: boolean = false) { } call(subscriber: Subscriber, source: any): any { + let seed: R = this.seed; + let hasSeed: boolean = this.hasSeed; + return source.pipe( switchMap((value: T, index: number): ObservableInput => { - if (this.hasSeed) { - return this.accumulator(this.seed, value, index); + if (hasSeed) { + return this.accumulator(seed, value, index); } else { return of(value); } }), tap((value: R) => { - this.seed = value; - this.hasSeed = true; + seed = value; + hasSeed = true; }), ).subscribe(subscriber); } From e48328e1d6997379f517f05971d456e0790a6825 Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Thu, 31 Jan 2019 23:22:20 +0100 Subject: [PATCH 06/12] docs(switchScan): fix typo --- src/internal/operators/switchScan.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts index 1a830fad8b..3c73d072f3 100644 --- a/src/internal/operators/switchScan.ts +++ b/src/internal/operators/switchScan.ts @@ -14,7 +14,7 @@ export function switchScan(accumulator: (acc: R, value: T, index: number) * accumulator function itself returns an Observable, emitting values * only from the most recently returned Observable. * - * It's like {@link scan}, but only to most recent + * It's like {@link scan}, but only the most recent * Observable returned by the accumulator is merged into the outer Observable. * * ![](switchScan.png) From af92fb137c5e0d71406c8bd02b1a976f88b2e873 Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Mon, 18 Mar 2019 11:44:26 +0100 Subject: [PATCH 07/12] refactor(switchScan): make seed mandatory as in mergeScan --- spec-dtslint/operators/switchScan-spec.ts | 6 ++--- spec/operators/switchScan-spec.ts | 16 ++++++------ src/internal/operators/switchScan.ts | 32 +++++------------------ 3 files changed, 17 insertions(+), 37 deletions(-) diff --git a/spec-dtslint/operators/switchScan-spec.ts b/spec-dtslint/operators/switchScan-spec.ts index 051550bf2f..86196c1bfb 100644 --- a/spec-dtslint/operators/switchScan-spec.ts +++ b/spec-dtslint/operators/switchScan-spec.ts @@ -2,11 +2,11 @@ import { of } from 'rxjs'; import { switchScan } from 'rxjs/operators'; it('should infer correctly', () => { - const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number) => of(Boolean(v)))); // $ExpectType Observable + const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number) => of(Boolean(v)), false)); // $ExpectType Observable }); it('should infer correctly when using a single type', () => { - const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v))); // $ExpectType Observable + const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), 0)); // $ExpectType Observable }); it('should infer correctly when using a seed', () => { @@ -18,7 +18,7 @@ it('should infer correctly when using seed of a different type', () => { }); it('should support a projector that takes an index', () => { - const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number, index) => of(Boolean(v)))); // $ExpectType Observable + const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number, index: number) => of(Boolean(v)), false)); // $ExpectType Observable }); it('should enforce types', () => { diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts index 6770751a25..8342290601 100644 --- a/spec/operators/switchScan-spec.ts +++ b/spec/operators/switchScan-spec.ts @@ -28,10 +28,10 @@ describe('switchScan', () => { switchScan((acc, x) => { seeds.push(acc); return of(10).pipe(map(v => v * x + acc)); - }), + }, 0), ).subscribe(); - expect(seeds).to.deep.equal([1, 31]); + expect(seeds).to.deep.equal([0, 10, 40]); }); it('should unsub inner observables', () => { @@ -328,7 +328,7 @@ describe('switchScan', () => { const e1subs = '(^!)'; const expected = '|'; - const result = e1.pipe(switchScan((acc, value) => of(value))); + const result = e1.pipe(switchScan((acc, value) => of(value), '')); expectObservable(result).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -339,7 +339,7 @@ describe('switchScan', () => { const e1subs = '^'; const expected = '-'; - const result = e1.pipe(switchScan((acc, value) => of(value))); + const result = e1.pipe(switchScan((acc, value) => of(value), '')); expectObservable(result).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -350,7 +350,7 @@ describe('switchScan', () => { const e1subs = '(^!)'; const expected = '#'; - const result = e1.pipe(switchScan((acc, value) => of(value))); + const result = e1.pipe(switchScan((acc, value) => of(value), '')); expectObservable(result).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -377,13 +377,13 @@ describe('switchScan', () => { const observer = (value: string) => seeds.push(value); const source = of('a', 'b').pipe( - switchScan((acc, x) => of(x + 'x')) + switchScan((acc, x) => of(acc + x), '') ); source.subscribe(observer); source.subscribe(observer); - expect(seeds).to.deep.equal(['a', 'bx', 'a', 'bx']); + expect(seeds).to.deep.equal(['a', 'ab', 'a', 'ab']); }); it('should pass index to the accumulator function', () => { @@ -393,7 +393,7 @@ describe('switchScan', () => { switchScan((acc, x, index) => { indices.push(index); return of(); - }, null), + }, ''), ).subscribe(); expect(indices).to.deep.equal([0, 1, 2, 3]); diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts index 3c73d072f3..9d4215cecf 100644 --- a/src/internal/operators/switchScan.ts +++ b/src/internal/operators/switchScan.ts @@ -1,14 +1,10 @@ import { Observable } from '../Observable'; -import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction } from '../types'; +import { ObservableInput, OperatorFunction } from '../types'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; -import { of } from '../observable/of'; import { switchMap } from './switchMap'; import { tap } from './tap'; -export function switchScan(accumulator: (acc: T, value: T, index: number) => ObservableInput, seed?: T): MonoTypeOperatorFunction; -export function switchScan(accumulator: (acc: R, value: T, index: number) => ObservableInput, seed?: R): OperatorFunction; - /** * Applies an accumulator function over the source Observable where the * accumulator function itself returns an Observable, emitting values @@ -31,36 +27,20 @@ export function switchScan(accumulator: (acc: R, value: T, index: number) */ export function switchScan( accumulator: (acc: R, value: T, index: number) => ObservableInput, - seed?: R + seed: R ): OperatorFunction { - let hasSeed = false; - // The same reason as described in `scan` https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/scan.ts#L54-L58 - if (arguments.length >= 2) { - hasSeed = true; - } - - return (source: Observable) => source.lift(new SwitchScanOperator(accumulator, seed, hasSeed)); + return (source: Observable) => source.lift(new SwitchScanOperator(accumulator, seed)); } class SwitchScanOperator implements Operator { - constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput, private seed: R, private hasSeed: boolean = false) { } + constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput, private seed: R) { } call(subscriber: Subscriber, source: any): any { let seed: R = this.seed; - let hasSeed: boolean = this.hasSeed; return source.pipe( - switchMap((value: T, index: number): ObservableInput => { - if (hasSeed) { - return this.accumulator(seed, value, index); - } else { - return of(value); - } - }), - tap((value: R) => { - seed = value; - hasSeed = true; - }), + switchMap((value: T, index: number): ObservableInput => this.accumulator(seed, value, index)), + tap((value: R) => seed = value), ).subscribe(subscriber); } } From 21c405e361b71c99922a79f54e3c5f2e9753806a Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Sat, 4 May 2019 13:53:48 +0200 Subject: [PATCH 08/12] fix(switchScan): fix typings for union types, remove duplicate dts test --- spec-dtslint/operators/switchScan-spec.ts | 20 ++++++++++---------- spec/operators/switchScan-spec.ts | 6 +++--- src/internal/operators/switchScan.ts | 16 ++++++++-------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/spec-dtslint/operators/switchScan-spec.ts b/spec-dtslint/operators/switchScan-spec.ts index 86196c1bfb..41e03a703e 100644 --- a/spec-dtslint/operators/switchScan-spec.ts +++ b/spec-dtslint/operators/switchScan-spec.ts @@ -9,16 +9,20 @@ it('should infer correctly when using a single type', () => { const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), 0)); // $ExpectType Observable }); -it('should infer correctly when using a seed', () => { - const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), 0)); // $ExpectType Observable -}); - it('should infer correctly when using seed of a different type', () => { - const o = of(1, 2, 3).pipe(switchScan((acc: string, v: number) => of(acc + v), '0')); // $ExpectType Observable + const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), '0')); // $ExpectType Observable }); it('should support a projector that takes an index', () => { - const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number, index: number) => of(Boolean(v)), false)); // $ExpectType Observable + const o = of(1, 2, 3).pipe(switchScan((acc, v, index) => of(Boolean(v)), false)); // $ExpectType Observable +}); + +it('should support projecting to union types', () => { + const o = of(Math.random()).pipe(switchScan(n => n > 0.5 ? of(123) : of('test'), 0)); // $ExpectType Observable +}); + +it('should use the inferred accumulator return type over the seed type', () => { + const o = of(1, 2, 3).pipe(switchScan(p => of(1), [])); // $ExpectType Observable }); it('should enforce types', () => { @@ -29,10 +33,6 @@ it('should enforce the return type to be Observable', () => { const o = of(1, 2, 3).pipe(switchScan(p => p)); // $ExpectError }); -it('should enforce seed and the return type from accumulator', () => { - const o = of(1, 2, 3).pipe(switchScan(p => of(1), [])); // $ExpectError -}); - it('should enforce seed and accumulator to have the same type', () => { const o = of(1, 2, 3).pipe(switchScan((acc, p) => of([...acc, p]))); // $ExpectError }); diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts index 8342290601..ee3ddaa29e 100644 --- a/spec/operators/switchScan-spec.ts +++ b/spec/operators/switchScan-spec.ts @@ -8,14 +8,14 @@ declare function asDiagram(arg: string): Function; /** @test {switchScan} */ describe('switchScan', () => { asDiagram('switchScan(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014|, 0)') - ('should map-and-flatten each item to an Observable while passing the seed', () => { + ('should map-and-flatten each item to an Observable while passing the accumulated value', () => { const e1 = hot('--1-----3--5-------|'); const e1subs = '^ !'; const e2 = cold('x-x-x| ', {x: 10}); const expected = '--x-x-x-y-yz-z-z---|'; const values = {x: 10, y: 40, z: 90}; - const result = e1.pipe(switchScan((acc: number, x: string) => e2.pipe(map(i => i * Number(x) + acc)), 0)); + const result = e1.pipe(switchScan((acc, x) => e2.pipe(map(i => i * Number(x) + acc)), 0)); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -61,7 +61,7 @@ describe('switchScan', () => { const e1subs = '^ ! '; const expected = '-----------a--b--c----f---g---h---i--|'; - const observableLookup = { x: x, y: y }; + const observableLookup = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts index 9d4215cecf..0683d46ecf 100644 --- a/src/internal/operators/switchScan.ts +++ b/src/internal/operators/switchScan.ts @@ -1,5 +1,5 @@ import { Observable } from '../Observable'; -import { ObservableInput, OperatorFunction } from '../types'; +import { ObservableInput, ObservedValueOf, OperatorFunction } from '../types'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { switchMap } from './switchMap'; @@ -25,21 +25,21 @@ import { tap } from './tap'; * @method switchScan * @owner Observable */ -export function switchScan( - accumulator: (acc: R, value: T, index: number) => ObservableInput, +export function switchScan>( + accumulator: (acc: R, value: T, index: number) => O, seed: R -): OperatorFunction { +): OperatorFunction> { return (source: Observable) => source.lift(new SwitchScanOperator(accumulator, seed)); } -class SwitchScanOperator implements Operator { - constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput, private seed: R) { } +class SwitchScanOperator> implements Operator { + constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput, private seed: R) { } - call(subscriber: Subscriber, source: any): any { + call(subscriber: Subscriber, source: any): any { let seed: R = this.seed; return source.pipe( - switchMap((value: T, index: number): ObservableInput => this.accumulator(seed, value, index)), + switchMap((value: T, index: number) => this.accumulator(seed, value, index)), tap((value: R) => seed = value), ).subscribe(subscriber); } From f593fcb3134fe61962b8abc7995f60a3763351e3 Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Wed, 8 Apr 2020 10:40:08 +0200 Subject: [PATCH 09/12] test(switchScan): fix types after rebase to master --- spec/operators/switchScan-spec.ts | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts index ee3ddaa29e..89956b9857 100644 --- a/spec/operators/switchScan-spec.ts +++ b/spec/operators/switchScan-spec.ts @@ -61,7 +61,7 @@ describe('switchScan', () => { const e1subs = '^ ! '; const expected = '-----------a--b--c----f---g---h---i--|'; - const observableLookup = { x, y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -93,7 +93,7 @@ describe('switchScan', () => { const unsub = ' ! '; const expected = '-----------a--b--c---- '; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -113,7 +113,7 @@ describe('switchScan', () => { const expected = '-----------a--b--c---- '; const unsub = ' ! '; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe( mergeMap(x => of(x)), @@ -161,7 +161,7 @@ describe('switchScan', () => { const e1subs = '^ ! '; const expected = '-----------a--b--c----f---g---h---i--'; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -180,7 +180,7 @@ describe('switchScan', () => { const e1subs = '^ !'; const expected = '------------f---g---h---i----|'; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -199,7 +199,7 @@ describe('switchScan', () => { const e1subs = '^ ! '; const expected = '-----------a--b--# '; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -218,7 +218,7 @@ describe('switchScan', () => { const e1subs = '^ ! '; const expected = '-----------c--d--e----f---g---h---i--|'; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -237,7 +237,7 @@ describe('switchScan', () => { const e1subs = '^ !'; const expected = '-----------------------------|'; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -256,7 +256,7 @@ describe('switchScan', () => { const e1subs = '^ !'; const expected = '------------------------------'; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -275,7 +275,7 @@ describe('switchScan', () => { const e1subs = '^ !'; const expected = '-----------------------------|'; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -287,14 +287,14 @@ describe('switchScan', () => { it('should switch inner never and throw', () => { const x = cold('-'); - const y = cold('#', null, 'sad'); + const y = cold('#', undefined, 'sad'); const xsubs = ' ^ ! '; const ysubs = ' (^!) '; const e1 = hot('---------x---------y---------|'); const e1subs = '^ ! '; const expected = '-------------------# '; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -306,14 +306,14 @@ describe('switchScan', () => { it('should switch inner empty and throw', () => { const x = cold('|'); - const y = cold('#', null, 'sad'); + const y = cold('#', undefined, 'sad'); const xsubs = ' (^!) '; const ysubs = ' (^!) '; const e1 = hot('---------x---------y---------|'); const e1subs = '^ ! '; const expected = '-------------------# '; - const observableLookup = { x: x, y: y }; + const observableLookup: Record> = { x, y }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); @@ -363,7 +363,7 @@ describe('switchScan', () => { const e1subs = '^ ! '; const expected = '-----------a--b--c-# '; - const observableLookup = { x: x }; + const observableLookup: Record> = { x: x }; const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); From 5c788b3b63023d707b17a10804aa95eba4a88f07 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 2 Oct 2020 15:41:34 -0500 Subject: [PATCH 10/12] refactor(switchScan): use `operate` and `switchMap`. --- spec/operators/switchScan-spec.ts | 5 +-- src/internal/operators/switchScan.ts | 47 +++++++++++++++------------- src/operators/index.ts | 2 +- 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts index 89956b9857..e2d3f5ea0d 100644 --- a/spec/operators/switchScan-spec.ts +++ b/spec/operators/switchScan-spec.ts @@ -3,12 +3,9 @@ import { concat, defer, Observable, of } from 'rxjs'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { switchScan, map, mergeMap, takeWhile } from 'rxjs/operators'; -declare function asDiagram(arg: string): Function; - /** @test {switchScan} */ describe('switchScan', () => { - asDiagram('switchScan(i => 10*i\u2014\u201410*i\u2014\u201410*i\u2014|, 0)') - ('should map-and-flatten each item to an Observable while passing the accumulated value', () => { + it('should map-and-flatten each item to an Observable while passing the accumulated value', () => { const e1 = hot('--1-----3--5-------|'); const e1subs = '^ !'; const e2 = cold('x-x-x| ', {x: 10}); diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts index 0683d46ecf..d2b818ab52 100644 --- a/src/internal/operators/switchScan.ts +++ b/src/internal/operators/switchScan.ts @@ -1,9 +1,7 @@ -import { Observable } from '../Observable'; +/** @prettier */ import { ObservableInput, ObservedValueOf, OperatorFunction } from '../types'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; import { switchMap } from './switchMap'; -import { tap } from './tap'; +import { operate } from '../util/lift'; /** * Applies an accumulator function over the source Observable where the @@ -17,30 +15,37 @@ import { tap } from './tap'; * * @see {@link scan} * @see {@link mergeScan} + * @see {@link switchMap} * - * @param {function(acc: R, value: T, index: number): Observable} accumulator + * @param accumulator * The accumulator function called on each source value. - * @param {T|R} [seed] The initial accumulation value. - * @return {Observable} An observable of the accumulated values. - * @method switchScan - * @owner Observable + * @param seed The initial accumulation value. + * @return An observable of the accumulated values. */ export function switchScan>( accumulator: (acc: R, value: T, index: number) => O, seed: R ): OperatorFunction> { - return (source: Observable) => source.lift(new SwitchScanOperator(accumulator, seed)); -} - -class SwitchScanOperator> implements Operator { - constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput, private seed: R) { } + return operate((source, subscriber) => { + // The state we will keep up to date to pass into our + // accumulator function at each new value from the source. + let state: any = seed; - call(subscriber: Subscriber, source: any): any { - let seed: R = this.seed; + // Use `switchMap` on our `source` to do the work of creating + // this operator. Note the backwards order here of `switchMap()(source)` + // to avoid needing to use `pipe` unnecessarily + switchMap( + // On each value from the source, call the accumulator with + // our previous state, the value and the index. + (value: T, index) => accumulator(state, value, index), + // Using the deprecated result selector here as a dirty trick + // to update our state with the flattened value. + (_, innerValue) => ((state = innerValue), innerValue) + )(source).subscribe(subscriber); - return source.pipe( - switchMap((value: T, index: number) => this.accumulator(seed, value, index)), - tap((value: R) => seed = value), - ).subscribe(subscriber); - } + return () => { + // Release state on teardown + state = null!; + }; + }); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 34d069e1d1..d25206ee8b 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -80,9 +80,9 @@ export { skipWhile } from '../internal/operators/skipWhile'; export { startWith } from '../internal/operators/startWith'; export { subscribeOn } from '../internal/operators/subscribeOn'; export { switchAll } from '../internal/operators/switchAll'; -export { switchScan } from '../internal/operators/switchScan'; export { switchMap } from '../internal/operators/switchMap'; export { switchMapTo } from '../internal/operators/switchMapTo'; +export { switchScan } from '../internal/operators/switchScan'; export { take } from '../internal/operators/take'; export { takeLast } from '../internal/operators/takeLast'; export { takeUntil } from '../internal/operators/takeUntil'; From 3e07cf05c7dd1fc3122ae2d09420b272e26161e4 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 2 Oct 2020 15:54:22 -0500 Subject: [PATCH 11/12] chore: update golden files --- api_guard/dist/types/operators/index.d.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts index 67cbc85ab3..a74050c30e 100644 --- a/api_guard/dist/types/operators/index.d.ts +++ b/api_guard/dist/types/operators/index.d.ts @@ -288,6 +288,8 @@ export declare function switchMapTo(observable: ObservableInput): Operator export declare function switchMapTo(observable: ObservableInput, resultSelector: undefined): OperatorFunction; export declare function switchMapTo(observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction; +export declare function switchScan>(accumulator: (acc: R, value: T, index: number) => O, seed: R): OperatorFunction>; + export declare function take(count: number): MonoTypeOperatorFunction; export declare function takeLast(count: number): MonoTypeOperatorFunction; From ad4edb501f7e38e022fae764c70071a4dfa408cb Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 5 Oct 2020 08:18:56 -0500 Subject: [PATCH 12/12] chore: address comments --- spec/operators/switchScan-spec.ts | 12 ++++++------ src/internal/operators/switchScan.ts | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts index e2d3f5ea0d..e1da5a6531 100644 --- a/spec/operators/switchScan-spec.ts +++ b/spec/operators/switchScan-spec.ts @@ -18,17 +18,17 @@ describe('switchScan', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should not pass seed when undefined', () => { - const seeds: number[] = []; + it('should provide the proper acculumated values', () => { + const accs: number[] = []; of(1, 3, 5).pipe( switchScan((acc, x) => { - seeds.push(acc); - return of(10).pipe(map(v => v * x + acc)); - }, 0), + accs.push(acc); + return of(acc + x) + }, 100), ).subscribe(); - expect(seeds).to.deep.equal([0, 10, 40]); + expect(accs).to.deep.equal([100, 101, 104]); }); it('should unsub inner observables', () => { diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts index d2b818ab52..34c0841ef3 100644 --- a/src/internal/operators/switchScan.ts +++ b/src/internal/operators/switchScan.ts @@ -3,6 +3,8 @@ import { ObservableInput, ObservedValueOf, OperatorFunction } from '../types'; import { switchMap } from './switchMap'; import { operate } from '../util/lift'; +// TODO: Generate a marble diagram for these docs. + /** * Applies an accumulator function over the source Observable where the * accumulator function itself returns an Observable, emitting values @@ -11,8 +13,6 @@ import { operate } from '../util/lift'; * It's like {@link scan}, but only the most recent * Observable returned by the accumulator is merged into the outer Observable. * - * ![](switchScan.png) - * * @see {@link scan} * @see {@link mergeScan} * @see {@link switchMap} @@ -29,7 +29,7 @@ export function switchScan>( return operate((source, subscriber) => { // The state we will keep up to date to pass into our // accumulator function at each new value from the source. - let state: any = seed; + let state = seed; // Use `switchMap` on our `source` to do the work of creating // this operator. Note the backwards order here of `switchMap()(source)`