Skip to content

Commit

Permalink
fix(timeout): Cancels scheduled timeout, if no longer needed
Browse files Browse the repository at this point in the history
  • Loading branch information
jayphelps committed Nov 29, 2016
1 parent f51b8f9 commit 9441def
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions src/operator/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Subscriber } from '../Subscriber';
import { Scheduler } from '../Scheduler';
import { Observable } from '../Observable';
import { TeardownLogic } from '../Subscription';
import { Subscription } from '../Subscription';
import { TimeoutError } from '../util/TimeoutError';

/**
Expand Down Expand Up @@ -46,6 +47,8 @@ class TimeoutOperator<T> implements Operator<T, T> {
class TimeoutSubscriber<T> extends Subscriber<T> {
private index: number = 0;
private _previousIndex: number = 0;
private action: Subscription = null;

get previousIndex(): number {
return this._previousIndex;
}
Expand All @@ -66,18 +69,34 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
if (source.previousIndex === currentIndex) {
source.notifyTimeout();
}
}

private scheduleTimeout(): void {
let currentIndex = this.index;
this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
const currentIndex = this.index;
const timeoutState = { subscriber: this, index: currentIndex };

this.cancelTimeout();
this.action = this.scheduler.schedule(
TimeoutSubscriber.dispatchTimeout, this.waitFor, timeoutState
);
this.add(this.action);

this.index++;
this._previousIndex = currentIndex;
}

private cancelTimeout(): void {
const { action } = this;
if (action !== null) {
this.remove(action);
action.unsubscribe();
this.action = null;
}
}

protected _next(value: T): void {
this.destination.next(value);

Expand Down

0 comments on commit 9441def

Please sign in to comment.