diff --git a/src/operator/sampleTime.ts b/src/operator/sampleTime.ts index 4137a92ea6..0b4e62874a 100644 --- a/src/operator/sampleTime.ts +++ b/src/operator/sampleTime.ts @@ -1,10 +1,7 @@ import { Observable } from '../Observable'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; import { IScheduler } from '../Scheduler'; -import { Action } from '../scheduler/Action'; import { async } from '../scheduler/async'; -import { TeardownLogic } from '../Subscription'; +import { sampleTime as higherOrder } from '../operators/sampleTime'; /** * Emits the most recently emitted value from the source Observable within @@ -43,50 +40,5 @@ import { TeardownLogic } from '../Subscription'; * @owner Observable */ export function sampleTime(this: Observable, period: number, scheduler: IScheduler = async): Observable { - return this.lift(new SampleTimeOperator(period, scheduler)); -} - -class SampleTimeOperator implements Operator { - constructor(private period: number, - private scheduler: IScheduler) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new SampleTimeSubscriber(subscriber, this.period, this.scheduler)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class SampleTimeSubscriber extends Subscriber { - lastValue: T; - hasValue: boolean = false; - - constructor(destination: Subscriber, - private period: number, - private scheduler: IScheduler) { - super(destination); - this.add(scheduler.schedule(dispatchNotification, period, { subscriber: this, period })); - } - - protected _next(value: T) { - this.lastValue = value; - this.hasValue = true; - } - - notifyNext() { - if (this.hasValue) { - this.hasValue = false; - this.destination.next(this.lastValue); - } - } -} - -function dispatchNotification(this: Action, state: any) { - let { subscriber, period } = state; - subscriber.notifyNext(); - this.schedule(state, period); + return higherOrder(period, scheduler)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 92f383ab48..3b8ea26498 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -59,6 +59,7 @@ export { retry } from './retry'; export { retryWhen } from './retryWhen'; export { refCount } from './refCount'; export { sample } from './sample'; +export { sampleTime } from './sampleTime'; export { scan } from './scan'; export { subscribeOn } from './subscribeOn'; export { switchAll } from './switchAll'; diff --git a/src/operators/sampleTime.ts b/src/operators/sampleTime.ts new file mode 100644 index 0000000000..56046bb71c --- /dev/null +++ b/src/operators/sampleTime.ts @@ -0,0 +1,94 @@ +import { Observable } from '../Observable'; +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { IScheduler } from '../Scheduler'; +import { Action } from '../scheduler/Action'; +import { async } from '../scheduler/async'; +import { TeardownLogic } from '../Subscription'; + +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Emits the most recently emitted value from the source Observable within + * periodic time intervals. + * + * Samples the source Observable at periodic time + * intervals, emitting what it samples. + * + * + * + * `sampleTime` periodically looks at the source Observable and emits whichever + * value it has most recently emitted since the previous sampling, unless the + * source has not emitted anything since the previous sampling. The sampling + * happens periodically in time every `period` milliseconds (or the time unit + * defined by the optional `scheduler` argument). The sampling starts as soon as + * the output Observable is subscribed. + * + * @example Every second, emit the most recent click at most once + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.sampleTime(1000); + * result.subscribe(x => console.log(x)); + * + * @see {@link auditTime} + * @see {@link debounceTime} + * @see {@link delay} + * @see {@link sample} + * @see {@link throttleTime} + * + * @param {number} period The sampling period expressed in milliseconds or the + * time unit determined internally by the optional `scheduler`. + * @param {Scheduler} [scheduler=async] The {@link IScheduler} to use for + * managing the timers that handle the sampling. + * @return {Observable} An Observable that emits the results of sampling the + * values emitted by the source Observable at the specified time interval. + * @method sampleTime + * @owner Observable + */ +export function sampleTime(period: number, scheduler: IScheduler = async): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new SampleTimeOperator(period, scheduler)); +} + +class SampleTimeOperator implements Operator { + constructor(private period: number, + private scheduler: IScheduler) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new SampleTimeSubscriber(subscriber, this.period, this.scheduler)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class SampleTimeSubscriber extends Subscriber { + lastValue: T; + hasValue: boolean = false; + + constructor(destination: Subscriber, + private period: number, + private scheduler: IScheduler) { + super(destination); + this.add(scheduler.schedule(dispatchNotification, period, { subscriber: this, period })); + } + + protected _next(value: T) { + this.lastValue = value; + this.hasValue = true; + } + + notifyNext() { + if (this.hasValue) { + this.hasValue = false; + this.destination.next(this.lastValue); + } + } +} + +function dispatchNotification(this: Action, state: any) { + let { subscriber, period } = state; + subscriber.notifyNext(); + this.schedule(state, period); +}