From 1d735b962075e6295a8553f54246781618c8366a Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 17 Aug 2015 16:21:42 -0700 Subject: [PATCH] feat(operator): add throttle closes #191 --- spec/operators/throttle-spec.js | 19 ++++++++++++ spec/scheduler-spec.js | 16 ++++++++++ src/Observable.ts | 1 + src/Rx.ts | 4 +++ src/operators/throttle.ts | 55 +++++++++++++++++++++++++++++++++ 5 files changed, 95 insertions(+) create mode 100644 spec/operators/throttle-spec.js create mode 100644 src/operators/throttle.ts diff --git a/spec/operators/throttle-spec.js b/spec/operators/throttle-spec.js new file mode 100644 index 0000000000..ce402b1ba9 --- /dev/null +++ b/spec/operators/throttle-spec.js @@ -0,0 +1,19 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var Scheduler = Rx.Scheduler; + +describe('Observable.prototype.throttle()', function () { + it('should delay calls by the specified amount', function (done) { + var expected = [3, 4]; + var source = Observable.concat(Observable.value(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ) + .throttle(50) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }); +}); \ No newline at end of file diff --git a/spec/scheduler-spec.js b/spec/scheduler-spec.js index 5053c9d5db..fcb073e40e 100644 --- a/spec/scheduler-spec.js +++ b/spec/scheduler-spec.js @@ -17,4 +17,20 @@ describe('Scheduler.immediate', function() { expect(call1).toBe(true); expect(call2).toBe(true); }); + + it('should schedule things in the future too', function (done) { + var called = false; + Scheduler.immediate.schedule(500, null, function () { + called = true; + }); + + setTimeout(function () { + expect(called).toBe(false); + }, 400); + + setTimeout(function() { + expect(called).toBe(true); + done(); + }, 700); + }) }); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 3f302039db..f1d2a5c840 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -147,6 +147,7 @@ export default class Observable { // HACK: this should be Observable>, but the build process didn't like it. :( // this will be fixed when we can move everything to the TypeScript compiler I suspect. materialize: () => Observable; + throttle: (delay: number, scheduler?: Scheduler) => Observable; observeOn: (scheduler: Scheduler, delay?: number) => Observable; subscribeOn: (scheduler: Scheduler, delay?: number) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index 574b68ac02..e1a79d4bda 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -162,9 +162,13 @@ import groupBy from './operators/groupBy'; observableProto.groupBy = groupBy; import delay from './operators/delay'; +import throttle from './operators/throttle'; + observableProto.delay = delay; +observableProto.throttle = throttle; export { + Subject, Scheduler, Observable, diff --git a/src/operators/throttle.ts b/src/operators/throttle.ts new file mode 100644 index 0000000000..9981c396eb --- /dev/null +++ b/src/operators/throttle.ts @@ -0,0 +1,55 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Scheduler from '../Scheduler'; +import Subscription from '../Subscription'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function throttle(delay: number, scheduler: Scheduler = Scheduler.nextTick) { + return this.lift(new ThrottleOperator(delay, scheduler)); +} + +export class ThrottleOperator extends Operator { + constructor(private delay:number, private scheduler:Scheduler) { + super(); + } + + call(observer: Observer): Observer { + return new ThrottleSubscriber(observer, this.delay, this.scheduler); + } +} + +export class ThrottleSubscriber extends Subscriber { + private throttled: Subscription; + + constructor(destination:Observer, private delay:number, private scheduler:Scheduler) { + super(destination); + } + + _next(x) { + this.clearThrottle(); + this.add(this.throttled = this.scheduler.schedule(this.delay, { value: x, subscriber: this }, dispatchNext)); + } + + throttledNext(x) { + this.clearThrottle(); + this.destination.next(x); + } + + clearThrottle() { + const throttled = this.throttled; + if (throttled) { + this.remove(throttled); + throttled.unsubscribe(); + this.throttled = null; + } + } +} + +function dispatchNext({ value, subscriber }) { + subscriber.throttledNext(value); +}