From 9f8347fe33f81810bbed202fd247f46f6f7b8483 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 24 Aug 2015 10:54:27 -0700 Subject: [PATCH] feat(operator): add buffer operators: buffer, bufferWhen, bufferTime, bufferCount, and bufferToggle closes #207 --- spec/operators/buffer-spec.js | 19 +++++ spec/operators/bufferCount-spec.js | 20 +++++ spec/operators/bufferTime-spec.js | 34 ++++++++ spec/operators/bufferToggle-spec.js | 16 ++++ spec/operators/bufferWhen-spec.js | 19 +++++ src/Observable.ts | 6 ++ src/Rx.ts | 12 +++ src/Scheduler.ts | 6 +- src/operators/buffer.ts | 68 ++++++++++++++++ src/operators/bufferCount.ts | 71 ++++++++++++++++ src/operators/bufferTime.ts | 96 ++++++++++++++++++++++ src/operators/bufferToggle.ts | 120 ++++++++++++++++++++++++++++ src/operators/bufferWhen.ts | 91 +++++++++++++++++++++ 13 files changed, 575 insertions(+), 3 deletions(-) create mode 100644 spec/operators/buffer-spec.js create mode 100644 spec/operators/bufferCount-spec.js create mode 100644 spec/operators/bufferTime-spec.js create mode 100644 spec/operators/bufferToggle-spec.js create mode 100644 spec/operators/bufferWhen-spec.js create mode 100644 src/operators/buffer.ts create mode 100644 src/operators/bufferCount.ts create mode 100644 src/operators/bufferTime.ts create mode 100644 src/operators/bufferToggle.ts create mode 100644 src/operators/bufferWhen.ts diff --git a/spec/operators/buffer-spec.js b/spec/operators/buffer-spec.js new file mode 100644 index 0000000000..dea69d3ab6 --- /dev/null +++ b/spec/operators/buffer-spec.js @@ -0,0 +1,19 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.buffer', function () { + it('should emit buffers that close and reopen', function (done) { + var expected = [ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8] + ]; + Observable.interval(100) + .buffer(Observable.interval(320)) + .take(3) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/spec/operators/bufferCount-spec.js b/spec/operators/bufferCount-spec.js new file mode 100644 index 0000000000..baf68719dd --- /dev/null +++ b/spec/operators/bufferCount-spec.js @@ -0,0 +1,20 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.bufferCount', function () { + it('should emit buffers at intervals', function (done) { + var expected = [ + [0, 1], + [1, 2], + [2, 3], + [3] + ]; + Observable.range(0, 4) + .bufferCount(2, 1) + .take(3) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/spec/operators/bufferTime-spec.js b/spec/operators/bufferTime-spec.js new file mode 100644 index 0000000000..b7881a9d02 --- /dev/null +++ b/spec/operators/bufferTime-spec.js @@ -0,0 +1,34 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.bufferTime', function () { + it('should emit buffers at intervals', function (done) { + var expected = [ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8] + ]; + Observable.interval(100) + .bufferTime(320) + .take(3) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); + + + it('should emit buffers that have been created at intervals and close after the specified delay', function (done) { + var expected = [ + [0, 1, 2, 3, 4], + [2, 3, 4, 5, 6], + [4, 5, 6, 7, 8] + ]; + Observable.interval(100) + .bufferTime(520, 220) + .take(3) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/spec/operators/bufferToggle-spec.js b/spec/operators/bufferToggle-spec.js new file mode 100644 index 0000000000..8c7928e573 --- /dev/null +++ b/spec/operators/bufferToggle-spec.js @@ -0,0 +1,16 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.bufferToggle', function () { + it('should emit buffers that are opened by an observable from the first argument and closed by an observable returned by the function in the second argument', function (done) { + Observable.interval(100).take(10) + .bufferToggle(Observable.timer(320).mapTo('test'), function (n) { + expect(n).toBe('test'); + return Observable.timer(320); + }) + .subscribe(function (w) { + expect(w).toEqual([3, 4, 5]) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/spec/operators/bufferWhen-spec.js b/spec/operators/bufferWhen-spec.js new file mode 100644 index 0000000000..e42752014a --- /dev/null +++ b/spec/operators/bufferWhen-spec.js @@ -0,0 +1,19 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.bufferWhen', function () { + it('should emit buffers that close and reopen', function (done) { + var expected = [ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8] + ]; + Observable.interval(100) + .bufferWhen(function () { return Observable.timer(320); }) + .take(3) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 0a17005369..37ea0738ba 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -168,5 +168,11 @@ export default class Observable { windowTime: (windowTimeSpan: number, windowCreationInterval?: number, scheduler?: Scheduler) => Observable>; windowCount: (windowSize: number, startWindowEvery: number) => Observable>; + buffer: (closingNotifier: Observable) => Observable; + bufferWhen: (closingSelector: () => Observable) => Observable; + bufferToggle: (openings: Observable, closingSelector?: (openValue: O) => Observable) => Observable + bufferTime: (bufferTimeSpan: number, bufferCreationInterval?: number, scheduler?: Scheduler) => Observable; + bufferCount: (bufferSize: number, startBufferEvery: number) => Observable; + finally: (ensure: () => void, thisArg?: any) => Observable; } \ No newline at end of file diff --git a/src/Rx.ts b/src/Rx.ts index d345c717e4..0ea4ac172a 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -181,6 +181,18 @@ observableProto.delay = delay; observableProto.throttle = throttle; observableProto.debounce = debounce; +import buffer from './operators/buffer'; +import bufferCount from './operators/bufferCount'; +import bufferTime from './operators/bufferTime'; +import bufferToggle from './operators/bufferToggle'; +import bufferWhen from './operators/bufferWhen'; + +observableProto.buffer = buffer; +observableProto.bufferCount = bufferCount; +observableProto.bufferTime = bufferTime; +observableProto.bufferToggle = bufferToggle; +observableProto.bufferWhen = bufferWhen; + export { Subject, diff --git a/src/Scheduler.ts b/src/Scheduler.ts index 87bd396711..8f6de9dc13 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -65,7 +65,7 @@ export class Action extends Subscription { super(); } - schedule(state?:any): Action { + schedule(state?: any): Action { if (this.isUnsubscribed) { return this; } @@ -106,11 +106,11 @@ export class NextTickAction extends Action { id: number; - schedule(state?:any): Action { + schedule(state?: any): Action { if (this.isUnsubscribed) { return this; } - + this.state = state; const scheduler = this.scheduler; diff --git a/src/operators/buffer.ts b/src/operators/buffer.ts new file mode 100644 index 0000000000..256a1181a7 --- /dev/null +++ b/src/operators/buffer.ts @@ -0,0 +1,68 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function buffer(closingNotifier: Observable): Observable { + return this.lift(new BufferOperator(closingNotifier)); +} + +export class BufferOperator implements Operator { + + constructor(private closingNotifier: Observable) { + } + + call(observer: Observer): Observer { + return new BufferSubscriber(observer, this.closingNotifier); + } +} + +export class BufferSubscriber extends Subscriber { + buffer: T[] = []; + + constructor(destination: Observer, closingNotifier: Observable) { + super(destination); + this.add(closingNotifier.subscribe(new BufferClosingNotifierSubscriber(this))); + } + + _next(value: T) { + this.buffer.push(value); + } + + _error(err: any) { + this.destination.error(err); + } + + _complete() { + this.flushBuffer(); + this.destination.complete(); + } + + flushBuffer() { + const buffer = this.buffer; + this.buffer = []; + this.destination.next(buffer); + } +} + +export class BufferClosingNotifierSubscriber extends Subscriber { + constructor(private parent: BufferSubscriber) { + super(null); + } + + _next(value: T) { + this.parent.flushBuffer(); + } + + _error(err: any) { + this.parent.error(err); + } + + _complete() { + // noop + } +} \ No newline at end of file diff --git a/src/operators/bufferCount.ts b/src/operators/bufferCount.ts new file mode 100644 index 0000000000..1dcb720a03 --- /dev/null +++ b/src/operators/bufferCount.ts @@ -0,0 +1,71 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function bufferCount(bufferSize: number, startBufferEvery: number = null): Observable { + return this.lift(new BufferCountOperator(bufferSize, startBufferEvery)); +} + +export class BufferCountOperator implements Operator { + + constructor(private bufferSize: number, private startBufferEvery: number) { + } + + call(observer: Observer): Observer { + return new BufferCountSubscriber(observer, this.bufferSize, this.startBufferEvery); + } +} + +export class BufferCountSubscriber extends Subscriber { + buffers: Array = [[]]; + count: number = 0; + + constructor(destination: Observer, private bufferSize: number, private startBufferEvery: number) { + super(destination); + } + + _next(value: T) { + const count = (this.count += 1); + const destination = this.destination; + const bufferSize = this.bufferSize; + const startBufferEvery = this.startBufferEvery; + const buffers = this.buffers; + const len = buffers.length; + let remove = -1; + + if (count % startBufferEvery === 0) { + buffers.push([]); + } + + for (let i = 0; i < len; i++) { + let buffer = buffers[i]; + buffer.push(value); + if (buffer.length === bufferSize) { + remove = i; + this.destination.next(buffer); + } + } + + if (remove !== -1) { + buffers.splice(remove, 1); + } + } + + _error(err: any) { + this.destination.error(err); + } + + _complete() { + const destination = this.destination; + const buffers = this.buffers; + while (buffers.length > 0) { + destination.next(buffers.shift()); + } + destination.complete(); + } +} \ No newline at end of file diff --git a/src/operators/bufferTime.ts b/src/operators/bufferTime.ts new file mode 100644 index 0000000000..0d5b5ae0d4 --- /dev/null +++ b/src/operators/bufferTime.ts @@ -0,0 +1,96 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subscription from '../Subscription'; +import { default as Scheduler, Action } from '../Scheduler'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function bufferTime(bufferTimeSpan: number, bufferCreationInterval: number = null, scheduler: Scheduler = Scheduler.nextTick) : Observable { + return this.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, scheduler)); +} + +export class BufferTimeOperator implements Operator { + + constructor(private bufferTimeSpan: number, private bufferCreationInterval: number, private scheduler: Scheduler) { + } + + call(observer: Observer): Observer { + return new BufferTimeSubscriber(observer, this.bufferTimeSpan, this.bufferCreationInterval, this.scheduler); + } +} + +export class BufferTimeSubscriber extends Subscriber { + private buffers: Array = []; + + constructor(destination: Observer, private bufferTimeSpan: number, private bufferCreationInterval: number, private scheduler: Scheduler) { + super(destination); + let buffer = this.openBuffer(); + if (bufferCreationInterval !== null && bufferCreationInterval >= 0) { + this.add(scheduler.schedule(bufferTimeSpan, { subscriber: this, buffer }, dispatchBufferClose)); + this.add(scheduler.schedule(bufferCreationInterval, { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler }, dispatchBufferCreation)); + } else { + this.add(scheduler.schedule(bufferTimeSpan, { subscriber: this, buffer }, dispatchBufferTimeSpanOnly)); + } + } + + _next(value: T) { + const buffers = this.buffers; + const len = buffers.length; + for (let i = 0; i < len; i++) { + buffers[i].push(value); + } + } + + _error(err) { + this.buffers.length = 0; + this.destination.error(err); + } + + _complete() { + const buffers = this.buffers; + while (buffers.length > 0) { + this.destination.next(buffers.shift()); + } + this.destination.complete(); + } + + openBuffer(): T[] { + let buffer = []; + this.buffers.push(buffer); + return buffer; + } + + closeBuffer(buffer: T[]) { + this.destination.next(buffer); + const buffers = this.buffers; + buffers.splice(buffers.indexOf(buffer), 1); + } +} + +function dispatchBufferTimeSpanOnly(state) { + const subscriber: BufferTimeSubscriber = state.subscriber; + + const prevBuffer = state.buffer; + if (prevBuffer) { + subscriber.closeBuffer(prevBuffer); + } + + state.buffer = subscriber.openBuffer(); + (this).schedule(state); +} + +function dispatchBufferCreation(state) { + let { bufferTimeSpan, subscriber, scheduler } = state; + let buffer = subscriber.openBuffer(); + var action = >this; + action.add(scheduler.schedule(bufferTimeSpan, { subscriber, buffer }, dispatchBufferClose)); + action.schedule(state); +} + +function dispatchBufferClose({ subscriber, buffer }) { + subscriber.closeBuffer(buffer); +} \ No newline at end of file diff --git a/src/operators/bufferToggle.ts b/src/operators/bufferToggle.ts new file mode 100644 index 0000000000..cf08acaa72 --- /dev/null +++ b/src/operators/bufferToggle.ts @@ -0,0 +1,120 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subject from '../Subject'; +import Subscription from '../Subscription'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function bufferToggle(openings: Observable, closingSelector: (openValue: O) => Observable) : Observable { + return this.lift(new BufferToggleOperator(openings, closingSelector)); +} + +export class BufferToggleOperator implements Operator { + + constructor(private openings: Observable, private closingSelector: (openValue: O) => Observable) { + } + + call(observer: Observer): Observer { + return new BufferToggleSubscriber(observer, this.openings, this.closingSelector); + } +} + +export class BufferToggleSubscriber extends Subscriber { + private buffers: Array = []; + private closingNotification: Subscription; + + constructor(destination: Observer, private openings: Observable, private closingSelector: (openValue: O) => Observable) { + super(destination); + this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this))); + } + + _next(value: T) { + const buffers = this.buffers; + const len = buffers.length; + for (let i = 0; i < len; i++) { + buffers[i].push(value); + } + } + + _error(err: any) { + this.buffers = null; + this.destination.error(err); + } + + _complete() { + const buffers = this.buffers; + while (buffers.length > 0) { + this.destination.next(buffers.shift()); + } + this.destination.complete(); + } + + openBuffer(value: O) { + const closingSelector = this.closingSelector; + const buffers = this.buffers; + + let closingNotifier = tryCatch(closingSelector)(value); + if (closingNotifier === errorObject) { + const err = closingNotifier.e; + this.buffers = null; + this.destination.error(err); + } else { + let buffer = []; + let context = { + buffer, + subscription: null + }; + buffers.push(buffer); + this.add(context.subscription = closingNotifier.subscribe(new BufferClosingNotifierSubscriber(this, context))); + } + } + + closeBuffer(context: { subscription: any, buffer: T[] }) { + const { buffer, subscription } = context; + const buffers = this.buffers; + this.destination.next(buffer); + buffers.splice(buffers.indexOf(buffer), 1); + this.remove(subscription); + subscription.unsubscribe(); + } +} + +export class BufferClosingNotifierSubscriber extends Subscriber { + constructor(private parent: BufferToggleSubscriber, private context: { subscription: any, buffer: T[] }) { + super(null); + } + + _next() { + this.parent.closeBuffer(this.context); + } + + _error(err) { + this.parent.error(err); + } + + _complete() { + // noop + } +} + +export class BufferToggleOpeningsSubscriber extends Subscriber { + constructor(private parent: BufferToggleSubscriber) { + super(null); + } + + _next(value: T) { + this.parent.openBuffer(value); + } + + _error(err) { + this.parent.error(err); + } + + _complete() { + // noop + } +} \ No newline at end of file diff --git a/src/operators/bufferWhen.ts b/src/operators/bufferWhen.ts new file mode 100644 index 0000000000..91d1cdace8 --- /dev/null +++ b/src/operators/bufferWhen.ts @@ -0,0 +1,91 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subject from '../Subject'; +import Subscription from '../Subscription'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function bufferWhen(closingSelector: () => Observable) : Observable { + return this.lift(new BufferWhenOperator(closingSelector)); +} + +export class BufferWhenOperator implements Operator { + + constructor(private closingSelector: () => Observable) { + } + + call(observer: Observer): Observer { + return new BufferWhenSubscriber(observer, this.closingSelector); + } +} + +export class BufferWhenSubscriber extends Subscriber { + private buffer: T[]; + private closingNotification: Subscription; + + constructor(destination: Observer, private closingSelector: () => Observable) { + super(destination); + this.openBuffer(); + } + + _next(value: T) { + this.buffer.push(value); + } + + _error(err: any) { + this.buffer = null; + this.destination.error(err); + } + + _complete() { + const buffer = this.buffer; + this.destination.next(buffer); + this.buffer = null; + this.destination.complete(); + } + + openBuffer() { + const prevClosingNotification = this.closingNotification; + if (prevClosingNotification) { + this.remove(prevClosingNotification); + prevClosingNotification.unsubscribe(); + } + + const buffer = this.buffer; + if (buffer) { + this.destination.next(buffer); + } + this.buffer = []; + + let closingNotifier = tryCatch(this.closingSelector)(); + if (closingNotifier === errorObject) { + const err = closingNotifier.e; + this.buffer = null; + this.destination.error(err); + } else { + this.add(this.closingNotification = closingNotifier.subscribe(new BufferClosingNotifierSubscriber(this))); + } + } +} + +export class BufferClosingNotifierSubscriber extends Subscriber { + constructor(private parent: BufferWhenSubscriber) { + super(null); + } + + _next() { + this.parent.openBuffer(); + } + + _error(err) { + this.parent.error(err); + } + + _complete() { + // noop + } +} \ No newline at end of file