diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts index 67cbc85ab3..a74050c30e 100644 --- a/api_guard/dist/types/operators/index.d.ts +++ b/api_guard/dist/types/operators/index.d.ts @@ -288,6 +288,8 @@ export declare function switchMapTo(observable: ObservableInput): Operator export declare function switchMapTo(observable: ObservableInput, resultSelector: undefined): OperatorFunction; export declare function switchMapTo(observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction; +export declare function switchScan>(accumulator: (acc: R, value: T, index: number) => O, seed: R): OperatorFunction>; + export declare function take(count: number): MonoTypeOperatorFunction; export declare function takeLast(count: number): MonoTypeOperatorFunction; diff --git a/docs_app/content/guide/operators.md b/docs_app/content/guide/operators.md index 7b30e280e7..1d3cf1f1db 100644 --- a/docs_app/content/guide/operators.md +++ b/docs_app/content/guide/operators.md @@ -174,6 +174,7 @@ These are Observable creation operators that also have join functionality -- emi - [`partition`](/api/operators/partition) - [`pluck`](/api/operators/pluck) - [`scan`](/api/operators/scan) +- [`switchScan`](/api/operators/switchScan) - [`switchMap`](/api/operators/switchMap) - [`switchMapTo`](/api/operators/switchMapTo) - [`window`](/api/operators/window) diff --git a/docs_app/tools/decision-tree-generator/src/tree.yml b/docs_app/tools/decision-tree-generator/src/tree.yml index 0025033c45..cadb539983 100644 --- a/docs_app/tools/decision-tree-generator/src/tree.yml +++ b/docs_app/tools/decision-tree-generator/src/tree.yml @@ -115,6 +115,9 @@ - label: and output the computed values as a nested Observable when the source emits a value children: - label: mergeScan + - label: and output the computed values as a nested Observable when the source emits a value while unsubscribing from the previous nested Observable + children: + - label: switchScan - label: I want to wrap its messages with metadata children: - label: that describes each notification (next, error, or complete) diff --git a/spec-dtslint/operators/switchScan-spec.ts b/spec-dtslint/operators/switchScan-spec.ts new file mode 100644 index 0000000000..41e03a703e --- /dev/null +++ b/spec-dtslint/operators/switchScan-spec.ts @@ -0,0 +1,38 @@ +import { of } from 'rxjs'; +import { switchScan } from 'rxjs/operators'; + +it('should infer correctly', () => { + const o = of(1, 2, 3).pipe(switchScan((acc: boolean, v: number) => of(Boolean(v)), false)); // $ExpectType Observable +}); + +it('should infer correctly when using a single type', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), 0)); // $ExpectType Observable +}); + +it('should infer correctly when using seed of a different type', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, v) => of(acc + v), '0')); // $ExpectType Observable +}); + +it('should support a projector that takes an index', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, v, index) => of(Boolean(v)), false)); // $ExpectType Observable +}); + +it('should support projecting to union types', () => { + const o = of(Math.random()).pipe(switchScan(n => n > 0.5 ? of(123) : of('test'), 0)); // $ExpectType Observable +}); + +it('should use the inferred accumulator return type over the seed type', () => { + const o = of(1, 2, 3).pipe(switchScan(p => of(1), [])); // $ExpectType Observable +}); + +it('should enforce types', () => { + const o = of(1, 2, 3).pipe(switchScan()); // $ExpectError +}); + +it('should enforce the return type to be Observable', () => { + const o = of(1, 2, 3).pipe(switchScan(p => p)); // $ExpectError +}); + +it('should enforce seed and accumulator to have the same type', () => { + const o = of(1, 2, 3).pipe(switchScan((acc, p) => of([...acc, p]))); // $ExpectError +}); diff --git a/spec/operators/index-spec.ts b/spec/operators/index-spec.ts index 227b361985..ee89bdb6e9 100644 --- a/spec/operators/index-spec.ts +++ b/spec/operators/index-spec.ts @@ -77,6 +77,7 @@ describe('operators/index', () => { expect(index.skipWhile).to.exist; expect(index.startWith).to.exist; expect(index.switchAll).to.exist; + expect(index.switchScan).to.exist; expect(index.switchMap).to.exist; expect(index.switchMapTo).to.exist; expect(index.take).to.exist; diff --git a/spec/operators/switchScan-spec.ts b/spec/operators/switchScan-spec.ts new file mode 100644 index 0000000000..e1da5a6531 --- /dev/null +++ b/spec/operators/switchScan-spec.ts @@ -0,0 +1,398 @@ +import { expect } from 'chai'; +import { concat, defer, Observable, of } from 'rxjs'; +import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; +import { switchScan, map, mergeMap, takeWhile } from 'rxjs/operators'; + +/** @test {switchScan} */ +describe('switchScan', () => { + it('should map-and-flatten each item to an Observable while passing the accumulated value', () => { + const e1 = hot('--1-----3--5-------|'); + const e1subs = '^ !'; + const e2 = cold('x-x-x| ', {x: 10}); + const expected = '--x-x-x-y-yz-z-z---|'; + const values = {x: 10, y: 40, z: 90}; + + const result = e1.pipe(switchScan((acc, x) => e2.pipe(map(i => i * Number(x) + acc)), 0)); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should provide the proper acculumated values', () => { + const accs: number[] = []; + + of(1, 3, 5).pipe( + switchScan((acc, x) => { + accs.push(acc); + return of(acc + x) + }, 100), + ).subscribe(); + + expect(accs).to.deep.equal([100, 101, 104]); + }); + + it('should unsub inner observables', () => { + const unsubbed: string[] = []; + + of('a', 'b').pipe( + switchScan((acc, x) => + new Observable((subscriber) => { + subscriber.complete(); + return () => { + unsubbed.push(x); + }; + }), + null + ) + ).subscribe(); + + expect(unsubbed).to.deep.equal(['a', 'b']); + }); + + it('should switch inner cold observables', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--|'); + const ysubs = ' ^ !'; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c----f---g---h---i--|'; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should raise error when projection throws', () => { + const e1 = hot('-------x-----y---|'); + const e1subs = '^ ! '; + const expected = '-------# '; + function project(): any[] { + throw 'error'; + } + + expectObservable(e1.pipe(switchScan(project, null))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner cold observables, outer is unsubscribed early', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--|'); + const ysubs = ' ^ ! '; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const unsub = ' ! '; + const expected = '-----------a--b--c---- '; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not break unsubscription chains when result is unsubscribed explicitly', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--|'); + const ysubs = ' ^ ! '; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c---- '; + const unsub = ' ! '; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe( + mergeMap(x => of(x)), + switchScan((acc, value) => observableLookup[value], null), + mergeMap(x => of(x)), + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = concat( + defer(() => { + sideEffects.push(1); + return of(1); + }), + defer(() => { + sideEffects.push(2); + return of(2); + }), + defer(() => { + sideEffects.push(3); + return of(3); + }) + ); + + of(null).pipe( + switchScan(() => synchronousObservable, null), + takeWhile((x) => x != 2) // unsubscribe at the second side-effect + ).subscribe(() => { /* noop */ }); + + expect(sideEffects).to.deep.equal([1, 2]); + }); + + it('should switch inner cold observables, inner never completes', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--'); + const ysubs = ' ^ '; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c----f---g---h---i--'; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle a synchronous switch to the second inner observable', () => { + const x = cold( '--a--b--c--d--e--| '); + const xsubs = ' (^!) '; + const y = cold( '---f---g---h---i--| '); + const ysubs = ' ^ ! '; + const e1 = hot('---------(xy)----------------|'); + const e1subs = '^ !'; + const expected = '------------f---g---h---i----|'; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner cold observables, one inner throws', () => { + const x = cold( '--a--b--#--d--e--| '); + const xsubs = ' ^ ! '; + const y = cold( '---f---g---h---i--'); + const ysubs: string[] = []; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------a--b--# '; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner hot observables', () => { + const x = hot('-----a--b--c--d--e--| '); + const xsubs = ' ^ ! '; + const y = hot('--p-o-o-p-------------f---g---h---i--|'); + const ysubs = ' ^ !'; + const e1 = hot('---------x---------y---------| '); + const e1subs = '^ ! '; + const expected = '-----------c--d--e----f---g---h---i--|'; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner empty and empty', () => { + const x = cold('|'); + const y = cold('|'); + const xsubs = ' (^!) '; + const ysubs = ' (^!) '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ !'; + const expected = '-----------------------------|'; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner empty and never', () => { + const x = cold('|'); + const y = cold('-'); + const xsubs = ' (^!) '; + const ysubs = ' ^ '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ !'; + const expected = '------------------------------'; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner never and empty', () => { + const x = cold('-'); + const y = cold('|'); + const xsubs = ' ^ ! '; + const ysubs = ' (^!) '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ !'; + const expected = '-----------------------------|'; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner never and throw', () => { + const x = cold('-'); + const y = cold('#', undefined, 'sad'); + const xsubs = ' ^ ! '; + const ysubs = ' (^!) '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ ! '; + const expected = '-------------------# '; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected, undefined, 'sad'); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should switch inner empty and throw', () => { + const x = cold('|'); + const y = cold('#', undefined, 'sad'); + const xsubs = ' (^!) '; + const ysubs = ' (^!) '; + const e1 = hot('---------x---------y---------|'); + const e1subs = '^ ! '; + const expected = '-------------------# '; + + const observableLookup: Record> = { x, y }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected, undefined, 'sad'); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle outer empty', () => { + const e1 = cold('|'); + const e1subs = '(^!)'; + const expected = '|'; + + const result = e1.pipe(switchScan((acc, value) => of(value), '')); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle outer never', () => { + const e1 = cold('-'); + const e1subs = '^'; + const expected = '-'; + + const result = e1.pipe(switchScan((acc, value) => of(value), '')); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle outer throw', () => { + const e1 = cold('#'); + const e1subs = '(^!)'; + const expected = '#'; + + const result = e1.pipe(switchScan((acc, value) => of(value), '')); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should handle outer error', () => { + const x = cold( '--a--b--c--d--e--|'); + const xsubs = ' ^ ! '; + const e1 = hot('---------x---------# '); + const e1subs = '^ ! '; + const expected = '-----------a--b--c-# '; + + const observableLookup: Record> = { x: x }; + + const result = e1.pipe(switchScan((acc, value) => observableLookup[value], null)); + + expectObservable(result).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should create a new seed for each subscriber', () => { + const seeds: string[] = []; + const observer = (value: string) => seeds.push(value); + + const source = of('a', 'b').pipe( + switchScan((acc, x) => of(acc + x), '') + ); + + source.subscribe(observer); + source.subscribe(observer); + + expect(seeds).to.deep.equal(['a', 'ab', 'a', 'ab']); + }); + + it('should pass index to the accumulator function', () => { + const indices: number[] = []; + + of('a', 'b', 'c', 'd').pipe( + switchScan((acc, x, index) => { + indices.push(index); + return of(); + }, ''), + ).subscribe(); + + expect(indices).to.deep.equal([0, 1, 2, 3]); + }); +}); diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index 881f0849c1..e507a758a5 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -33,6 +33,9 @@ import { mergeInternals } from './mergeInternals'; * // ...and so on for each click * ``` * + * @see {@link scan} + * @see {@link switchScan} + * * @param {function(acc: R, value: T): Observable} accumulator * The accumulator function called on each source value. * @param seed The initial accumulation value. diff --git a/src/internal/operators/scan.ts b/src/internal/operators/scan.ts index f60efeb588..f7bf83f761 100644 --- a/src/internal/operators/scan.ts +++ b/src/internal/operators/scan.ts @@ -79,6 +79,7 @@ export function scan(accumulator: (acc: A | S, value: V, index: number) * @see {@link expand} * @see {@link mergeScan} * @see {@link reduce} + * @see {@link switchScan} * * @param accumulator A "reducer function". This will be called for each value after an initial state is * acquired. diff --git a/src/internal/operators/switchScan.ts b/src/internal/operators/switchScan.ts new file mode 100644 index 0000000000..34c0841ef3 --- /dev/null +++ b/src/internal/operators/switchScan.ts @@ -0,0 +1,51 @@ +/** @prettier */ +import { ObservableInput, ObservedValueOf, OperatorFunction } from '../types'; +import { switchMap } from './switchMap'; +import { operate } from '../util/lift'; + +// TODO: Generate a marble diagram for these docs. + +/** + * Applies an accumulator function over the source Observable where the + * accumulator function itself returns an Observable, emitting values + * only from the most recently returned Observable. + * + * It's like {@link scan}, but only the most recent + * Observable returned by the accumulator is merged into the outer Observable. + * + * @see {@link scan} + * @see {@link mergeScan} + * @see {@link switchMap} + * + * @param accumulator + * The accumulator function called on each source value. + * @param seed The initial accumulation value. + * @return An observable of the accumulated values. + */ +export function switchScan>( + accumulator: (acc: R, value: T, index: number) => O, + seed: R +): OperatorFunction> { + return operate((source, subscriber) => { + // The state we will keep up to date to pass into our + // accumulator function at each new value from the source. + let state = seed; + + // Use `switchMap` on our `source` to do the work of creating + // this operator. Note the backwards order here of `switchMap()(source)` + // to avoid needing to use `pipe` unnecessarily + switchMap( + // On each value from the source, call the accumulator with + // our previous state, the value and the index. + (value: T, index) => accumulator(state, value, index), + // Using the deprecated result selector here as a dirty trick + // to update our state with the flattened value. + (_, innerValue) => ((state = innerValue), innerValue) + )(source).subscribe(subscriber); + + return () => { + // Release state on teardown + state = null!; + }; + }); +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 9ae9a424a3..d25206ee8b 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -82,6 +82,7 @@ export { subscribeOn } from '../internal/operators/subscribeOn'; export { switchAll } from '../internal/operators/switchAll'; export { switchMap } from '../internal/operators/switchMap'; export { switchMapTo } from '../internal/operators/switchMapTo'; +export { switchScan } from '../internal/operators/switchScan'; export { take } from '../internal/operators/take'; export { takeLast } from '../internal/operators/takeLast'; export { takeUntil } from '../internal/operators/takeUntil';