Skip to content

Commit

Permalink
feat(operator): add window operators: window, windowWhen, windowTime,…
Browse files Browse the repository at this point in the history
… windowCount, windowToggle

closes #195
  • Loading branch information
benlesh committed Aug 26, 2015
1 parent d565115 commit 9f5d510
Show file tree
Hide file tree
Showing 14 changed files with 602 additions and 9 deletions.
20 changes: 20 additions & 0 deletions spec/operators/window-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.window', function () {
it('should emit windows that close and reopen', function (done) {
var expected = [
[0, 1, 2],
[3, 4, 5],
[6, 7, 8]
];
Observable.interval(100)
.window(Observable.interval(320))
.take(3)
.flatMap(function (x) { return x.toArray(); })
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
});
21 changes: 21 additions & 0 deletions spec/operators/windowCount-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.windowCount', function () {
it('should emit windows at intervals', function (done) {
var expected = [
[0, 1],
[1, 2],
[2, 3],
[3]
];
Observable.range(0, 4)
.windowCount(2, 1)
.take(3)
.flatMap(function (x) { return x.toArray(); })
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
});
36 changes: 36 additions & 0 deletions spec/operators/windowTime-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.windowTime', function () {
it('should emit windows at intervals', function (done) {
var expected = [
[0, 1, 2],
[3, 4, 5],
[6, 7, 8]
];
Observable.interval(100)
.windowTime(320)
.take(3)
.flatMap(function (x) { return x.toArray(); })
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);


it('should emit windows that have been created at intervals and close after the specified delay', function (done) {
var expected = [
[0, 1, 2, 3, 4],
[2, 3, 4, 5, 6],
[4, 5, 6, 7, 8]
];
Observable.interval(100)
.windowTime(520, 220)
.take(3)
.flatMap(function (x) { return x.toArray(); })
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
});
17 changes: 17 additions & 0 deletions spec/operators/windowToggle-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.windowToggle', function () {
it('should emit windows that are opened by an observable from the first argument and closed by an observable returned by the function in the second argument', function (done) {
Observable.interval(100).take(10)
.windowToggle(Observable.timer(320).mapTo('test'), function (n) {
expect(n).toBe('test');
return Observable.timer(320);
})
.flatMap(function (w) { return w.toArray(); })
.subscribe(function (w) {
expect(w).toEqual([3, 4, 5])
}, null, done);
}, 2000);
});
20 changes: 20 additions & 0 deletions spec/operators/windowWhen-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.windowWhen', function () {
it('should emit windows that close and reopen', function (done) {
var expected = [
[0, 1, 2],
[3, 4, 5],
[6, 7, 8]
];
Observable.interval(100)
.windowWhen(function () { return Observable.timer(320); })
.take(3)
.flatMap(function (x) { return x.toArray(); })
.subscribe(function (w) {
expect(w).toEqual(expected.shift())
}, null, done);
}, 2000);
});
7 changes: 6 additions & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ export default class Observable<T> {
repeat: <T>(count: number) => Observable<T>;

groupBy: <T, R>(keySelector: (value:T) => string, durationSelector?: (group:GroupSubject<R>) => Observable<any>, elementSelector?: (value:T) => R) => Observable<R>;

window: <T>(closingNotifier: Observable<any>) => Observable<Observable<T>>;
windowWhen: <T>(closingSelector: () => Observable<any>) => Observable<Observable<T>>;
windowToggle: <T, O>(openings: Observable<O>, closingSelector?: (openValue: O) => Observable<any>) => Observable<Observable<T>>
windowTime: <T>(windowTimeSpan: number, windowCreationInterval?: number, scheduler?: Scheduler) => Observable<Observable<T>>;
windowCount: <T>(windowSize: number, startWindowEvery: number) => Observable<Observable<T>>;

finally: (ensure: () => void, thisArg?: any) => Observable<T>;
}
10 changes: 10 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,18 @@ import _finally from './operators/finally';
observableProto.finally = _finally;

import groupBy from './operators/groupBy';
import window from './operators/window';
import windowWhen from './operators/windowWhen';
import windowToggle from './operators/windowToggle';
import windowTime from './operators/windowTime';
import windowCount from './operators/windowCount';

