Skip to content

Commit

Permalink
feat(operator): Add minimal delay operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt authored and benlesh committed Aug 19, 2015
1 parent 314c93f commit 7851885
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 4 deletions.
18 changes: 18 additions & 0 deletions spec/operators/delay-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.delay()', function () {
it('should delay by 100ms', function (done) {
var time = Date.now();
Observable
.value(42)
.delay(100)
.subscribe(function (x) {
expect(Date.now() - time >= 100).toBe(true);
}, null, function() {
expect(Date.now() - time >= 100).toBe(true);
done();
});
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export default class Observable<T> {
concurrent?: number) => Observable<R>;

expand: (project: (x: T, ix: number) => Observable<any>) => Observable<any>;
delay: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;

switchAll: <R>() => Observable<R>;
switchLatest: <R>(project: ((x: T, ix: number) => Observable<any>),
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ import groupBy from './operators/groupBy';

observableProto.groupBy = groupBy;

import delay from './operators/delay';
observableProto.delay = delay;

export default {
Subject,
Scheduler,
Expand Down
12 changes: 8 additions & 4 deletions src/observables/SubscribeOnObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@ export default class SubscribeOnObservable<T> extends Observable<T> {
return source.subscribe(subscriber);
}

constructor(public source: Observable<T>,
protected delay: number = 0,
protected scheduler: Scheduler = Scheduler.nextTick) {
private delayTime: number;
private scheduler: Scheduler;

constructor(source: Observable<T>, delay: number = 0, scheduler: Scheduler = Scheduler.nextTick) {
super();
this.source = source;
this.delayTime = delay;
this.scheduler = scheduler;
}

_subscribe(subscriber) {

const delay = this.delay;
const delay = this.delayTime;
const source = this.source;
const scheduler = this.scheduler;

Expand Down
103 changes: 103 additions & 0 deletions src/operators/delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Scheduler from '../Scheduler';
import Subscriber from '../Subscriber';
import Notification from '../Notification';

export default function delay<T>(delay: number, scheduler: Scheduler = Scheduler.immediate) {
return this.lift(new DelayOperator(delay, scheduler));
}

export class DelayOperator<T, R> extends Operator<T, R> {

delay: number;
scheduler: Scheduler;

constructor(delay: number, scheduler: Scheduler) {
super();
this.delay = delay;
this.scheduler = scheduler;
}

call(observer: Observer<T>): Observer<T> {
return new DelaySubscriber(observer, this.delay, this.scheduler);
}
}

export class DelaySubscriber<T> extends Subscriber<T> {

protected delay: number;
protected queue: Array<any>=[];
protected scheduler: Scheduler;
protected active: boolean = false;
protected errored: boolean = false;

static dispatch(state) {
const source = state.source;
const queue = source.queue;
const scheduler = state.scheduler;
const destination = state.destination;
while (queue.length > 0 && (queue[0].time - scheduler.now()) <= 0) {
queue.shift().notification.observe(destination);
}
if (queue.length > 0) {
(<any> this).delay = Math.max(0, queue[0].time - scheduler.now());
(<any> this).schedule(state);
} else {
source.active = false;
}
}

constructor(destination: Observer<T>, delay: number, scheduler: Scheduler) {
super(destination);
this.delay = delay;
this.scheduler = scheduler;
}

_next(x) {
if (this.errored) {
return;
}
const scheduler = this.scheduler;
this.queue.push(new DelayMessage<T>(scheduler.now() + this.delay, Notification.createNext(x)));
if (this.active === false) {
this._schedule(scheduler);
}
}

_error(e) {
const scheduler = this.scheduler;
this.errored = true;
this.queue = [new DelayMessage<T>(scheduler.now() + this.delay, Notification.createError(e))];
if (this.active === false) {
this._schedule(scheduler);
}
}

_complete() {
if (this.errored) {
return;
}
const scheduler = this.scheduler;
this.queue.push(new DelayMessage<T>(scheduler.now() + this.delay, Notification.createComplete()));
if (this.active === false) {
this._schedule(scheduler);
}
}

_schedule(scheduler) {
this.active = true;
this.add(scheduler.schedule(this.delay, {
source: this, destination: this.destination, scheduler: scheduler
}, DelaySubscriber.dispatch));
}
}

class DelayMessage<T> {
time: number;
notification: any;
constructor(time: number, notification: any) {
this.time = time;
this.notification = notification;
}
}

0 comments on commit 7851885

Please sign in to comment.