Skip to content

Commit

Permalink
feat: refactor notification and subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
FUDCo committed Jan 11, 2021
1 parent 5a7dcc4 commit a031034
Show file tree
Hide file tree
Showing 23 changed files with 375 additions and 172 deletions.
55 changes: 22 additions & 33 deletions packages/SwingSet/src/kernel/cleanup.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// import { kdebug } from './kdebug';
import { kdebug } from './kdebug';
import { parseKernelSlot } from './parseKernelSlots';

// XXX temporary flags to control features during development
Expand Down Expand Up @@ -62,50 +62,39 @@ export function deleteCListEntryIfEasy(
vatKeeper.deleteCListEntry(kpid, vpid);
}

export function getKpidsToRetire(
vatID,
vatKeeper,
kernelKeeper,
rootKPID,
rootKernelData,
) {
export function getKpidsToRetire(kernelKeeper, rootKPID, rootKernelData) {
const seen = new Set();
function scanKernelPromise(kpid, kernelData) {
// kdebug(`### scanning ${kpid} ${JSON.stringify(kernelData)}`);
if (vatKeeper.hasCListEntry(kpid)) {
// kdebug(`## adding ${kpid} to scan results`);
seen.add(kpid);
if (kernelData) {
for (const slot of kernelData.slots) {
const { type } = parseKernelSlot(slot);
// kdebug(`## examine ${kpid} slot ${slot}`);
if (type === 'promise') {
if (!seen.has(slot)) {
const kp = kernelKeeper.getKernelPromise(slot);
const { data, state } = kp;
// kdebug(`## state of ${slot} is: ${JSON.stringify(kp)}`);
if (state !== 'unresolved') {
if (data) {
scanKernelPromise(slot, data);
}
} else {
// kdebug(`## ${slot} is still unresolved`);
kdebug(`### scanning ${kpid} ${JSON.stringify(kernelData)}`);
seen.add(kpid);
if (kernelData) {
for (const slot of kernelData.slots) {
const { type } = parseKernelSlot(slot);
kdebug(`## examine ${kpid} slot ${slot}`);
if (type === 'promise') {
if (!seen.has(slot)) {
const kp = kernelKeeper.getKernelPromise(slot);
const { data, state } = kp;
kdebug(`## state of ${slot} is: ${JSON.stringify(kp)}`);
if (state !== 'unresolved') {
if (data) {
scanKernelPromise(slot, data);
}
} else {
// kdebug(`## ${slot} previously seen`);
kdebug(`## ${slot} is still unresolved`);
}
} else {
// kdebug(`## ${slot} is not a promise`);
kdebug(`## ${slot} previously seen`);
}
} else {
kdebug(`## ${slot} is not a promise`);
}
} else {
// kdebug(`## ${kpid} has no data`);
}
} else {
// kdebug(`## ${kpid} has no c-list entry for ${vatID}`);
kdebug(`## ${kpid} has no data`);
}
}

kdebug(`## scanning ${rootKPID}`);
scanKernelPromise(rootKPID, rootKernelData);
return Array.from(seen);
}
38 changes: 25 additions & 13 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Remotable, getInterfaceOf } from '@agoric/marshal';
import { assert } from '@agoric/assert';
import { assert, details } from '@agoric/assert';
import { importBundle } from '@agoric/import-bundle';
import { assertKnownOptions } from '../assertOptions';
import { makeVatManagerFactory } from './vatManager/factory';
Expand All @@ -16,6 +16,7 @@ import { insistDeviceID, insistVatID } from './id';
import { makeMeterManager } from './metering';
import { makeKernelSyscallHandler, doSend } from './kernelSyscall';
import { makeSlogger, makeDummySlogger } from './slogger';
import { getKpidsToRetire } from './cleanup';

import { makeVatLoader } from './loadVat';
import { makeVatTranslators } from './vatTranslator';
Expand Down Expand Up @@ -470,18 +471,28 @@ export default function buildKernel(
} else {
const p = kernelKeeper.getKernelPromise(kpid);
kernelKeeper.incStat(statNameForNotify(p.state));
const kd = harden(['notify', kpid, p]);
const vd = vat.translators.kernelDeliveryToVatDelivery(kd);
if (vd) {
await deliverAndLogToVat(vatID, kd, vd);

const resolutions = vd[1];
const vatKeeper = kernelKeeper.getVatKeeper(vatID);
for (const vpid of Object.keys(resolutions)) {
const kpidToDelete = vatKeeper.mapVatSlotToKernelSlot(vpid);
vatKeeper.deleteCListEntry(kpidToDelete, vpid);
}
const vatKeeper = kernelKeeper.getVatKeeper(vatID);

assert(p.state !== 'unresolved', details`spurious notification ${kpid}`);
const resolutions = [];
if (!vatKeeper.hasCListEntry(kpid)) {
kdebug(`vat ${vatID} has no c-list entry for ${kpid}`);
kdebug(`skipping notify of ${kpid} because it's already been done`);
return;
}
const targets = getKpidsToRetire(kernelKeeper, kpid, p.data);
if (targets.length === 0) {
kdebug(`no kpids to retire`);
kdebug(`skipping notify of ${kpid} because it's already been done`);
return;
}
for (const toResolve of targets) {
resolutions.push([toResolve, kernelKeeper.getKernelPromise(toResolve)]);
}
const kd = harden(['notify', resolutions]);
const vd = vat.translators.kernelDeliveryToVatDelivery(kd);
vatKeeper.deleteCListEntriesForKernelSlots(targets);
await deliverAndLogToVat(vatID, kd, vd);
}
}

Expand Down Expand Up @@ -589,7 +600,8 @@ export default function buildKernel(
// which is fatal to the vat
ksc = translators.vatSyscallToKernelSyscall(vatSyscallObject);
} catch (vaterr) {
kdebug(`vat ${vatID} terminated: error during translation: ${vaterr}`);
// prettier-ignore
kdebug(`vat ${vatID} terminated: error during translation: ${vaterr} ${JSON.stringify(vatSyscallObject)}`);
const problem = 'clist violation: prepare to die';
setTerminationTrigger(vatID, true, true, makeError(problem));
return harden(['error', problem]);
Expand Down
30 changes: 26 additions & 4 deletions packages/SwingSet/src/kernel/liveSlots.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,16 @@ function build(syscall, forVatID, cacheSize, vatPowers, vatParameters) {
return valToSlot.get(val);
}

let importedPromises = null;
function beginCollectingPromiseImports() {
importedPromises = new Set();
}
function finishCollectingPromiseImports() {
const result = importedPromises;
importedPromises = null;
return result;
}

function convertSlotToVal(slot, iface = undefined) {
let val = slotToVal.get(slot);
if (val) {
Expand Down Expand Up @@ -285,7 +295,11 @@ function build(syscall, forVatID, cacheSize, vatPowers, vatParameters) {
// but the current Promise API doesn't give us a way to discover
// this, so we must subscribe right away. If we were using Vows or
// some other then-able, we could just hook then() to notify us.
syscall.subscribe(slot);
if (importedPromises) {
importedPromises.add(slot);
} else {
syscall.subscribe(slot);
}
} else if (type === 'device') {
val = makeDeviceNode(slot, iface);
} else {
Expand Down Expand Up @@ -530,13 +544,21 @@ function build(syscall, forVatID, cacheSize, vatPowers, vatParameters) {

function notify(resolutions) {
assert(didRoot);
for (const vpid of Object.keys(resolutions)) {
const vp = resolutions[vpid];
beginCollectingPromiseImports();
for (const resolution of resolutions) {
const [vpid, vp] = resolution;
notifyOnePromise(vpid, vp.rejected, vp.data);
}
for (const vpid of Object.keys(resolutions)) {
for (const resolution of resolutions) {
const [vpid] = resolution;
retirePromiseID(vpid);
}
const imports = finishCollectingPromiseImports();
for (const slot of imports) {
if (slotToVal.get(slot)) {
syscall.subscribe(slot);
}
}
}

// TODO: when we add notifyForward, guard against cycles
Expand Down
8 changes: 8 additions & 0 deletions packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ export function makeVatKeeper(
}
}

function deleteCListEntriesForKernelSlots(kernelSlots) {
for (const kernelSlot of kernelSlots) {
const vatSlot = mapKernelSlotToVatSlot(kernelSlot);
deleteCListEntry(kernelSlot, vatSlot);
}
}

/**
* Generator function to return the vat's transcript, one entry at a time.
*/
Expand Down Expand Up @@ -284,6 +291,7 @@ export function makeVatKeeper(
mapKernelSlotToVatSlot,
hasCListEntry,
deleteCListEntry,
deleteCListEntriesForKernelSlots,
getTranscript,
addToTranscript,
vatStats,
Expand Down
13 changes: 8 additions & 5 deletions packages/SwingSet/src/kernel/vatManager/deliver.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,14 @@ export function makeDeliver(tools, dispatch) {
// ['message', target, msg]
// target is vid
// msg is: { method, args (capdata), result }
// ['notify', vpid, resolutions]
// vpid is the id of the primary promise being resolved
// resolutions is an object mapping vpid's to the final promise data,
// rendered in vat form, for both the primary promise and any collateral
// promises it references whose resolution has been newly discovered
// ['notify', resolutions]
// resolutions is an array of pairs: [vpid, resolution]
// vpid is the id of the primary promise being resolved
// resolution is the data for the promise being resolved
// There is an entry in the resolutions array both for the primary promise
// and for any collateral promises it references whose resolution was
// newly discovered at the time the notification delivery was being
// generated
async function deliver(vatDeliverObject) {
const [type, ...args] = vatDeliverObject;
switch (type) {
Expand Down
4 changes: 3 additions & 1 deletion packages/SwingSet/src/kernel/vatManager/syscall.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ export function createSyscall(transcriptManager) {
// (and we'll be terminated, but the kernel and all other vats will
// continue). Emit enough of an error message to explain the errors
// that are about to ensue on our way down.
throw Error(`syscall suffered error, shutdown commencing`);
throw Error(
`syscall ${vatSyscallObject[0]} suffered error, shutdown commencing`,
);
}
// otherwise vres is ['ok', null] or ['ok', capdata]
transcriptManager.addSyscall(vatSyscallObject, data);
Expand Down
37 changes: 14 additions & 23 deletions packages/SwingSet/src/kernel/vatTranslator.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { insistKernelType, parseKernelSlot } from './parseKernelSlots';
import { insistVatType, parseVatSlot } from '../parseVatSlots';
import { insistCapData } from '../capdata';
import { kdebug, legibilizeMessageArgs, legibilizeValue } from './kdebug';
import { deleteCListEntryIfEasy, getKpidsToRetire } from './cleanup';
import { deleteCListEntryIfEasy } from './cleanup';

/*
* Return a function that converts KernelDelivery objects into VatDelivery
Expand Down Expand Up @@ -77,29 +77,20 @@ function makeTranslateKernelDeliveryToVatDelivery(vatID, kernelKeeper) {
}
}

function translateNotify(kpid, kp) {
assert(kp.state !== 'unresolved', details`spurious notification ${kpid}`);
const resolutions = {};
kdebug(`notify ${kpid} ${JSON.stringify(kp)}`);
const targets = getKpidsToRetire(
vatID,
vatKeeper,
kernelKeeper,
kpid,
kp.data,
);
if (targets.length > 0) {
for (const toResolve of targets) {
const p = kernelKeeper.getKernelPromise(toResolve);
const vpid = mapKernelSlotToVatSlot(toResolve);
resolutions[vpid] = translatePromiseDescriptor(p);
}
const vatDelivery = harden(['notify', resolutions]);
return vatDelivery;
} else {
kdebug(`skipping notify of ${kpid} because it's already been done`);
return null;
function translateNotify(kResolutions) {
const vResolutions = [];
let idx = 0;
for (const resolution of kResolutions) {
const [kpid, p] = resolution;
assert(p.state !== 'unresolved', details`spurious notification ${kpid}`);
const vpid = mapKernelSlotToVatSlot(kpid);
const vres = translatePromiseDescriptor(p);
vResolutions.push([vpid, vres]);
kdebug(`notify ${idx} ${kpid}/${vpid} ${JSON.stringify(vres)}`);
idx += 1;
}
const vatDelivery = harden(['notify', vResolutions]);
return vatDelivery;
}

function kernelDeliveryToVatDelivery(kd) {
Expand Down
6 changes: 4 additions & 2 deletions packages/SwingSet/src/vats/comms/clist-kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export function makeKernel(state, syscall, stateKit) {

// *-LocalForKernel: kernel sending in to comms vat

function provideLocalForKernel(kref) {
function provideLocalForKernel(kref, doNotSubscribeSet) {
const { type, allocatedByVat } = parseVatSlot(kref);
if (type !== 'object' && type !== 'promise') {
// TODO: reject the message rather than crashing weirdly, we
Expand Down Expand Up @@ -125,7 +125,9 @@ export function makeKernel(state, syscall, stateKit) {
// the kernel is telling us about a new promise, so it's the decider
trackUnresolvedPromise(vpid);
changeDeciderToKernel(vpid);
syscall.subscribe(vpid);
if (!doNotSubscribeSet || !doNotSubscribeSet.has(vpid)) {
syscall.subscribe(vpid);
}
}
}
}
Expand Down
23 changes: 14 additions & 9 deletions packages/SwingSet/src/vats/comms/delivery.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,48 @@ export function makeDeliveryKit(state, syscall, transmit, clistKit, stateKit) {
return kernelData;
}

function mapDataFromKernel(kdata) {
function mapDataFromKernel(kdata, doNotSubscribeSet) {
insistCapData(kdata);
const slots = kdata.slots.map(provideLocalForKernel);
const slots = kdata.slots.map(slot =>
provideLocalForKernel(slot, doNotSubscribeSet),
);
return harden({ body: kdata.body, slots });
}

// dispatch.deliver from kernel lands here (with message from local vat to
// remote machine): translate to local, join with handleSend
function sendFromKernel(target, method, kargs, kresult) {
const result = provideLocalForKernelResult(kresult);
const args = mapDataFromKernel(kargs);
const args = mapDataFromKernel(kargs, null);
const localDelivery = harden({ target, method, result, args });
handleSend(localDelivery);
}

function mapResolutionFromKernel(resolution) {
function mapResolutionFromKernel(resolution, doNotSubscribeSet) {
if (resolution.type === 'object') {
const slot = provideLocalForKernel(resolution.slot);
const slot = provideLocalForKernel(resolution.slot, null);
return harden({ ...resolution, slot });
}
if (resolution.type === 'data' || resolution.type === 'reject') {
return harden({
...resolution,
data: mapDataFromKernel(resolution.data),
data: mapDataFromKernel(resolution.data, doNotSubscribeSet),
});
}
throw Error(`unknown resolution type ${resolution.type}`);
}

// dispatch.notifyResolve* from kernel lands here (local vat resolving some
// Promise, we need to notify remove machines): translate to local, join
// Promise, we need to notify remote machines): translate to local, join
// with handleResolution
function resolveFromKernel(vpid, resolution) {
function resolveFromKernel(vpid, resolution, doNotSubscribeSet) {
insistPromiseIsUnresolved(vpid);
insistDeciderIsKernel(vpid);
changeDeciderFromKernelToComms(vpid);
handleResolution(vpid, mapResolutionFromKernel(resolution));
handleResolution(
vpid,
mapResolutionFromKernel(resolution, doNotSubscribeSet),
);
}

// dispatch.deliver with msg from vattp lands here, containing a message
Expand Down
Loading

0 comments on commit a031034

Please sign in to comment.