From 26401847966706b0cdbcc719023fc707eee1f3d2 Mon Sep 17 00:00:00 2001 From: Jason Aden Date: Thu, 7 Sep 2017 12:29:38 -0700 Subject: [PATCH] feat(switchMapTo): add higher-order lettable version of switchMapTo --- src/operator/startWith.ts | 22 +----- src/operator/switchMapTo.ts | 87 +-------------------- src/operators/index.ts | 2 + src/operators/startWith.ts | 52 +++++++++++++ src/operators/switchMapTo.ts | 141 +++++++++++++++++++++++++++++++++++ 5 files changed, 199 insertions(+), 105 deletions(-) create mode 100644 src/operators/startWith.ts create mode 100644 src/operators/switchMapTo.ts diff --git a/src/operator/startWith.ts b/src/operator/startWith.ts index 88db23e1ef..04bc2c7140 100644 --- a/src/operator/startWith.ts +++ b/src/operator/startWith.ts @@ -1,10 +1,6 @@ import { IScheduler } from '../Scheduler'; import { Observable } from '../Observable'; -import { ArrayObservable } from '../observable/ArrayObservable'; -import { ScalarObservable } from '../observable/ScalarObservable'; -import { EmptyObservable } from '../observable/EmptyObservable'; -import { concat as concatStatic } from '../observable/concat'; -import { isScheduler } from '../util/isScheduler'; +import { startWith as higherOrder } from '../operators/startWith'; /* tslint:disable:max-line-length */ export function startWith(this: Observable, v1: T, scheduler?: IScheduler): Observable; @@ -31,19 +27,5 @@ export function startWith(this: Observable, ...array: Array(this: Observable, ...array: Array): Observable { - let scheduler = array[array.length - 1]; - if (isScheduler(scheduler)) { - array.pop(); - } else { - scheduler = null; - } - - const len = array.length; - if (len === 1) { - return concatStatic(new ScalarObservable(array[0], scheduler), >this); - } else if (len > 1) { - return concatStatic(new ArrayObservable(array, scheduler), >this); - } else { - return concatStatic(new EmptyObservable(scheduler), >this); - } + return higherOrder(...array)(this); } diff --git a/src/operator/switchMapTo.ts b/src/operator/switchMapTo.ts index 63cc5a2855..220a6da308 100644 --- a/src/operator/switchMapTo.ts +++ b/src/operator/switchMapTo.ts @@ -1,10 +1,5 @@ -import { Operator } from '../Operator'; import { Observable, ObservableInput } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { switchMapTo as higherOrder } from '../operators/switchMapTo'; /* tslint:disable:max-line-length */ export function switchMapTo(this: Observable, observable: ObservableInput): Observable; @@ -58,83 +53,5 @@ export function switchMapTo(this: Observable, innerObservable: Obser innerValue: I, outerIndex: number, innerIndex: number) => R): Observable { - return this.lift(new SwitchMapToOperator(innerObservable, resultSelector)); -} - -class SwitchMapToOperator implements Operator { - constructor(private observable: Observable, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new SwitchMapToSubscriber(subscriber, this.observable, this.resultSelector)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class SwitchMapToSubscriber extends OuterSubscriber { - private index: number = 0; - private innerSubscription: Subscription; - - constructor(destination: Subscriber, - private inner: Observable, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { - super(destination); - } - - protected _next(value: any) { - const innerSubscription = this.innerSubscription; - if (innerSubscription) { - innerSubscription.unsubscribe(); - } - this.add(this.innerSubscription = subscribeToResult(this, this.inner, value, this.index++)); - } - - protected _complete() { - const {innerSubscription} = this; - if (!innerSubscription || innerSubscription.closed) { - super._complete(); - } - } - - protected _unsubscribe() { - this.innerSubscription = null; - } - - notifyComplete(innerSub: Subscription) { - this.remove(innerSub); - this.innerSubscription = null; - if (this.isStopped) { - super._complete(); - } - } - - notifyNext(outerValue: T, innerValue: I, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - const { resultSelector, destination } = this; - if (resultSelector) { - this.tryResultSelector(outerValue, innerValue, outerIndex, innerIndex); - } else { - destination.next(innerValue); - } - } - - private tryResultSelector(outerValue: T, innerValue: I, - outerIndex: number, innerIndex: number): void { - const { resultSelector, destination } = this; - let result: R; - try { - result = resultSelector(outerValue, innerValue, outerIndex, innerIndex); - } catch (err) { - destination.error(err); - return; - } - - destination.next(result); - } + return higherOrder(innerObservable, resultSelector)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 7329da9e08..df149921ff 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -64,9 +64,11 @@ export { skip } from './skip'; export { skipLast } from './skipLast'; export { skipUntil } from './skipUntil'; export { skipWhile } from './skipWhile'; +export { startWith } from './startWith'; export { subscribeOn } from './subscribeOn'; export { switchAll } from './switchAll'; export { switchMap } from './switchMap'; +export { switchMapTo } from './switchMapTo'; export { takeLast } from './takeLast'; export { tap } from './tap'; export { timestamp } from './timestamp'; diff --git a/src/operators/startWith.ts b/src/operators/startWith.ts new file mode 100644 index 0000000000..5e82e15c27 --- /dev/null +++ b/src/operators/startWith.ts @@ -0,0 +1,52 @@ +import { IScheduler } from '../Scheduler'; +import { Observable } from '../Observable'; +import { ArrayObservable } from '../observable/ArrayObservable'; +import { ScalarObservable } from '../observable/ScalarObservable'; +import { EmptyObservable } from '../observable/EmptyObservable'; +import { concat as concatStatic } from '../observable/concat'; +import { isScheduler } from '../util/isScheduler'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function startWith(v1: T, scheduler?: IScheduler): MonoTypeOperatorFunction; +export function startWith(v1: T, v2: T, scheduler?: IScheduler): MonoTypeOperatorFunction; +export function startWith(v1: T, v2: T, v3: T, scheduler?: IScheduler): MonoTypeOperatorFunction; +export function startWith(v1: T, v2: T, v3: T, v4: T, scheduler?: IScheduler): MonoTypeOperatorFunction; +export function startWith(v1: T, v2: T, v3: T, v4: T, v5: T, scheduler?: IScheduler): MonoTypeOperatorFunction; +export function startWith(v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, scheduler?: IScheduler): MonoTypeOperatorFunction; +export function startWith(...array: Array): MonoTypeOperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Returns an Observable that emits the items you specify as arguments before it begins to emit + * items emitted by the source Observable. + * + * + * + * @param {...T} values - Items you want the modified Observable to emit first. + * @param {Scheduler} [scheduler] - A {@link IScheduler} to use for scheduling + * the emissions of the `next` notifications. + * @return {Observable} An Observable that emits the items in the specified Iterable and then emits the items + * emitted by the source Observable. + * @method startWith + * @owner Observable + */ +export function startWith(...array: Array): MonoTypeOperatorFunction { + return (source: Observable) => { + let scheduler = array[array.length - 1]; + if (isScheduler(scheduler)) { + array.pop(); + } else { + scheduler = null; + } + + const len = array.length; + if (len === 1) { + return concatStatic(new ScalarObservable(array[0], scheduler), source); + } else if (len > 1) { + return concatStatic(new ArrayObservable(array, scheduler), source); + } else { + return concatStatic(new EmptyObservable(scheduler), source); + } + }; +} diff --git a/src/operators/switchMapTo.ts b/src/operators/switchMapTo.ts new file mode 100644 index 0000000000..ceb4fdc5dc --- /dev/null +++ b/src/operators/switchMapTo.ts @@ -0,0 +1,141 @@ +import { Operator } from '../Operator'; +import { Observable, ObservableInput } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { OperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function switchMapTo(observable: ObservableInput): OperatorFunction; +export function switchMapTo(observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Projects each source value to the same Observable which is flattened multiple + * times with {@link switch} in the output Observable. + * + * It's like {@link switchMap}, but maps each value + * always to the same inner Observable. + * + * + * + * Maps each source value to the given Observable `innerObservable` regardless + * of the source value, and then flattens those resulting Observables into one + * single Observable, which is the output Observable. The output Observables + * emits values only from the most recently emitted instance of + * `innerObservable`. + * + * @example Rerun an interval Observable on every click event + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.switchMapTo(Rx.Observable.interval(1000)); + * result.subscribe(x => console.log(x)); + * + * @see {@link concatMapTo} + * @see {@link switch} + * @see {@link switchMap} + * @see {@link mergeMapTo} + * + * @param {ObservableInput} innerObservable An Observable to replace each value from + * the source Observable. + * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] + * A function to produce the value on the output Observable based on the values + * and the indices of the source (outer) emission and the inner Observable + * emission. The arguments passed to this function are: + * - `outerValue`: the value that came from the source + * - `innerValue`: the value that came from the projected Observable + * - `outerIndex`: the "index" of the value that came from the source + * - `innerIndex`: the "index" of the value from the projected Observable + * @return {Observable} An Observable that emits items from the given + * `innerObservable` (and optionally transformed through `resultSelector`) every + * time a value is emitted on the source Observable, and taking only the values + * from the most recently projected inner Observable. + * @method switchMapTo + * @owner Observable + */ +export function switchMapTo(innerObservable: Observable, + resultSelector?: (outerValue: T, + innerValue: I, + outerIndex: number, + innerIndex: number) => R): OperatorFunction { + return (source: Observable) => source.lift(new SwitchMapToOperator(innerObservable, resultSelector)); +} + +class SwitchMapToOperator implements Operator { + constructor(private observable: Observable, + private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { + } + + call(subscriber: Subscriber, source: any): any { + return source.subscribe(new SwitchMapToSubscriber(subscriber, this.observable, this.resultSelector)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class SwitchMapToSubscriber extends OuterSubscriber { + private index: number = 0; + private innerSubscription: Subscription; + + constructor(destination: Subscriber, + private inner: Observable, + private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { + super(destination); + } + + protected _next(value: any) { + const innerSubscription = this.innerSubscription; + if (innerSubscription) { + innerSubscription.unsubscribe(); + } + this.add(this.innerSubscription = subscribeToResult(this, this.inner, value, this.index++)); + } + + protected _complete() { + const {innerSubscription} = this; + if (!innerSubscription || innerSubscription.closed) { + super._complete(); + } + } + + protected _unsubscribe() { + this.innerSubscription = null; + } + + notifyComplete(innerSub: Subscription) { + this.remove(innerSub); + this.innerSubscription = null; + if (this.isStopped) { + super._complete(); + } + } + + notifyNext(outerValue: T, innerValue: I, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + const { resultSelector, destination } = this; + if (resultSelector) { + this.tryResultSelector(outerValue, innerValue, outerIndex, innerIndex); + } else { + destination.next(innerValue); + } + } + + private tryResultSelector(outerValue: T, innerValue: I, + outerIndex: number, innerIndex: number): void { + const { resultSelector, destination } = this; + let result: R; + try { + result = resultSelector(outerValue, innerValue, outerIndex, innerIndex); + } catch (err) { + destination.error(err); + return; + } + + destination.next(result); + } +}