diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index a5ad8448d5..c6da2d266b 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -209,28 +209,24 @@ describe('Observable.prototype.throttle', () => { }); it('should propagate error thrown from durationSelector function', () => { - const e1 = hot('abcdefabcdabcdefghabca| '); - const e1subs = '^ ! '; - const e2 = [cold('-----| '), - cold( '---| '), - cold( '-------| ')]; - const e2subs = ['^ ! ', - ' ^ ! ']; - const expected = 'a-----a---# '; + const s1 = hot('--^--x--x--x--x--x--x--e--x--x--x--|'); + const s1Subs = '^ !'; + const n1 = cold( '----|'); + const n1Subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + const exp = '---x-----x-----x-----(e#)'; let i = 0; - const result = e1.throttle(() => { - if (i === 2) { - throw 'error'; + const result = s1.throttle(() => { + if (i++ === 3) { + throw new Error('lol'); } - return e2[i++]; + return n1; }); - - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - for (let j = 0; j < e2subs.length; j++) { - expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]); - } + expectObservable(result).toBe(exp, undefined, new Error('lol')); + expectSubscriptions(s1.subscriptions).toBe(s1Subs); + expectSubscriptions(n1.subscriptions).toBe(n1Subs); }); it('should complete when source does not emit', () => { @@ -353,13 +349,16 @@ describe('Observable.prototype.throttle', () => { describe('throttle(fn, { leading: true, trailing: true })', () => { asDiagram('throttle(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => { - const e1 = hot('-a-xy-----b--x--cxxx--|'); - const e1subs = '^ !'; - const e2 = cold( '----| '); - const e2subs = [' ^ ! ', - ' ^ ! ', - ' ^ ! ']; - const expected = '-a---y----b---x-c---x-|'; + const e1 = hot('-a-xy-----b--x--cxxx------|'); + const e1subs = '^ !'; + const e2 = cold( '----| '); + const e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ !']; + const expected = '-a---y----b---x---x---x---|'; const result = e1.throttle(() => e2, { leading: true, trailing: true }); @@ -367,23 +366,52 @@ describe('Observable.prototype.throttle', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should work for individual values', () => { + const s1 = hot('-^-x------------------|'); + const s1Subs = '^ !'; + const n1 = cold( '------------------------|'); + const n1Subs = [' ^ !']; + const exp = '--x------------------|'; + + const result = s1.throttle(() => n1, { leading: true, trailing: true }); + expectObservable(result).toBe(exp); + expectSubscriptions(s1.subscriptions).toBe(s1Subs); + expectSubscriptions(n1.subscriptions).toBe(n1Subs); + }); }); describe('throttle(fn, { leading: false, trailing: true })', () => { asDiagram('throttle(fn, { leading: false, trailing: true })')('should immediately emit the first value in each time window', () => { - const e1 = hot('-a-xy-----b--x--cxxx--|'); - const e1subs = '^ !'; - const e2 = cold( '----| '); - const e2subs = [' ^ ! ', - ' ^ ! ', - ' ^ ! ']; - const expected = '-----y--------x-----x-|'; + const e1 = hot('-a-xy-----b--x--cxxx------|'); + const e1subs = '^ !'; + const e2 = cold( '----| '); + const e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ !']; + const expected = '-a---y----b---x---x---x---|'; - const result = e1.throttle(() => e2, { leading: false, trailing: true }); + const result = e1.throttle(() => e2, { leading: true, trailing: true }); expectObservable(result).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should work for individual values', () => { + const s1 = hot('-^-x------------------|'); + const s1Subs = '^ !'; + const n1 = cold( '------------------------|'); + const n1Subs = [' ^ !']; + const exp = '--x------------------|'; + + const result = s1.throttle(() => n1, { leading: true, trailing: true }); + expectObservable(result).toBe(exp); + expectSubscriptions(s1.subscriptions).toBe(s1Subs); + expectSubscriptions(n1.subscriptions).toBe(n1Subs); + }); }); }); diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index 4b1d50e95e..077c227dda 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -83,38 +83,47 @@ class ThrottleOperator implements Operator { * @extends {Ignored} */ class ThrottleSubscriber extends OuterSubscriber { - private throttled: Subscription; - private _trailingValue: T; - private _hasTrailingValue = false; + private _throttled: Subscription; + private _sendValue: T; + private _hasValue = false; constructor(protected destination: Subscriber, - private durationSelector: (value: T) => SubscribableOrPromise, + private durationSelector: (value: T) => SubscribableOrPromise, private _leading: boolean, private _trailing: boolean) { super(destination); } protected _next(value: T): void { - if (this.throttled) { - if (this._trailing) { - this._hasTrailingValue = true; - this._trailingValue = value; - } - } else { - const duration = this.tryDurationSelector(value); - if (duration) { - this.add(this.throttled = subscribeToResult(this, duration)); - } + this._hasValue = true; + this._sendValue = value; + + if (!this._throttled) { if (this._leading) { - this.destination.next(value); - if (this._trailing) { - this._hasTrailingValue = true; - this._trailingValue = value; - } + this.send(); + } else { + this.throttle(value); } } } + private send() { + const { _hasValue, _sendValue } = this; + if (_hasValue) { + this.destination.next(_sendValue); + this.throttle(_sendValue); + } + this._hasValue = false; + this._sendValue = null; + } + + private throttle(value: T): void { + const duration = this.tryDurationSelector(value); + if (duration) { + this.add(this._throttled = subscribeToResult(this, duration)); + } + } + private tryDurationSelector(value: T): SubscribableOrPromise { try { return this.durationSelector(value); @@ -124,37 +133,25 @@ class ThrottleSubscriber extends OuterSubscriber { } } - protected _unsubscribe() { - const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this; - - this._trailingValue = null; - this._hasTrailingValue = false; - - if (throttled) { - this.remove(throttled); - this.throttled = null; - throttled.unsubscribe(); + private throttlingDone() { + const { _throttled, _trailing } = this; + if (_throttled) { + _throttled.unsubscribe(); } - } + this._throttled = null; - private _sendTrailing() { - const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this; - if (throttled && _trailing && _hasTrailingValue) { - destination.next(_trailingValue); - this._trailingValue = null; - this._hasTrailingValue = false; + if (_trailing) { + this.send(); } } notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber): void { - this._sendTrailing(); - this._unsubscribe(); + this.throttlingDone(); } notifyComplete(): void { - this._sendTrailing(); - this._unsubscribe(); + this.throttlingDone(); } }