Skip to content

Commit

Permalink
feat(operator): add buffer operators: buffer, bufferWhen, bufferTime,…
Browse files Browse the repository at this point in the history
… bufferCount, and bufferToggle

closes #207
  • Loading branch information
benlesh committed Aug 26, 2015
1 parent 9f5d510 commit 9f8347f
Show file tree
Hide file tree
Showing 13 changed files with 575 additions and 3 deletions.
19 changes: 19 additions & 0 deletions spec/operators/buffer-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.buffer', function () {
it('should emit buffers that close and reopen', function (done) {
var expected = [
[0, 1, 2],
[3, 4, 5],
[6, 7, 8]
];
Observable.interval(100)
.buffer(Observable.interval(320))
.take(3)
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
});
20 changes: 20 additions & 0 deletions spec/operators/bufferCount-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.bufferCount', function () {
it('should emit buffers at intervals', function (done) {
var expected = [
[0, 1],
[1, 2],
[2, 3],
[3]
];
Observable.range(0, 4)
.bufferCount(2, 1)
.take(3)
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
});
34 changes: 34 additions & 0 deletions spec/operators/bufferTime-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.bufferTime', function () {
it('should emit buffers at intervals', function (done) {
var expected = [
[0, 1, 2],
[3, 4, 5],
[6, 7, 8]
];
Observable.interval(100)
.bufferTime(320)
.take(3)
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);


it('should emit buffers that have been created at intervals and close after the specified delay', function (done) {
var expected = [
[0, 1, 2, 3, 4],
[2, 3, 4, 5, 6],
[4, 5, 6, 7, 8]
];
Observable.interval(100)
.bufferTime(520, 220)
.take(3)
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
});
16 changes: 16 additions & 0 deletions spec/operators/bufferToggle-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.bufferToggle', function () {
it('should emit buffers that are opened by an observable from the first argument and closed by an observable returned by the function in the second argument', function (done) {
Observable.interval(100).take(10)
.bufferToggle(Observable.timer(320).mapTo('test'), function (n) {
expect(n).toBe('test');
return Observable.timer(320);
})
.subscribe(function (w) {
expect(w).toEqual([3, 4, 5])
}, null, done);
}, 2000);
});
19 changes: 19 additions & 0 deletions spec/operators/bufferWhen-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.bufferWhen', function () {
it('should emit buffers that close and reopen', function (done) {
var expected = [
[0, 1, 2],
[3, 4, 5],
[6, 7, 8]
];
Observable.interval(100)
.bufferWhen(function () { return Observable.timer(320); })
.take(3)
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
});
6 changes: 6 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,11 @@ export default class Observable<T> {
windowTime: <T>(windowTimeSpan: number, windowCreationInterval?: number, scheduler?: Scheduler) => Observable<Observable<T>>;
windowCount: <T>(windowSize: number, startWindowEvery: number) => Observable<Observable<T>>;

buffer: <T>(closingNotifier: Observable<any>) => Observable<T[]>;
bufferWhen: <T>(closingSelector: () => Observable<any>) => Observable<T[]>;
bufferToggle: <T, O>(openings: Observable<O>, closingSelector?: (openValue: O) => Observable<any>) => Observable<T[]>
bufferTime: <T>(bufferTimeSpan: number, bufferCreationInterval?: number, scheduler?: Scheduler) => Observable<T[]>;
bufferCount: <T>(bufferSize: number, startBufferEvery: number) => Observable<T[]>;

finally: (ensure: () => void, thisArg?: any) => Observable<T>;
}
12 changes: 12 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ observableProto.delay = delay;
observableProto.throttle = throttle;
observableProto.debounce = debounce;

import buffer from './operators/buffer';
import bufferCount from './operators/bufferCount';
import bufferTime from './operators/bufferTime';
import bufferToggle from './operators/bufferToggle';
import bufferWhen from './operators/bufferWhen';

observableProto.buffer = buffer;
observableProto.bufferCount = bufferCount;
observableProto.bufferTime = bufferTime;
observableProto.bufferToggle = bufferToggle;
observableProto.bufferWhen = bufferWhen;

export {

Subject,
Expand Down
6 changes: 3 additions & 3 deletions src/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class Action<T> extends Subscription<T> {
super();
}

schedule(state?:any): Action<T> {
schedule(state?: any): Action<T> {
if (this.isUnsubscribed) {
return this;
}
Expand Down Expand Up @@ -106,11 +106,11 @@ export class NextTickAction<T> extends Action<T> {

id: number;

schedule(state?:any): Action<T> {
schedule(state?: any): Action<T> {
if (this.isUnsubscribed) {
return this;
}

this.state = state;

const scheduler = this.scheduler;
Expand Down
68 changes: 68 additions & 0 deletions src/operators/buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function buffer<T>(closingNotifier: Observable<any>): Observable<T[]> {
return this.lift(new BufferOperator(closingNotifier));
}

export class BufferOperator<T, R> implements Operator<T, R> {

constructor(private closingNotifier: Observable<any>) {
}

call(observer: Observer<T>): Observer<T> {
return new BufferSubscriber(observer, this.closingNotifier);
}
}

export class BufferSubscriber<T> extends Subscriber<T> {
buffer: T[] = [];

constructor(destination: Observer<T>, closingNotifier: Observable<any>) {
super(destination);
this.add(closingNotifier.subscribe(new BufferClosingNotifierSubscriber(this)));
}

_next(value: T) {
this.buffer.push(value);
}

_error(err: any) {
this.destination.error(err);
}

_complete() {
this.flushBuffer();
this.destination.complete();
}

flushBuffer() {
const buffer = this.buffer;
this.buffer = [];
this.destination.next(buffer);
}
}

export class BufferClosingNotifierSubscriber<T> extends Subscriber<T> {
constructor(private parent: BufferSubscriber<any>) {
super(null);
}

_next(value: T) {
this.parent.flushBuffer();
}

_error(err: any) {
this.parent.error(err);
}

_complete() {
// noop
}
}
71 changes: 71 additions & 0 deletions src/operators/bufferCount.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function bufferCount<T>(bufferSize: number, startBufferEvery: number = null): Observable<T[]> {
return this.lift(new BufferCountOperator(bufferSize, startBufferEvery));
}

export class BufferCountOperator<T, R> implements Operator<T, R> {

constructor(private bufferSize: number, private startBufferEvery: number) {
}

call(observer: Observer<T>): Observer<T> {
return new BufferCountSubscriber(observer, this.bufferSize, this.startBufferEvery);
}
}

export class BufferCountSubscriber<T> extends Subscriber<T> {
buffers: Array<T[]> = [[]];
count: number = 0;

constructor(destination: Observer<T>, private bufferSize: number, private startBufferEvery: number) {
super(destination);
}

_next(value: T) {
const count = (this.count += 1);
const destination = this.destination;
const bufferSize = this.bufferSize;
const startBufferEvery = this.startBufferEvery;
const buffers = this.buffers;
const len = buffers.length;
let remove = -1;

if (count % startBufferEvery === 0) {
buffers.push([]);
}

for (let i = 0; i < len; i++) {
let buffer = buffers[i];
buffer.push(value);
if (buffer.length === bufferSize) {
remove = i;
this.destination.next(buffer);
}
}

if (remove !== -1) {
buffers.splice(remove, 1);
}
}

_error(err: any) {
this.destination.error(err);
}

_complete() {
const destination = this.destination;
const buffers = this.buffers;
while (buffers.length > 0) {
destination.next(buffers.shift());
}
destination.complete();
}
}
Loading

0 comments on commit 9f8347f

Please sign in to comment.