Skip to content

Commit

Permalink
feat(operator): add throttle
Browse files Browse the repository at this point in the history
closes #191
  • Loading branch information
benlesh committed Aug 22, 2015
1 parent 9156d61 commit 1d735b9
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 0 deletions.
19 changes: 19 additions & 0 deletions spec/operators/throttle-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Scheduler = Rx.Scheduler;

describe('Observable.prototype.throttle()', function () {
it('should delay calls by the specified amount', function (done) {
var expected = [3, 4];
var source = Observable.concat(Observable.value(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Observable.timer(100).mapTo(4)
)
.throttle(50)
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
});
});
16 changes: 16 additions & 0 deletions spec/scheduler-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,20 @@ describe('Scheduler.immediate', function() {
expect(call1).toBe(true);
expect(call2).toBe(true);
});

it('should schedule things in the future too', function (done) {
var called = false;
Scheduler.immediate.schedule(500, null, function () {
called = true;
});

setTimeout(function () {
expect(called).toBe(false);
}, 400);

setTimeout(function() {
expect(called).toBe(true);
done();
}, 700);
})
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ export default class Observable<T> {
// HACK: this should be Observable<Notification<T>>, but the build process didn't like it. :(
// this will be fixed when we can move everything to the TypeScript compiler I suspect.
materialize: () => Observable<any>;
throttle: (delay: number, scheduler?: Scheduler) => Observable<T>;

observeOn: (scheduler: Scheduler, delay?: number) => Observable<T>;
subscribeOn: (scheduler: Scheduler, delay?: number) => Observable<T>;
Expand Down
4 changes: 4 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,13 @@ import groupBy from './operators/groupBy';
observableProto.groupBy = groupBy;

import delay from './operators/delay';
import throttle from './operators/throttle';

observableProto.delay = delay;
observableProto.throttle = throttle;

export {

Subject,
Scheduler,
Observable,
Expand Down
55 changes: 55 additions & 0 deletions src/operators/throttle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Scheduler from '../Scheduler';
import Subscription from '../Subscription';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function throttle<T>(delay: number, scheduler: Scheduler = Scheduler.nextTick) {
return this.lift(new ThrottleOperator(delay, scheduler));
}

export class ThrottleOperator<T, R> extends Operator<T, R> {
constructor(private delay:number, private scheduler:Scheduler) {
super();
}

call(observer: Observer<R>): Observer<T> {
return new ThrottleSubscriber(observer, this.delay, this.scheduler);
}
}

export class ThrottleSubscriber<T, R> extends Subscriber<T> {
private throttled: Subscription<any>;

constructor(destination:Observer<T>, private delay:number, private scheduler:Scheduler) {
super(destination);
}

_next(x) {
this.clearThrottle();
this.add(this.throttled = this.scheduler.schedule(this.delay, { value: x, subscriber: this }, dispatchNext));
}

throttledNext(x) {
this.clearThrottle();
this.destination.next(x);
}

clearThrottle() {
const throttled = this.throttled;
if (throttled) {
this.remove(throttled);
throttled.unsubscribe();
this.throttled = null;
}
}
}

function dispatchNext({ value, subscriber }) {
subscriber.throttledNext(value);
}

0 comments on commit 1d735b9

Please sign in to comment.