Skip to content

Commit

Permalink
fix(scheduling): Fixes bugs in scheduled actions.
Browse files Browse the repository at this point in the history
Fix AsapAction to store its scheduled ID on the scheduler instead of on itself. This ensures that if the first AsapAction is canceled, other AsapActions will still execute.

Ensure both QueueAction and AsapAction extend FutureAction, so if either action is rescheduled with a delay > 0, they'll successfully reschedule with the proper timeout.
  • Loading branch information
trxcllnt authored and benlesh committed Dec 29, 2015
1 parent 981da85 commit e050f01
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 107 deletions.
47 changes: 47 additions & 0 deletions spec/schedulers/AsapScheduler-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, rxTestScheduler, hot, cold */
var Rx = require('../../dist/cjs/Rx.KitchenSink');
var asap = Rx.Scheduler.asap;
var Notification = Rx.Notification;

describe('AsapScheduler', function () {
it('should exist', function () {
expect(asap).toBeDefined();
});

it('should schedule an action to happen later', function (done) {
var actionHappened = false;
asap.schedule(function () {
actionHappened = true;
done();
});
if (actionHappened) {
done.fail('Scheduled action happened synchronously');
}
});

it('should execute the rest of the scheduled actions if the first action is canceled', function (done) {
var actionHappened = false;
var firstSubscription = null;
var secondSubscription = null;

firstSubscription = asap.schedule(function () {
actionHappened = true;
if (secondSubscription) {
secondSubscription.unsubscribe();
}
done.fail('The first action should not have executed.');
});

secondSubscription = asap.schedule(function () {
if (!actionHappened) {
done();
}
});

if (actionHappened) {
done.fail('Scheduled action happened synchronously');
} else {
firstSubscription.unsubscribe();
}
});
});
6 changes: 3 additions & 3 deletions src/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export interface Scheduler {
now(): number;
schedule<T>(work: (state?: any) => Subscription|void, delay?: number, state?: any): Subscription;
flush(): void;
actions: Action[];
scheduled: boolean;
active: boolean;
}
actions: Action[];
scheduledId: number;
}
54 changes: 23 additions & 31 deletions src/scheduler/AsapAction.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,39 @@
import {Immediate} from '../util/Immediate';
import {QueueAction} from './QueueAction';
import {Action} from './Action';
import {Immediate} from '../util/Immediate';
import {FutureAction} from './FutureAction';

