From d13dbb4560fa1803f89bd731e0302447cf9f29c8 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 26 Oct 2016 15:33:32 -0700 Subject: [PATCH] fix(bufferCount): will behave as expected when `startBufferEvery` is less than `bufferSize` (#2076) - fixed issue where internal `buffers` store was keeping an additional buffer for no good reason - improved logic and performance around updating internal `buffers` list - adds a test to ensure proper behavior fixes #2062 --- spec/operators/bufferCount-spec.ts | 21 +++++++++++++++++++++ src/operator/bufferCount.ts | 22 +++++++--------------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/spec/operators/bufferCount-spec.ts b/spec/operators/bufferCount-spec.ts index bb5155d254..bdf87ce9df 100644 --- a/spec/operators/bufferCount-spec.ts +++ b/spec/operators/bufferCount-spec.ts @@ -1,4 +1,5 @@ import * as Rx from '../../dist/cjs/Rx'; +import { expect } from 'chai'; declare const {hot, asDiagram, expectObservable, expectSubscriptions}; const Observable = Rx.Observable; @@ -31,6 +32,26 @@ describe('Observable.prototype.bufferCount', () => { expectObservable(e1.bufferCount(2)).toBe(expected, values); }); + it('should buffer properly (issue #2062)', () => { + const item$ = new Rx.Subject(); + const results = []; + item$ + .bufferCount(3, 1) + .subscribe(value => { + results.push(value); + + if (value.join() === '1,2,3') { + item$.next(4); + } + }); + + item$.next(1); + item$.next(2); + item$.next(3); + + expect(results).to.deep.equal([[1, 2, 3], [2, 3, 4]]); + }); + it('should emit partial buffers if source completes before reaching specified buffer count', () => { const e1 = hot('--a--b--c--d--|'); const expected = '--------------(x|)'; diff --git a/src/operator/bufferCount.ts b/src/operator/bufferCount.ts index 8fd73c8298..9e51bd5980 100644 --- a/src/operator/bufferCount.ts +++ b/src/operator/bufferCount.ts @@ -62,7 +62,7 @@ class BufferCountOperator implements Operator { * @extends {Ignored} */ class BufferCountSubscriber extends Subscriber { - private buffers: Array = [[]]; + private buffers: Array = []; private count: number = 0; constructor(destination: Subscriber, private bufferSize: number, private startBufferEvery: number) { @@ -70,30 +70,22 @@ class BufferCountSubscriber extends Subscriber { } protected _next(value: T) { - const count = (this.count += 1); - const destination = this.destination; - const bufferSize = this.bufferSize; - const startBufferEvery = (this.startBufferEvery == null) ? bufferSize : this.startBufferEvery; - const buffers = this.buffers; - const len = buffers.length; - let remove = -1; + const count = this.count++; + const { destination, bufferSize, startBufferEvery, buffers } = this; + const startOn = (startBufferEvery == null) ? bufferSize : startBufferEvery; - if (count % startBufferEvery === 0) { + if (count % startOn === 0) { buffers.push([]); } - for (let i = 0; i < len; i++) { + for (let i = buffers.length; i--; ) { const buffer = buffers[i]; buffer.push(value); if (buffer.length === bufferSize) { - remove = i; + buffers.splice(i, 1); destination.next(buffer); } } - - if (remove !== -1) { - buffers.splice(remove, 1); - } } protected _complete() {