Skip to content

Commit

Permalink
WIP Move event loop into JavaScript.
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Oct 17, 2018
1 parent 77ffbae commit 702d0fc
Show file tree
Hide file tree
Showing 16 changed files with 283 additions and 345 deletions.
93 changes: 74 additions & 19 deletions js/dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,86 @@ import * as msg from "gen/msg_generated";
import * as errors from "./errors";
import * as util from "./util";
import { maybePushTrace } from "./trace";
import { promiseErrorExaminer } from "./promise_util";

let nextCmdId = 0;
const promiseTable = new Map<number, util.Resolvable<msg.Base>>();

let fireTimers: () => void;
let fireTimers: () => number;
let nTasks = 0; // Number of async tasks pending.
let delay = 0; // Cached return value of fireTimers.

export function setFireTimersCallback(fn: () => void) {
function eventLoopLog(): void {
util.log(`TICK delay ${delay} nTasks ${nTasks}`);
}

function idle(): boolean {
delay = fireTimers();
return delay < 0 && nTasks === 0;
}

export function eventLoop(): boolean {
for (;;) {
if (idle()) {
libdeno.runMicrotasks();
if (idle()) {
break;
}
}
eventLoopLog();
const ui8 = poll(delay);
if (ui8 != null) {
handleAsyncMsgFromRust(ui8);
libdeno.runMicrotasks();
}
}
return promiseErrorExaminer();
}

// delay is in milliseconds.
// delay < 0 hangs forever.
// WARNING: poll is a special op. Messages returned from poll will
// not have the same cmd_id.
// WARNING: poll does not go thru sendSync or sendAsync. It is not a real op.
export function poll(delay: number): null | Uint8Array {
const builder = flatbuffers.createBuilder();
msg.Poll.startPoll(builder);
msg.Poll.addDelay(builder, delay);
const inner = msg.Poll.endPoll(builder);
const innerType = msg.Any.Poll;
const [pollCmdId, resBuf] = sendInternal(
builder,
innerType,
inner,
undefined,
true
);
util.assert(pollCmdId > 0);
return resBuf;
}

export function setFireTimersCallback(fn: () => number) {
fireTimers = fn;
}

export function handleAsyncMsgFromRust(ui8: Uint8Array) {
// If a the buffer is empty, recv() on the native side timed out and we
// did not receive a message.
if (ui8.length) {
const bb = new flatbuffers.ByteBuffer(ui8);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
const bb = new flatbuffers.ByteBuffer(ui8);
const base = msg.Base.getRootAsBase(bb);
const cmdId = base.cmdId();
util.log(
`handleAsyncMsgFromRust cmdId ${cmdId} ${msg.Any[base.innerType()]}`
);
const promise = promiseTable.get(cmdId);
util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
promiseTable.delete(cmdId);
const err = errors.maybeError(base);
if (err != null) {
promise!.reject(err);
} else {
promise!.resolve(base);
}
// Fire timers that have become runnable.
fireTimers();
util.assert(nTasks > 0);
nTasks--;
}

// @internal
Expand All @@ -44,6 +95,8 @@ export function sendAsync(
data?: ArrayBufferView
): Promise<msg.Base> {
maybePushTrace(innerType, false); // add to trace if tracing
util.assert(nTasks >= 0);
nTasks++;
const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, false);
util.assert(resBuf == null);
const promise = util.createResolvable<msg.Base>();
Expand All @@ -61,6 +114,8 @@ export function sendSync(
maybePushTrace(innerType, true); // add to trace if tracing
const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, true);
util.assert(cmdId >= 0);
// WARNING: in the case of poll() cmdId may not be the same in the outgoing
// message.
if (resBuf == null) {
return null;
} else {
Expand Down
6 changes: 4 additions & 2 deletions js/libdeno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ export type PromiseRejectEvent =
| "RejectAfterResolved";

interface Libdeno {
// TODO Remove recv(), it is unused.
recv(cb: MessageCallback): void;

send(control: ArrayBufferView, data?: ArrayBufferView): null | Uint8Array;

print(x: string, isErr?: boolean): void;

runMicrotasks(): void;
exit(code: number): void;

setGlobalErrorHandler: (
handler: (
message: string,
Expand All @@ -35,8 +39,6 @@ interface Libdeno {
) => void
) => void;

setPromiseErrorExaminer: (handler: () => boolean) => void;

mainSource: string;
mainSourceMap: RawSourceMap;
}
Expand Down
14 changes: 10 additions & 4 deletions js/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import * as os from "./os";
import { DenoCompiler } from "./compiler";
import { libdeno } from "./libdeno";
import { args } from "./deno";
import { sendSync, handleAsyncMsgFromRust } from "./dispatch";
import { promiseErrorExaminer, promiseRejectHandler } from "./promise_util";
import { promiseRejectHandler } from "./promise_util";
import { version } from "typescript";
import { eventLoop, sendSync } from "./dispatch";

function sendStart(): msg.StartRes {
const builder = flatbuffers.createBuilder();
Expand Down Expand Up @@ -39,10 +39,9 @@ function onGlobalError(

/* tslint:disable-next-line:no-default-export */
export default function denoMain() {
libdeno.recv(handleAsyncMsgFromRust);
libdeno.recv(_ => assert(false && "recv callback is used. see poll()"));
libdeno.setGlobalErrorHandler(onGlobalError);
libdeno.setPromiseRejectHandler(promiseRejectHandler);
libdeno.setPromiseErrorExaminer(promiseErrorExaminer);
const compiler = DenoCompiler.instance();

// First we send an empty "Start" message to let the privileged side know we
Expand Down Expand Up @@ -94,4 +93,11 @@ export default function denoMain() {

compiler.recompile = startResMsg.recompileFlag();
compiler.run(inputFn, `${cwd}/`);

const r = eventLoop();
if (!r) {
// TODO Remove libdeno.exit() and instead return the error code from
// denoMain(). The trick is to properly pass this value thru deno_execute().
libdeno.exit(1);
}
}
2 changes: 1 addition & 1 deletion js/promise_util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export function promiseRejectHandler(
event: PromiseRejectEvent,
/* tslint:disable-next-line:no-any */
promise: Promise<any>
) {
): void {
switch (event) {
case "RejectWithNoHandler":
rejectMap.set(promise, (error as Error).stack || "RejectWithNoHandler");
Expand Down
95 changes: 18 additions & 77 deletions js/timers.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
import { assert } from "./util";
import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers";
import { sendSync, setFireTimersCallback } from "./dispatch";
import { setFireTimersCallback } from "./dispatch";

// Tell the dispatcher which function it should call to fire timers that are
// due. This is done using a callback because circular imports are disallowed.
Expand All @@ -14,7 +12,6 @@ interface Timer {
delay: number;
due: number;
repeat: boolean;
scheduled: boolean;
}

// We'll subtract EPOCH every time we retrieve the time with Date.now(). This
Expand All @@ -25,8 +22,6 @@ interface Timer {
const EPOCH = Date.now();
const APOCALYPS = 2 ** 32 - 2;

let globalTimeoutDue: number | null = null;

let nextTimerId = 1;
const idMap = new Map<number, Timer>();
const dueMap: { [due: number]: Timer[] } = Object.create(null);
Expand All @@ -38,67 +33,27 @@ function getTime() {
return now;
}

function setGlobalTimeout(due: number | null, now: number) {
// Since JS and Rust don't use the same clock, pass the time to rust as a
// relative time value. On the Rust side we'll turn that into an absolute
// value again.
// Note that a negative time-out value stops the global timer.
let timeout;
if (due === null) {
timeout = -1;
} else {
timeout = due - now;
assert(timeout >= 0);
}
// Send message to the backend.
const builder = flatbuffers.createBuilder();
msg.SetTimeout.startSetTimeout(builder);
msg.SetTimeout.addTimeout(builder, timeout);
const inner = msg.SetTimeout.endSetTimeout(builder);
const res = sendSync(builder, msg.Any.SetTimeout, inner);
assert(res == null);
// Remember when when the global timer will fire.
globalTimeoutDue = due;
}

function schedule(timer: Timer, now: number) {
assert(!timer.scheduled);
assert(now <= timer.due);
function schedule(timer: Timer) {
// Find or create the list of timers that will fire at point-in-time `due`.
let list = dueMap[timer.due];
if (list === undefined) {
list = dueMap[timer.due] = [];
}
// Append the newly scheduled timer to the list and mark it as scheduled.
list.push(timer);
timer.scheduled = true;
// If the new timer is scheduled to fire before any timer that existed before,
// update the global timeout to reflect this.
if (globalTimeoutDue === null || globalTimeoutDue > timer.due) {
setGlobalTimeout(timer.due, now);
}
}

function unschedule(timer: Timer) {
if (!timer.scheduled) {
return;
}
idMap.delete(timer.id);
// Find the list of timers that will fire at point-in-time `due`.
const list = dueMap[timer.due];
if (list == null) {
return;
}
if (list.length === 1) {
// Time timer is the only one in the list. Remove the entire list.
assert(list[0] === timer);
delete dueMap[timer.due];
// If the unscheduled timer was 'next up', find when the next timer that
// still exists is due, and update the global alarm accordingly.
if (timer.due === globalTimeoutDue) {
let nextTimerDue: number | null = null;
for (const key in dueMap) {
nextTimerDue = Number(key);
break;
}
setGlobalTimeout(nextTimerDue, getTime());
}
} else {
// Multiple timers that are due at the same point in time.
// Remove this timer from the list.
Expand All @@ -120,51 +75,39 @@ function fire(timer: Timer) {
idMap.delete(timer.id);
} else {
// Interval timer: compute when timer was supposed to fire next.
// However make sure to never schedule the next interval in the past.
const now = getTime();
timer.due = Math.max(now, timer.due + timer.delay);
schedule(timer, now);
timer.due = timer.due + timer.delay;
schedule(timer);
}
// Call the user callback. Intermediate assignment is to avoid leaking `this`
// to it, while also keeping the stack trace neat when it shows up in there.
const callback = timer.callback;
callback();
}

function fireTimers() {
// Returns negative number if there are no pending timers.
// Returns positive number indicating the number of milliseconds that must
// be waited until the next timer will fire.
function fireTimers(): number {
const now = getTime();
// Bail out if we're not expecting the global timer to fire (yet).
if (globalTimeoutDue === null || now < globalTimeoutDue) {
return;
}
// After firing the timers that are due now, this will hold the due time of
// the first timer that hasn't fired yet.
let nextTimerDue: number | null = null;
// Walk over the keys of the 'due' map. Since dueMap is actually a regular
// object and its keys are numerical and smaller than UINT32_MAX - 2,
// keys are iterated in ascending order.
for (const key in dueMap) {
// Convert the object key (a string) to a number.
const due = Number(key);
// Break out of the loop if the next timer isn't due to fire yet.
if (Number(due) > now) {
nextTimerDue = due;
break;
if (due > now) {
return due - now;
}
// Get the list of timers that have this due time, then drop it.
const list = dueMap[key];
delete dueMap[key];
// Fire all the timers in the list.
for (const timer of list) {
// With the list dropped, the timer is no longer scheduled.
timer.scheduled = false;
// Place the callback on the microtask queue.
Promise.resolve(timer).then(fire);
fire(timer);
}
}
// Update the global alarm to go off when the first-up timer that hasn't fired
// yet is due.
setGlobalTimeout(nextTimerDue, now);
return -1; // Wait forever.
}

function setTimer<Args extends Array<unknown>>(
Expand All @@ -187,13 +130,12 @@ function setTimer<Args extends Array<unknown>>(
args,
delay,
due: now + delay,
repeat,
scheduled: false
repeat
};
// Register the timer's existence in the id-to-timer map.
idMap.set(timer.id, timer);
// Schedule the timer in the due table.
schedule(timer, now);
schedule(timer);
return timer.id;
}

Expand Down Expand Up @@ -224,5 +166,4 @@ export function clearTimer(id: number): void {
}
// Unschedule the timer if it is currently scheduled, and forget about it.
unschedule(timer);
idMap.delete(timer.id);
}
Loading

0 comments on commit 702d0fc

Please sign in to comment.