export class AsapAction<T> extends QueueAction<T> {
private id: any;
export class AsapAction<T> extends FutureAction<T> {

schedule(state?: any): Action {
if (this.isUnsubscribed) {
return this;
_schedule(state?: any, delay: number = 0): Action {
if (delay > 0) {
return super._schedule(state, delay);
}

this.delay = delay;
this.state = state;

const scheduler = this.scheduler;

const {scheduler} = this;
scheduler.actions.push(this);

if (!scheduler.scheduled) {
scheduler.scheduled = true;
this.id = Immediate.setImmediate(() => {
this.id = null;
this.scheduler.scheduled = false;
this.scheduler.flush();
if (!scheduler.scheduledId) {
scheduler.scheduledId = Immediate.setImmediate(() => {
scheduler.scheduledId = null;
scheduler.flush();
});
}

return this;
}

unsubscribe(): void {
const id = this.id;
const scheduler = this.scheduler;
_unsubscribe(): void {

super.unsubscribe();
const {scheduler} = this;
const {scheduledId, actions} = scheduler;

if (scheduler.actions.length === 0) {
scheduler.active = false;
scheduler.scheduled = false;
}
super._unsubscribe();

if (id) {
this.id = null;
Immediate.clearImmediate(id);
if (actions.length === 0) {
scheduler.active = false;
if (scheduledId != null) {
scheduler.scheduledId = null;
Immediate.clearImmediate(scheduledId);
}
}
}
}
}
12 changes: 5 additions & 7 deletions src/scheduler/AsapScheduler.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import {QueueScheduler} from './QueueScheduler';
import {Subscription} from '../Subscription';
import {Action} from './Action';
import {AsapAction} from './AsapAction';
import {QueueAction} from './QueueAction';
import {Subscription} from '../Subscription';
import {QueueScheduler} from './QueueScheduler';

export class AsapScheduler extends QueueScheduler {
public scheduledId: number = null;
scheduleNow<T>(work: (x?: any) => Subscription, state?: any): Action {
return (this.scheduled ?
new QueueAction(this, work) :
new AsapAction(this, work)).schedule(state);
return new AsapAction(this, work).schedule(state);
}
}
}
60 changes: 41 additions & 19 deletions src/scheduler/FutureAction.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,71 @@
import {Subscription} from '../Subscription';
import {QueueScheduler} from './QueueScheduler';
import {root} from '../util/root';
import {Action} from './Action';
import {QueueAction} from './QueueAction';
import {Scheduler} from '../Scheduler';
import {Subscription} from '../Subscription';

export class FutureAction<T> extends QueueAction<T> {
export class FutureAction<T> extends Subscription implements Action {

id: any;
delay: number;
public id: any;
public state: any;
public delay: number;

constructor(public scheduler: QueueScheduler,
constructor(public scheduler: Scheduler,
public work: (x?: any) => Subscription | void) {
super(scheduler, work);
super();
}

execute() {
if (this.isUnsubscribed) {
throw new Error('How did did we execute a canceled Action?');
}
this.work(this.state);
}

schedule(state?: any, delay: number = 0): Action {
if (this.isUnsubscribed) {
return this;
}
return this._schedule(state, delay);
}

_schedule(state?: any, delay: number = 0): Action {

this.delay = delay;
this.state = state;
const id = this.id;

if (id != null) {
this.id = undefined;
clearTimeout(id);
root.clearTimeout(id);
}

const scheduler = this.scheduler;

this.id = setTimeout(() => {
this.id = void 0;
this.id = root.setTimeout(() => {
this.id = null;
const {scheduler} = this;
scheduler.actions.push(this);
scheduler.flush();
}, this.delay);
}, delay);

return this;
}

unsubscribe() {
const id = this.id;
_unsubscribe() {

const {id, scheduler} = this;
const {actions} = scheduler;
const index = actions.indexOf(this);

if (id != null) {
this.id = void 0;
clearTimeout(id);
this.id = null;
root.clearTimeout(id);
}
super.unsubscribe();

if (index !== -1) {
actions.splice(index, 1);
}

this.work = null;
this.state = null;
this.scheduler = null;
}
}
47 changes: 7 additions & 40 deletions src/scheduler/QueueAction.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,16 @@
import {Subscription} from '../Subscription';
import {Scheduler} from '../Scheduler';
import {Action} from './Action';
import {FutureAction} from './FutureAction';

export class QueueAction<T> extends Subscription implements Action {

state: any;

constructor(public scheduler: Scheduler,
public work: (x?: any) => Subscription | void) {
super();
}

schedule(state?: any): Action {
if (this.isUnsubscribed) {
return this;
export class QueueAction<T> extends FutureAction<T> {
_schedule(state?: any, delay: number = 0): Action {
if (delay > 0) {
return super._schedule(state, delay);
}

this.delay = delay;
this.state = state;
const scheduler = this.scheduler;
scheduler.actions.push(this);
scheduler.flush();
return this;
}

execute() {
if (this.isUnsubscribed) {
throw new Error('How did did we execute a canceled Action?');
}
this.work(this.state);
}

unsubscribe() {

const scheduler = this.scheduler;
const actions = scheduler.actions;
const index = actions.indexOf(this);

this.work = void 0;
this.state = void 0;
this.scheduler = void 0;

if (index !== -1) {
actions.splice(index, 1);
}

super.unsubscribe();
}
}
}
10 changes: 5 additions & 5 deletions src/scheduler/QueueScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import {FutureAction} from './FutureAction';
import {Action} from './Action';

export class QueueScheduler implements Scheduler {
actions: QueueAction<any>[] = [];
active: boolean = false;
scheduled: boolean = false;
public active: boolean = false;
public actions: QueueAction<any>[] = [];
public scheduledId: number = null;

now() {
return Date.now();
}

flush() {
if (this.active || this.scheduled) {
if (this.active || this.scheduledId) {
return;
}
this.active = true;
Expand All @@ -38,4 +38,4 @@ export class QueueScheduler implements Scheduler {
scheduleLater<T>(work: (x?: any) => Subscription | void, delay: number, state?: any): Action {
return new FutureAction(this, work).schedule(state, delay);
}
}
}
4 changes: 2 additions & 2 deletions src/scheduler/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {Action} from './Action';
export class VirtualTimeScheduler implements Scheduler {
actions: Action[] = [];
active: boolean = false;
scheduled: boolean = false;
scheduledId: number = null;
index: number = 0;
sorted: boolean = false;
frame: number = 0;
Expand Down Expand Up @@ -114,4 +114,4 @@ class VirtualAction<T> extends Subscription implements Action {

super.unsubscribe();
}
}
}

0 comments on commit e050f01

Please sign in to comment.