diff --git a/perf/micro/immediate-scheduler/operators/debouncetime.js b/perf/micro/immediate-scheduler/operators/debouncetime.js new file mode 100644 index 0000000000..ab5a4029f9 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/debouncetime.js @@ -0,0 +1,24 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var time = [10, 30, 20, 40, 10]; + + var oldDebounceTimeWithImmediateScheduler = RxOld.Observable.range(0, 5, RxOld.Scheduler.immediate) + .flatMap(function (x) { return RxOld.Observable.of(x, RxOld.Scheduler.immediate).delay(time[x]); }) + .debounce(25); + var newDebounceTimeWithImmediateScheduler = RxNew.Observable.range(0, 5) + .mergeMap(function (x) { return RxNew.Observable.of(x).delay(time[x]); }) + .debounceTime(25); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old debounceTime() with immediate scheduler', function () { + oldDebounceTimeWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new debounceTime() with immediate scheduler', function () { + newDebounceTimeWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; \ No newline at end of file diff --git a/spec/operators/debouncetime-spec.js b/spec/operators/debouncetime-spec.js new file mode 100644 index 0000000000..9fe9415e9e --- /dev/null +++ b/spec/operators/debouncetime-spec.js @@ -0,0 +1,89 @@ +/* globals describe, it, expect, expectObservable, hot, rxTestScheduler */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.debounceTime()', function () { + it('should delay all element by the specified time', function () { + var e1 = hot('-a--------b------c----|'); + var expected = '------a--------b------(c|)'; + + expectObservable(e1.debounceTime(50, rxTestScheduler)).toBe(expected); + }); + + it('should debounce and delay element by the specified time', function () { + var e1 = hot('-a--(bc)-----------d-------|'); + var expected = '---------c--------------d--|'; + + expectObservable(e1.debounceTime(50, rxTestScheduler)).toBe(expected); + }); + + it('should complete when source does not emit', function () { + var e1 = hot('-----|'); + var expected = '-----|'; + + expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected); + }); + + it('should complete when source is empty', function () { + var e1 = Observable.empty(); + var expected = '|'; + + expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected); + }); + + it('should raise error when source does not emit and raises error', function () { + var e1 = hot('-----#'); + var expected = '-----#'; + + expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected); + }); + + it('should raise error when source throws', function () { + var e1 = Observable.throw('error'); + var expected = '#'; + + expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected); + }); + + it('should debounce and does not complete when source does not completes', function () { + var e1 = hot('-a--(bc)-----------d-------'); + var expected = '---------c--------------d--'; + + expectObservable(e1.debounceTime(50, rxTestScheduler)).toBe(expected); + }); + + it('should not completes when source does not completes', function () { + var e1 = hot('-'); + var expected = '-'; + + expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected); + }); + + it('should not completes when source never completes', function () { + var e1 = Observable.never(); + var expected = '-'; + + expectObservable(e1.debounceTime(10, rxTestScheduler)).toBe(expected); + }); + + it('should delay all element until source raises error', function () { + var e1 = hot('-a--------b------c----#'); + var expected = '------a--------b------#'; + + expectObservable(e1.debounceTime(50, rxTestScheduler)).toBe(expected); + }); + + it('should debounce all elements while source emits within given time', function () { + var e1 = hot('--a--b--c--d--e--f--g--h-|'); + var expected = '-------------------------(h|)'; + + expectObservable(e1.debounceTime(40, rxTestScheduler)).toBe(expected); + }); + + it('should debounce all element while source emits within given time until raises error', function () { + var e1 = hot('--a--b--c--d--e--f--g--h-#'); + var expected = '-------------------------#'; + + expectObservable(e1.debounceTime(40, rxTestScheduler)).toBe(expected); + }); +}); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index b9a17a4596..2465a0e190 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -19,7 +19,7 @@ export interface CoreOperators { concatMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; count?: () => Observable; dematerialize?: () => Observable; - debounce?: (dueTime: number, scheduler?: Scheduler) => Observable; + debounceTime?: (dueTime: number, scheduler?: Scheduler) => Observable; defaultIfEmpty?: (defaultValue: R) => Observable|Observable; delay?: (delay: number, scheduler?: Scheduler) => Observable; distinctUntilChanged?: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 5e30269dae..4acb6f7931 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -114,6 +114,9 @@ observableProto.dematerialize = dematerialize; import debounce from './operators/debounce'; observableProto.debounce = debounce; +import debounceTime from './operators/debounceTime'; +observableProto.debounceTime = debounceTime; + import defaultIfEmpty from './operators/defaultIfEmpty'; observableProto.defaultIfEmpty = defaultIfEmpty; diff --git a/src/Rx.ts b/src/Rx.ts index 0de7c31461..2b0027a53d 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -103,6 +103,9 @@ observableProto.dematerialize = dematerialize; import debounce from './operators/debounce'; observableProto.debounce = debounce; +import debounceTime from './operators/debounceTime'; +observableProto.debounceTime = debounceTime; + import defaultIfEmpty from './operators/defaultIfEmpty'; observableProto.defaultIfEmpty = defaultIfEmpty; diff --git a/src/operators/debounceTime.ts b/src/operators/debounceTime.ts new file mode 100644 index 0000000000..62265b80d0 --- /dev/null +++ b/src/operators/debounceTime.ts @@ -0,0 +1,63 @@ +import Operator from '../Operator'; +import Observable from '../Observable'; +import Subscriber from '../Subscriber'; +import Scheduler from '../Scheduler'; +import Subscription from '../Subscription'; +import nextTick from '../schedulers/nextTick'; + +export default function debounceTime(dueTime: number, scheduler: Scheduler = nextTick): Observable { + return this.lift(new DebounceTimeOperator(dueTime, scheduler)); +} + +class DebounceTimeOperator implements Operator { + constructor(private dueTime: number, private scheduler: Scheduler) { + } + + call(subscriber: Subscriber): Subscriber { + return new DebounceTimeSubscriber(subscriber, this.dueTime, this.scheduler); + } +} + +class DebounceTimeSubscriber extends Subscriber { + private debouncedSubscription: Subscription = null; + private lastValue: any = null; + + constructor(destination: Subscriber, + private dueTime: number, + private scheduler: Scheduler) { + super(destination); + } + + _next(value: T) { + this.clearDebounce(); + this.lastValue = value; + this.add(this.debouncedSubscription = this.scheduler.schedule(dispatchNext, this.dueTime, this)); + } + + _complete() { + this.debouncedNext(); + this.destination.complete(); + } + + debouncedNext(): void { + this.clearDebounce(); + if (this.lastValue != null) { + this.destination.next(this.lastValue); + this.lastValue = null; + } + } + + private clearDebounce(): void { + const debouncedSubscription = this.debouncedSubscription; + + if (debouncedSubscription !== null) { + this.remove(debouncedSubscription); + debouncedSubscription.unsubscribe(); + this.debouncedSubscription = null; + } + } +} + +function dispatchNext(subscriber) { + subscriber.debouncedNext(); +} \ No newline at end of file