Skip to content

Commit

Permalink
feat(operator): add timeout and timeoutWith
Browse files Browse the repository at this point in the history
adds two operators timeout and timeoutWith. The former is for sending errors on timeout
the latter is for continuing with an Observable on timeout.

closes #244
  • Loading branch information
benlesh committed Sep 2, 2015
1 parent d86f276 commit bb440ad
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 0 deletions.
44 changes: 44 additions & 0 deletions spec/operators/timeout-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.timeout', function () {
it('should timeout after a specified delay', function (done) {
Observable.never().timeout(100)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
expect(err.message).toBe('timeout');
done();
}, function () {
throw 'should not complete';
});
}, 2000);

it('should timeout after a delay and send the passed error', function (done) {
Observable.never().timeout(100, 'hello')
.subscribe(function () {
throw 'should not next';
}, function (err) {
expect(err).toBe('hello');
done();
}, function () {
throw 'should not complete';
})
});


it('should timeout at a specified Date', function (done) {
var date = new Date(Date.now() + 100);

Observable.never().timeout(date)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
expect(err.message).toBe('timeout');
done();
}, function () {
throw 'should not complete';
});
}, 2000);
});
23 changes: 23 additions & 0 deletions spec/operators/timeoutWith-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.timeoutWith', function () {
it('should timeout after a specified delay then subscribe to the passed observable', function (done) {
var expected = [1, 2, 3];
Observable.never().timeoutWith(100, Observable.of(1,2,3))
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
}, 2000);


it('should timeout at a specified date then subscribe to the passed observable', function (done) {
var expected = [1, 2, 3];
var date = new Date(Date.now() + 100);
Observable.never().timeoutWith(date, Observable.of(1,2,3))
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
}, 2000);
});
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,6 @@ export default class Observable<T> {
bufferCount: <T>(bufferSize: number, startBufferEvery: number) => Observable<T[]>;

finally: (ensure: () => void, thisArg?: any) => Observable<T>;
timeout: <T>(due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith: <T>(due: number|Date, withObservable: Observable<any>, scheduler?: Scheduler) => Observable<T>;
}
4 changes: 4 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ observableProto.retryWhen = retryWhen;
observableProto.repeat = repeat;

import _finally from './operators/finally';
import timeout from './operators/timeout';
import timeoutWith from './operators/timeoutWith';

observableProto.finally = _finally;
observableProto.timeout = timeout;
observableProto.timeoutWith = timeoutWith;

import groupBy from './operators/groupBy';
import window from './operators/window';
Expand Down
39 changes: 39 additions & 0 deletions src/operators/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Scheduler from '../Scheduler';
import Subscription from '../Subscription';
import isDate from '../util/isDate';

export default function timeout(due: number|Date, errorToSend: any = null, scheduler: Scheduler = Scheduler.immediate) {
let waitFor = isDate(due) ? (+due - Date.now()) : <number>due;
return this.lift(new TimeoutOperator(waitFor, errorToSend, scheduler));
}

class TimeoutOperator<T, R> implements Operator<T, R> {
constructor(private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
}

call(observer: Observer<R>) {
return new TimeoutSubscriber(observer, this.waitFor, this.errorToSend, this.scheduler);
}
}

class TimeoutSubscriber<T> extends Subscriber<T> {
timeoutSubscription: Subscription<any>;

constructor(destination: Observer<T>, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
super(destination);
let delay = waitFor;
scheduler.schedule(delay, { subscriber: this }, dispatchTimeout);
}

sendTimeoutError() {
this.error(this.errorToSend || new Error('timeout'));
}
}

function dispatchTimeout<T>(state: { subscriber: TimeoutSubscriber<T> }) {
const subscriber = state.subscriber;
subscriber.sendTimeoutError();
}
41 changes: 41 additions & 0 deletions src/operators/timeoutWith.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Scheduler from '../Scheduler';
import Subscription from '../Subscription';
import Observable from '../Observable';
import isDate from '../util/isDate';

export default function timeoutWith(due: number|Date, withObservable: Observable<any>, scheduler: Scheduler = Scheduler.immediate) {
let waitFor = isDate(due) ? (+due - Date.now()) : <number>due;
return this.lift(new TimeoutWithOperator(waitFor, withObservable, scheduler));
}

class TimeoutWithOperator<T, R> implements Operator<T, R> {
constructor(private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) {
}

call(observer: Observer<R>) {
return new TimeoutWithSubscriber(observer, this.waitFor, this.withObservable, this.scheduler);
}
}

class TimeoutWithSubscriber<T> extends Subscriber<T> {
timeoutSubscription: Subscription<any>;

constructor(destination: Observer<T>, private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) {
super(destination);
let delay = waitFor;
scheduler.schedule(delay, { subscriber: this }, dispatchTimeout);
}

handleTimeout() {
const withObservable = this.withObservable;
this.add(withObservable.subscribe(this));
}
}

function dispatchTimeout<T>(state: { subscriber: TimeoutWithSubscriber<T> }) {
const subscriber = state.subscriber;
subscriber.handleTimeout();
}
3 changes: 3 additions & 0 deletions src/util/isDate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export default function isDate(value) {
return value instanceof Date && !isNaN(+value);
}

0 comments on commit bb440ad

Please sign in to comment.