observableProto.groupBy = groupBy;
observableProto.window = window;
observableProto.windowWhen = windowWhen;
observableProto.windowToggle = windowToggle;
observableProto.windowTime = windowTime;
observableProto.windowCount = windowCount;

import delay from './operators/delay';
import throttle from './operators/throttle';
Expand Down
12 changes: 11 additions & 1 deletion src/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ export class Action<T> extends Subscription<T> {
}

schedule(state?:any): Action<T> {
if (this.isUnsubscribed) {
return this;
}

this.state = state;
const scheduler = this.scheduler;
scheduler.actions.push(this);
Expand Down Expand Up @@ -103,6 +107,9 @@ export class NextTickAction<T> extends Action<T> {
id: number;

schedule(state?:any): Action<T> {
if (this.isUnsubscribed) {
return this;
}

this.state = state;

Expand Down Expand Up @@ -151,7 +158,10 @@ export class FutureAction<T> extends Action<T> {
}

schedule(state?:any): Action<T> {

if (this.isUnsubscribed) {
return this;
}

this.state = state;

const id = this.id;
Expand Down
65 changes: 65 additions & 0 deletions src/operators/window.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Subject from '../Subject';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function window<T>(closingNotifier: Observable<any>) : Observable<Observable<T>> {
return this.lift(new WindowOperator(closingNotifier));
}

export class WindowOperator<T, R> implements Operator<T, R> {

constructor(private closingNotifier: Observable<any>) {
}

call(observer: Observer<T>): Observer<T> {
return new WindowSubscriber(observer, this.closingNotifier);
}
}

export class WindowSubscriber<T> extends Subscriber<T> {
private window: Subject<T> = new Subject<T>();

constructor(destination: Observer<T>, private closingNotifier: Observable<any>) {
super(destination);
this.add(closingNotifier.subscribe(new WindowClosingNotifierSubscriber(this)));
this.openWindow();
}

_next(value: T) {
this.window.next(value);
}

_error(err: any) {
this.window.error(err);
this.destination.error(err);
}

_complete() {
this.window.complete();
this.destination.complete();
}

openWindow() {
const prevWindow = this.window;
if (prevWindow) {
prevWindow.complete();
}
this.destination.next(this.window = new Subject<T>());
}
}

export class WindowClosingNotifierSubscriber<T> extends Subscriber<T> {
constructor(private parent: WindowSubscriber<any>) {
super(null);
}

_next() {
this.parent.openWindow();
}
}
73 changes: 73 additions & 0 deletions src/operators/windowCount.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Subject from '../Subject';
import Subscription from '../Subscription';
import Scheduler from '../Scheduler';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function windowCount<T>(windowSize: number, startWindowEvery: number = 0) : Observable<Observable<T>> {
return this.lift(new WindowCountOperator(windowSize, startWindowEvery));
}

export class WindowCountOperator<T, R> implements Operator<T, R> {

constructor(private windowSize: number, private startWindowEvery: number) {
}

call(observer: Observer<T>): Observer<T> {
return new WindowCountSubscriber(observer, this.windowSize, this.startWindowEvery);
}
}

export class WindowCountSubscriber<T> extends Subscriber<T> {
private windows: { count: number, window: Subject<T> } [] = [];
private count: number = 0;

constructor(destination: Observer<T>, private windowSize: number, private startWindowEvery: number) {
super(destination);
}

_next(value: T) {
const count = (this.count += 1);
const startWindowEvery = this.startWindowEvery;
const windowSize = this.windowSize;
const windows = this.windows;

if (startWindowEvery && count % this.startWindowEvery === 0) {
let window = new Subject<T>();
windows.push({ count: 0, window });
this.destination.next(window);
}

const len = windows.length;
for (let i = 0; i < len; i++) {
let w = windows[i];
const window = w.window;
window.next(value);
if (windowSize === (w.count += 1)) {
window.complete();
}
}
}

_error(err: any) {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().window.error(err);
}
this.destination.error(err);
}

_complete() {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().window.complete();
}
this.destination.complete();
}
}
Loading

0 comments on commit 9f5d510

Please sign in to comment.