From 00ae6aee061ceb408d906aa4193354c3190f0285 Mon Sep 17 00:00:00 2001 From: "Mark S. Miller" Date: Sat, 16 Mar 2024 17:59:20 -0700 Subject: [PATCH] feat(zone): asyncFlow --- .../src/controller/initializeKernel.js | 2 +- .../SwingSet/src/devices/lib/deviceTools.js | 4 +- packages/SwingSet/src/kernel/deviceSlots.js | 2 +- packages/swingset-liveslots/src/liveslots.js | 2 +- .../swingset-liveslots/src/watchedPromises.js | 6 +- .../tools/fakeVirtualSupport.js | 3 + packages/vow/src/index.js | 1 + packages/vow/src/types.js | 8 + packages/vow/src/vow-utils.js | 24 +- packages/wallet/api/src/lib-dehydrate.js | 2 +- packages/zone/package.json | 13 +- packages/zone/src/async-flow/async-flow.js | 256 +++++++++++++ packages/zone/src/async-flow/convert.js | 120 +++++++ packages/zone/src/async-flow/ephemera.js | 34 ++ packages/zone/src/async-flow/equate.js | 121 +++++++ packages/zone/src/async-flow/log-store.js | 139 +++++++ .../zone/src/async-flow/replay-membrane.js | 339 ++++++++++++++++++ packages/zone/src/async-flow/type-guards.js | 48 +++ packages/zone/src/async-flow/types.js | 150 ++++++++ .../zone/src/async-flow/weak-bijection.js | 122 +++++++ .../zone/test/async-flow/test-async-flow.js | 221 ++++++++++++ packages/zone/test/async-flow/test-convert.js | 120 +++++++ packages/zone/test/async-flow/test-equate.js | 121 +++++++ .../zone/test/async-flow/test-log-store.js | 113 ++++++ .../test/async-flow/test-replay-membrane.js | 284 +++++++++++++++ .../test/async-flow/test-weak-bijection.js | 88 +++++ packages/zone/test/prepare-test-env-ava.js | 15 +- packages/zone/zone-helpers.js | 2 + 28 files changed, 2340 insertions(+), 20 deletions(-) create mode 100644 packages/zone/src/async-flow/async-flow.js create mode 100644 packages/zone/src/async-flow/convert.js create mode 100644 packages/zone/src/async-flow/ephemera.js create mode 100644 packages/zone/src/async-flow/equate.js create mode 100644 packages/zone/src/async-flow/log-store.js create mode 100644 packages/zone/src/async-flow/replay-membrane.js create mode 100644 packages/zone/src/async-flow/type-guards.js create mode 100644 packages/zone/src/async-flow/types.js create mode 100644 packages/zone/src/async-flow/weak-bijection.js create mode 100644 packages/zone/test/async-flow/test-async-flow.js create mode 100644 packages/zone/test/async-flow/test-convert.js create mode 100644 packages/zone/test/async-flow/test-equate.js create mode 100644 packages/zone/test/async-flow/test-log-store.js create mode 100644 packages/zone/test/async-flow/test-replay-membrane.js create mode 100644 packages/zone/test/async-flow/test-weak-bijection.js create mode 100644 packages/zone/zone-helpers.js diff --git a/packages/SwingSet/src/controller/initializeKernel.js b/packages/SwingSet/src/controller/initializeKernel.js index dbf4a74cf10..67552acf629 100644 --- a/packages/SwingSet/src/controller/initializeKernel.js +++ b/packages/SwingSet/src/controller/initializeKernel.js @@ -210,7 +210,7 @@ export async function initializeKernel(config, kernelStorage, options = {}) { serializeBodyFormat: 'smallcaps', // TODO Temporary hack. // See https://github.com/Agoric/agoric-sdk/issues/2780 - errorIdNum: 60000, + errorIdNum: 60_000, }); const args = kunser(m.serialize(harden([vatObj0s, deviceObj0s]))); const rootKref = exportRootObject(kernelKeeper, bootstrapVatID); diff --git a/packages/SwingSet/src/devices/lib/deviceTools.js b/packages/SwingSet/src/devices/lib/deviceTools.js index b8ff25f4c42..da3f01c7417 100644 --- a/packages/SwingSet/src/devices/lib/deviceTools.js +++ b/packages/SwingSet/src/devices/lib/deviceTools.js @@ -74,11 +74,11 @@ export function buildSerializationTools(syscall, deviceName) { } const m = makeMarshal(convertValToSlot, convertSlotToVal, { - marshalName: `device:${deviceName}`, + marshalName: `deviceTools:${deviceName}`, serializeBodyFormat: 'smallcaps', // TODO Temporary hack. // See https://github.com/Agoric/agoric-sdk/issues/2780 - errorIdNum: 60000, + errorIdNum: 40_000, }); // for invoke(), these will unserialize the arguments, and serialize the diff --git a/packages/SwingSet/src/kernel/deviceSlots.js b/packages/SwingSet/src/kernel/deviceSlots.js index b055c3a2f8f..e42fa461a25 100644 --- a/packages/SwingSet/src/kernel/deviceSlots.js +++ b/packages/SwingSet/src/kernel/deviceSlots.js @@ -107,7 +107,7 @@ export function makeDeviceSlots( serializeBodyFormat: 'smallcaps', // TODO Temporary hack. // See https://github.com/Agoric/agoric-sdk/issues/2780 - errorIdNum: 50000, + errorIdNum: 50_000, }); function PresenceHandler(importSlot) { diff --git a/packages/swingset-liveslots/src/liveslots.js b/packages/swingset-liveslots/src/liveslots.js index 9dc469f44b8..641a5ee3729 100644 --- a/packages/swingset-liveslots/src/liveslots.js +++ b/packages/swingset-liveslots/src/liveslots.js @@ -564,7 +564,7 @@ function build( serializeBodyFormat: 'smallcaps', // TODO Temporary hack. // See https://github.com/Agoric/agoric-sdk/issues/2780 - errorIdNum: 70000, + errorIdNum: 70_000, marshalSaveError: err => // By sending this to `console.warn`, under cosmic-swingset this is // controlled by the `console` option given to makeLiveSlots. diff --git a/packages/swingset-liveslots/src/watchedPromises.js b/packages/swingset-liveslots/src/watchedPromises.js index c6dcc80979b..9b00fdc5513 100644 --- a/packages/swingset-liveslots/src/watchedPromises.js +++ b/packages/swingset-liveslots/src/watchedPromises.js @@ -6,6 +6,7 @@ import { assert } from '@agoric/assert'; import { initEmpty, M } from '@agoric/store'; import { E } from '@endo/eventual-send'; import { parseVatSlot } from './parseVatSlots.js'; +import { Fail } from '@endo/errors'; /** * @template V @@ -198,7 +199,10 @@ export function makeWatchedPromiseManager({ const watcherVref = convertValToSlot(watcher); assert(watcherVref, 'invalid watcher'); const { virtual, durable } = parseVatSlot(watcherVref); - assert(virtual || durable, 'promise watcher must be a virtual object'); + virtual || + durable || + // separate line so easy to breakpoint on + Fail`promise watcher must be a virtual object`; if (watcher.onFulfilled) { assert.typeof(watcher.onFulfilled, 'function'); } diff --git a/packages/swingset-liveslots/tools/fakeVirtualSupport.js b/packages/swingset-liveslots/tools/fakeVirtualSupport.js index 512fa1e4a0c..26a6ee51627 100644 --- a/packages/swingset-liveslots/tools/fakeVirtualSupport.js +++ b/packages/swingset-liveslots/tools/fakeVirtualSupport.js @@ -242,6 +242,9 @@ export function makeFakeLiveSlotsStuff(options = {}) { const marshal = makeMarshal(convertValToSlot, convertSlotToVal, { serializeBodyFormat: 'smallcaps', + marshalName: 'fakeLiveSlots', + errorIdNum: 80_000, + marshalSaveError: _err => {}, }); function registerEntry(baseRef, val, valIsCohort) { diff --git a/packages/vow/src/index.js b/packages/vow/src/index.js index 775f899f9af..82e8e5d47b9 100644 --- a/packages/vow/src/index.js +++ b/packages/vow/src/index.js @@ -1,6 +1,7 @@ // @ts-check export * from './tools.js'; export { default as makeE } from './E.js'; +export { VowShape } from './vow-utils.js'; // eslint-disable-next-line import/export export * from './types.js'; diff --git a/packages/vow/src/types.js b/packages/vow/src/types.js index 6cc87175577..845a282ea05 100644 --- a/packages/vow/src/types.js +++ b/packages/vow/src/types.js @@ -122,3 +122,11 @@ export {}; * @template [T=any] * @typedef {import('./types.js').ERef>} Specimen */ + +/** + * @typedef {object} VowTools + * // TODO type according to prepareWatch returns + * @property {(specimenP:any, watcher?:any, watcherContext?:any) => Vow} watch + * @property {(specimenP:any, onFulfilled?:any, onRejected?:any) => Promise} when + * @property {() => VowKit} makeVowKit + */ diff --git a/packages/vow/src/vow-utils.js b/packages/vow/src/vow-utils.js index 193519229ef..bffdb98a3c5 100644 --- a/packages/vow/src/vow-utils.js +++ b/packages/vow/src/vow-utils.js @@ -1,12 +1,21 @@ // @ts-check import { E as basicE } from '@endo/eventual-send'; -import { getTag, passStyleOf } from '@endo/pass-style'; - -// TODO: `isPassable` should come from @endo/pass-style -import { isPassable } from '@agoric/base-zone'; +import { isPassable } from '@endo/pass-style'; +import { M, matches } from '@endo/patterns'; export { basicE }; +export const VowShape = M.tagged( + 'Vow', + M.splitRecord({ + vowV0: M.remotable('VowV0'), + }), +); + +export const isVow = specimen => + isPassable(specimen) && matches(specimen, VowShape); +harden(isVow); + /** * A vow is a passable tagged as 'Vow'. Its payload is a record with * API-versioned remotables. payload.vowV0 is the API for the `watch` and @@ -19,11 +28,7 @@ export { basicE }; * @returns {import('./types').VowPayload | undefined} undefined if specimen is not a vow, otherwise the vow's payload. */ export const getVowPayload = specimen => { - const isVow = - isPassable(specimen) && - passStyleOf(specimen) === 'tagged' && - getTag(specimen) === 'Vow'; - if (!isVow) { + if (!isVow(specimen)) { return undefined; } @@ -32,3 +37,4 @@ export const getVowPayload = specimen => { ); return vow.payload; }; +harden(getVowPayload); diff --git a/packages/wallet/api/src/lib-dehydrate.js b/packages/wallet/api/src/lib-dehydrate.js index 04ad6e538e9..55d0209fe6b 100644 --- a/packages/wallet/api/src/lib-dehydrate.js +++ b/packages/wallet/api/src/lib-dehydrate.js @@ -340,7 +340,7 @@ export const makeDehydrator = (initialUnnamedCount = 0) => { marshalName: 'hydration', // TODO Temporary hack. // See https://github.com/Agoric/agoric-sdk/issues/2780 - errorIdNum: 30000, + errorIdNum: 30_000, serializeBodyFormat: 'smallcaps', }, ); diff --git a/packages/zone/package.json b/packages/zone/package.json index e188a1d53e0..45f7eaeb1ac 100644 --- a/packages/zone/package.json +++ b/packages/zone/package.json @@ -19,6 +19,7 @@ }, "exports": { ".": "./src/index.js", + "./zone-helpers.js": "./zone-helpers.js", "./durable.js": "./durable.js", "./heap.js": "./heap.js", "./virtual.js": "./virtual.js" @@ -28,13 +29,23 @@ "license": "Apache-2.0", "dependencies": { "@agoric/base-zone": "^0.1.0", + "@agoric/store": "^0.9.2", + "@agoric/vow": "^0.1.0", "@agoric/vat-data": "^0.5.2", + "@endo/common": "^1.1.0", + "@endo/errors": "^1.1.0", + "@endo/eventual-send": "^1.1.2", "@endo/far": "^1.0.4", - "@endo/pass-style": "^1.2.0" + "@endo/marshal": "^1.3.0", + "@endo/pass-style": "^1.2.0", + "@endo/patterns": "^1.2.0", + "@endo/promise-kit": "^1.0.4" }, "devDependencies": { "@agoric/swingset-liveslots": "^0.10.2", + "@endo/env-options": "^1.1.1", "@endo/patterns": "^1.2.0", + "@endo/ses-ava": "^1.1.2", "ava": "^5.3.0" }, "publishConfig": { diff --git a/packages/zone/src/async-flow/async-flow.js b/packages/zone/src/async-flow/async-flow.js new file mode 100644 index 00000000000..55e77409299 --- /dev/null +++ b/packages/zone/src/async-flow/async-flow.js @@ -0,0 +1,256 @@ +import { Fail } from '@endo/errors'; +import { E } from '@endo/far'; +import { M } from '@endo/patterns'; +import { PromiseWatcherI } from '@agoric/vow/src/watch-promise.js'; +import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js'; +import { makeReplayMembrane } from './replay-membrane.js'; +import { prepareLogStore } from './log-store.js'; +import { prepareWeakBijection } from './weak-bijection.js'; +import { makeEphemera } from './ephemera.js'; +import { LogEntryShape } from './type-guards.js'; + +const { defineProperties } = Object; +const { apply } = Reflect; + +const AsyncFlowIKit = harden({ + flow: M.interface('Flow', { + wake: M.call().returns(M.promise()), + getOutcome: M.call().returns(M.any()), + dump: M.call().returns(M.arrayOf(LogEntryShape)), + }), + admin: M.interface('FlowAdmin', { + done: M.call().returns(), + panic: M.call(M.error()).returns(), + }), + wakeWatcher: PromiseWatcherI, +}); + +/** + * @param {import('@agoric/base-zone').Zone} zone + * @param {string} tag + * @param {GuestAsyncFunc} guestAsyncFunc + * @param {PreparationOptions} [options] + */ +export const prepareAsyncFlowKit = ( + zone, + tag, + guestAsyncFunc, + options = {}, +) => { + const { + vowTools = prepareWatchableVowTools(zone), + makeLogStore = prepareLogStore(zone), + makeWeakBijection = prepareWeakBijection(zone), + } = options; + const { watch, makeVowKit } = vowTools; + + const tmp = makeEphemera(() => ({ + running: false, + optFatalProblem: undefined, + })); + + const makeAsyncFlowKit = zone.exoClassKit( + tag, + AsyncFlowIKit, + (activationThis, ...activationArgs) => { + harden(activationThis); + harden(activationArgs); + const log = makeLogStore(); + const bijection = makeWeakBijection(); + + return { + activationThis, // replay starts by reactivating with these + activationArgs, // replay starts by reactivating with these + log, // log to be accumulated or replayed + bijection, // membrane's guest-host mapping + outcomeKit: makeVowKit(), // outcome of activation as host vow + isDone: false, // persistently done + }; + }, + { + flow: { + async wake() { + const { state, facets } = this; + if (tmp.for(facets.flow).optFatalProblem) { + throw tmp.for(facets.flow).optFatalProblem; + } + if (tmp.for(facets.flow).running) { + return; + } + if (state.isDone) { + return; + } + const membrane = makeReplayMembrane( + state.log, + state.bijection, + vowTools, + vowish => { + // Extra paranoid because we're getting + // "promise watcher must be a virtual object" + // in the general vicinity. + zone.isStorable(vowish) || + Fail`vowish must be storable in this zone (usually, must be durable): ${vowish}`; + zone.isStorable(facets.wakeWatcher) || + Fail`wakeWatcher must be storable in this zone (usually, must be durable): ${facets.wakeWatcher}`; + watch(vowish, facets.wakeWatcher); + }, + err => facets.admin.panic(err), + ); + const guestThis = membrane.hostToGuest(state.activationThis); + const guestArgs = membrane.hostToGuest(state.activationArgs); + + // In case some host promises were settled before the guest makes + // the first call to a host object. + void membrane.replayReady(); + + // We do *not* call the guesAsyncFunc by having the membrane make + // a host wrapper for the function. Rather, we special case this + // host-to-guest call by "manually" sending the arguments through + // and calling the guest function ourselves. Likewise, we + // special case the handling of the guestResultP, rather than + // as the membrane to make a host vow for a guest promise. + // To support this special casing, we store additional replay + // data in this internal flow instance -- the host activationArgs + // and the host outcome vow kit. + const guestResultP = apply(guestAsyncFunc, guestThis, guestArgs); + // log is driven at first by guestAyncFunc interaction through the + // membrane with the host activationArgs. At the end of its first + // turn, it returns a promise for its eventual guest result. + // It then proceeds to interact with the host through the membrane + // in further turns by `await`ing (or otherwise registering) + // on host vows turned into guest promises, and by calling + // the guest presence of other host objects. + void E.when( + guestResultP, + guestFulfillment => { + state.outcomeKit.resolver.resolve( + membrane.guestToHost(guestFulfillment), + ); + facets.admin.done(); + }, + guestReason => { + state.outcomeKit.resolver.reject( + membrane.guestToHost(guestReason), + ); + facets.admin.done(); + }, + ); + tmp.for(facets.flow).running = true; + }, + getOutcome() { + const { state, facets } = this; + void facets.flow.wake(); + return state.outcomeKit.vow; + }, + dump() { + const { state } = this; + return state.log.dump(); + }, + }, + admin: { + done() { + const { state, facets } = this; + state.bijection.reset(); + state.log.reset(); + state.isDone = true; + tmp.resetFor(facets.flow); + }, + panic(fatalProblem) { + const { facets } = this; + tmp.for(facets.flow).optFatalProblem = fatalProblem; + throw fatalProblem; + }, + }, + wakeWatcher: { + onFulfilled(_fulfillment) { + const { facets } = this; + void facets.flow.wake(); + }, + onRejected(_fulfillment) { + const { facets } = this; + void facets.flow.wake(); + }, + }, + }, + ); + return harden(makeAsyncFlowKit); +}; +harden(prepareAsyncFlowKit); + +/** + * @typedef {ReturnType>} AsyncFlowKit + */ + +/** + * @typedef {AsyncFlowKit['flow']} AsyncFlow + */ + +/** + * @param {import('@agoric/base-zone').Zone} zone + * @param {string} tag + * @param {GuestAsyncFunc} guestAsyncFunc + * @param {PreparationOptions} [options] + */ +export const prepareAsyncFlow = ( + zone, + tag, + guestAsyncFunc, + options = undefined, +) => { + const makeAsyncFlowKit = prepareAsyncFlowKit( + zone, + tag, + guestAsyncFunc, + options, + ); + + const hostFuncName = `${guestAsyncFunc.name || 'anon'}_hostWrapper`; + const makeAsyncFlow = { + [hostFuncName](...args) { + const { flow } = makeAsyncFlowKit(this, ...args); + return flow; + }, + }[hostFuncName]; + + defineProperties(makeAsyncFlow, { + length: { value: guestAsyncFunc.length }, + }); + + return harden(makeAsyncFlow); +}; +harden(prepareAsyncFlow); + +/** + * @param {import('@agoric/base-zone').Zone} zone + * @param {string} tag + * @param {GuestAsyncFunc} guestAsyncFunc + * @param {PreparationOptions} [options] + */ +export const prepareAsyncFlowFunc = ( + zone, + tag, + guestAsyncFunc, + options = undefined, +) => { + const makeAsyncFlowKit = prepareAsyncFlowKit( + zone, + tag, + guestAsyncFunc, + options, + ); + + const hostFuncName = `${guestAsyncFunc.name || 'anon'}_hostWrapper`; + const makeAsyncFlow = { + [hostFuncName](...args) { + const { flow } = makeAsyncFlowKit(this, ...args); + return flow.getOutcome(); + }, + }[hostFuncName]; + + defineProperties(makeAsyncFlow, { + length: { value: guestAsyncFunc.length }, + }); + + return harden(makeAsyncFlow); +}; +harden(prepareAsyncFlow); diff --git a/packages/zone/src/async-flow/convert.js b/packages/zone/src/async-flow/convert.js new file mode 100644 index 00000000000..34341263735 --- /dev/null +++ b/packages/zone/src/async-flow/convert.js @@ -0,0 +1,120 @@ +import { Fail, X, annotateError, makeError, q } from '@endo/errors'; +import { throwLabeled } from '@endo/common/throw-labeled.js'; +import { + getErrorConstructor, + isObject, + makeTagged, + passStyleOf, +} from '@endo/pass-style'; +import { isVow } from '@agoric/vow/src/vow-utils.js'; +import { objectMap } from '@endo/common/object-map.js'; + +const makeConvert = (convertRemotable, convertPromiseOrVow, convertError) => { + const convert = (specimen, label) => { + // Open code the synchronous part of applyLabelingError, because + // we need to preserve returned promise identity. + // TODO switch to Richard Gibson's suggestion for a better way + // to keep track of the error labeling. + if (label === undefined) { + // eslint-disable-next-line no-use-before-define + return innerConvert(specimen); + } + try { + // eslint-disable-next-line no-use-before-define + return innerConvert(specimen); + } catch (err) { + throwLabeled(err, label); + } + }; + + const innerConvert = specimen => { + if (!isObject(specimen)) { + return specimen; + } + const passStyle = passStyleOf(specimen); + switch (passStyle) { + case 'copyArray': { + return specimen.map((element, i) => convert(element, i)); + } + case 'copyRecord': { + return objectMap(specimen, (value, name) => convert(value, name)); + } + case 'tagged': { + if (isVow(specimen)) { + return convertPromiseOrVow(specimen); + } + return makeTagged(specimen.tag, specimen.payload); + } + case 'error': { + return convertError(specimen); + } + case 'remotable': { + return convertRemotable(specimen); + } + case 'promise': { + return convertPromiseOrVow(specimen); + } + default: { + throw Fail`unexpected passStyle ${q(passStyle)}`; + } + } + }; + return harden(convert); +}; + +export const makeConvertKit = ( + bijection, + makeGuestForHostRemotable, + makeGuestForHostVow, +) => { + const guestToHost = makeConvert( + gRem => { + if (bijection.hasGuest(gRem)) { + return bijection.guestToHost(gRem); + } + throw Fail`cannot yet send guest remotables ${gRem}`; + }, + gProm => { + if (bijection.hasGuest(gProm)) { + return bijection.guestToHost(gProm); + } + throw Fail`cannot yet send guest promises ${gProm}`; + }, + gErr => { + const hErr = harden( + makeError(gErr.message, getErrorConstructor(gErr.name)), + ); + annotateError(hErr, X`from guest error ${gErr}`); + return hErr; + }, + ); + + const hostToGuest = makeConvert( + hRem => { + if (bijection.hasHost(hRem)) { + return bijection.hostToGuest(hRem); + } + const gRem = makeGuestForHostRemotable(hRem); + bijection.define(gRem, hRem); + return gRem; + }, + hVow => { + if (bijection.hasHost(hVow)) { + return bijection.hostToGuest(hVow); + } + const gP = makeGuestForHostVow(hVow); + bijection.define(gP, hVow); + return gP; + }, + hErr => { + const gErr = harden( + makeError(hErr.message, getErrorConstructor(hErr.name)), + ); + annotateError(gErr, X`from host error ${hErr}`); + return gErr; + }, + ); + + return harden({ guestToHost, hostToGuest }); +}; +harden(makeConvertKit); diff --git a/packages/zone/src/async-flow/ephemera.js b/packages/zone/src/async-flow/ephemera.js new file mode 100644 index 00000000000..1e5010eee08 --- /dev/null +++ b/packages/zone/src/async-flow/ephemera.js @@ -0,0 +1,34 @@ +import { Far } from '@endo/pass-style'; + +/** + * Used by a possibly-durable exo to store per-instance ephemeral state. + * Each ephemera is created at the exo class prepare level, and then + * used from within the exo class methods to get state `eph.for(self)`. + * At the beginning of a new incarnation, there is no such state, so + * the first time it is accessed, it is initialized from `reinit(self)`. + * The ephemeral state can be dropped explicitly during an incarnation + * with `eph.resetFor(self)`, in which case the `eph.for(self)` will + * call it to be reinitialized again from `reinit(self)`. + * + * @template {WeakKey} [S=WeakKey] + * @template {any} [V=any] + * @param {(self: S) => V} reinit + * @returns {Ephemera} + */ +export const makeEphemera = reinit => { + /** @type {WeakMap} */ + const map = new WeakMap(); + + return Far('ephemera', { + for(self) { + if (!map.has(self)) { + map.set(self, reinit(self)); + } + return /** @type {V} */ (map.get(self)); + }, + resetFor(self) { + return map.delete(self); + }, + }); +}; +harden(makeEphemera); diff --git a/packages/zone/src/async-flow/equate.js b/packages/zone/src/async-flow/equate.js new file mode 100644 index 00000000000..581da152ed0 --- /dev/null +++ b/packages/zone/src/async-flow/equate.js @@ -0,0 +1,121 @@ +import { Fail, X, annotateError, q } from '@endo/errors'; +import { throwLabeled } from '@endo/common/throw-labeled.js'; +import { isObject, passStyleOf } from '@endo/pass-style'; +import { isVow } from '@agoric/vow/src/vow-utils.js'; +import { recordNames } from '@endo/marshal'; + +const { is } = Object; + +export const makeEquate = bijection => { + const equate = (g, h, label) => { + // Open code the synchronous part of applyLabelingError, because + // we need to preserve returned promise identity. + // TODO switch to Richard Gibson's suggestion for a better way + // to keep track of the error labeling. + if (label === undefined) { + // eslint-disable-next-line no-use-before-define + innerEquate(g, h); + } + try { + // eslint-disable-next-line no-use-before-define + innerEquate(g, h); + } catch (err) { + throwLabeled(err, label); + } + }; + + const innerEquate = (g, h) => { + if (!isObject(g)) { + is(g, h) || Fail`unequal primitives ${g} vs ${h}`; + return; + } + if (bijection.has(g, h)) { + return; + } + const gPassStyle = passStyleOf(g); + if (gPassStyle === 'promise' && isVow(h)) { + // Important special case, because vows have passStyle 'tagged'. + // However, we do not yet support passing guest promise to host. + // TODO when we do, delete the `throw Fail` line and uncomment + // the two lines below it. + // We *do* support passing a guest wrapper of a hostVow back + // to the host, but that would be cause by `bijection.has` above. + throw Fail`guest promises not yet passable`; + // define does not yet do enough checking anyway. For this case, + // we should ensure that h is a host wrapper of a guest promise, + // which is a wrapping we don't yet support. + // bijection.define(g, h); + // return; + } + const hPassStyle = passStyleOf(h); + gPassStyle === hPassStyle || + Fail`unequal passStyles ${q(gPassStyle)} vs ${q(hPassStyle)}`; + switch (gPassStyle) { + case 'copyArray': { + equate(g.length, h.length, 'length'); + // eslint-disable-next-line github/array-foreach + g.forEach((gEl, i) => equate(gEl, h[i], i)); + return; + } + case 'copyRecord': { + const gNames = recordNames(g); + const hNames = recordNames(h); + equate(gNames, hNames, 'propertyNames'); + for (const name of gNames) { + equate(g[name], h[name], name); + } + return; + } + case 'tagged': { + equate(g.tag, h.tag, 'tag'); + equate(g.payload, h.payload, 'payload'); + return; + } + case 'error': { + equate(g.name, h.name, 'name'); + // For errors, all that needs to agree on replay is the `name` + // property. All others can differ. That's because everything else + // is assumed to be diagnostic info useful to programmers, which + // we'd like to improve over time. No programmatic use of additional + // error diagnostic info other than to better inform developers. + // A program should not take a semantically significant branch + // based on any of this diagnostic info, aside from `name`. + // + // Error annotations are not observable outside the console output, + // so this does not breach membrane isolation. + annotateError(g, X`replay of error ${h}`); + annotateError(h, X`replayed as error ${g}`); + return; + } + case 'remotable': { + // Note that we can send a guest wrapping of a host remotable + // back to the host, + // but that should have already been taken care of by the + // `bijection.has` above. + throw Fail`cannot yet send guest remotables to host ${g} vs ${h}`; + // define does not yet do enough checking anyway. For this case, + // we should ensure that h is a host wrapper of a guest remotable, + // which is a wrapping we don't yet support. + // bijection.define(g, h); + // return; + } + case 'promise': { + // Note that we can send a guest wrapping of a host promise + // (or vow) back to the host, + // but that should have already been taken care of by the + // `bijection.has` above. + throw Fail`cannot yet send guest promises to host ${g} vs ${h}`; + // define does not yet do enough checking anyway. For this case, + // we should ensure that h is a host wrapper of a guest promise, + // which is a wrapping we don't yet support. + // bijection.define(g, h); + // return; + } + default: { + throw Fail`unexpected passStyle ${q(gPassStyle)}`; + } + } + }; + return harden(equate); +}; +harden(makeEquate); diff --git a/packages/zone/src/async-flow/log-store.js b/packages/zone/src/async-flow/log-store.js new file mode 100644 index 00000000000..1bd2fc65140 --- /dev/null +++ b/packages/zone/src/async-flow/log-store.js @@ -0,0 +1,139 @@ +import { Fail, q } from '@endo/errors'; +import { makePromiseKit } from '@endo/promise-kit'; +import { M } from '@endo/patterns'; +import { LogEntryShape } from './type-guards.js'; +import { makeEphemera } from './ephemera.js'; + +const LogStoreI = M.interface('LogStore', { + reset: M.call().returns(), + restart: M.call().returns(), + getIndex: M.call().returns(M.number()), + getLength: M.call().returns(M.number()), + isReplaying: M.call().returns(M.boolean()), + peekEntry: M.call().returns(LogEntryShape), + nextEntry: M.call().returns(LogEntryShape), + pushEntry: M.call(LogEntryShape).returns(M.number()), + dump: M.call().returns(M.arrayOf(LogEntryShape)), + promiseReplayDone: M.call().returns(M.promise()), +}); + +/** + * A growable, replayable, sequence of `LogEntry`s. + * + * @param {import('@agoric/base-zone').Zone} zone + */ +export const prepareLogStore = zone => { + /** + * @type {Ephemera + * }>} + */ + const tmp = makeEphemera(log => { + const result = { + index: 0, + replayDoneKit: makePromiseKit(), + }; + if (log.getLength() === 0) { + result.replayDoneKit.resolve(undefined); + } + return result; + }); + + return zone.exoClass( + 'LogStore', + LogStoreI, + () => { + /** + * Really used to emulate a zone-storable vector, i.e., what in + * conventional JS you'd use a mutable array for, where you mutate + * only by `.push` + * + * @type {MapStore} + */ + const mapStore = zone.detached().mapStore('logMapStore', { + keyShape: M.number(), + valueShape: LogEntryShape, + }); + return { + mapStore, + }; + }, + { + reset() { + const { state, self } = this; + tmp.resetFor(self); + state.mapStore.clear(); + }, + restart() { + const { self } = this; + tmp.resetFor(self); + }, + getIndex() { + const { self } = this; + return tmp.for(self).index; + }, + getLength() { + const { state } = this; + return state.mapStore.getSize(); + }, + isReplaying() { + const { state, self } = this; + return tmp.for(self).index < state.mapStore.getSize(); + }, + peekEntry() { + const { state, self } = this; + self.isReplaying() || + Fail`No longer replaying: ${q(tmp.for(self).index)} vs ${q( + state.mapStore.getSize(), + )}`; + const result = state.mapStore.get(tmp.for(self).index); + return result; + }, + nextEntry() { + const { self } = this; + const result = self.peekEntry(); + tmp.for(self).index += 1; + if (!self.isReplaying()) { + tmp.for(self).replayDoneKit.resolve(undefined); + } + return result; + }, + pushEntry(entry) { + const { state, self } = this; + !self.isReplaying() || + Fail`still replaying: ${q(tmp.for(self).index)} vs ${q( + state.mapStore.getSize(), + )}`; + tmp.for(self).index === state.mapStore.getSize() || + Fail`internal: index confusion ${q(tmp.for(self).index)} vs ${q( + state.mapStore.getSize(), + )}`; + state.mapStore.init(tmp.for(self).index, entry); + tmp.for(self).index += 1; + tmp.for(self).index === state.mapStore.getSize() || + Fail`internal: index confusion ${q(tmp.for(self).index)} vs ${q( + state.mapStore.getSize(), + )}`; + return tmp.for(self).index; + }, + dump() { + const { state } = this; + const len = state.mapStore.getSize(); + const result = []; + for (let i = 0; i < len; i += 1) { + result.push(state.mapStore.get(i)); + } + return harden(result); + }, + promiseReplayDone() { + const { self } = this; + return tmp.for(self).replayDoneKit.promise; + }, + }, + ); +}; + +/** + * @typedef {ReturnType>} LogStore + */ diff --git a/packages/zone/src/async-flow/replay-membrane.js b/packages/zone/src/async-flow/replay-membrane.js new file mode 100644 index 00000000000..47f5996867c --- /dev/null +++ b/packages/zone/src/async-flow/replay-membrane.js @@ -0,0 +1,339 @@ +/* eslint-disable no-use-before-define */ +import { Fail, q } from '@endo/errors'; +import { Remotable, getInterfaceOf } from '@endo/pass-style'; +import { E } from '@endo/eventual-send'; +import { getMethodNames } from '@endo/eventual-send/utils.js'; +import { makePromiseKit } from '@endo/promise-kit'; +import { makeEquate } from './equate.js'; +import { makeConvertKit } from './convert.js'; + +const { fromEntries, defineProperties } = Object; + +/** + * @param {import('./log-store.js').LogStore} log + * @param {import('./weak-bijection.js').WeakBijection} bijection + * @param {import('@agoric/vow').VowTools} vowTools + * @param {(vowish: Promise | Vow) => void} watchWake + * @param {(problem: Error) => void} panic + */ +export const makeReplayMembrane = ( + log, + bijection, + vowTools, + watchWake, + panic, +) => { + const { watch: _watch, when, makeVowKit: _makeVowKit } = vowTools; + + const equate = makeEquate(bijection); + + const guestPromiseMap = new WeakMap(); + + // ////////////// Host or Interpreter to Guest /////////////////////////////// + + /** + * When replaying, this comes from interpreting the log. + * Otherwise, it is triggered by a watcher watching hostVow, + * that must also log it. + * + * @param {HostVow} hostVow + * @param {Host} hostFulfillment + */ + const doFulfill = (hostVow, hostFulfillment) => { + const guestPromise = hostToGuest(hostVow); + const status = guestPromiseMap.get(guestPromise); + if (status === 'settled') { + return; + } + const guestFulfillment = hostToGuest(hostFulfillment); + status.resolve(guestFulfillment); + guestPromiseMap.set(guestPromise, 'settled'); + }; + + /** + * When replaying, this comes from interpreting the log. + * Otherwise, it is triggered by a watcher watching hostVow, + * that must also log it. + * + * @param {HostVow} hostVow + * @param {Host} hostReason + */ + const doReject = (hostVow, hostReason) => { + const guestPromise = hostToGuest(hostVow); + const status = guestPromiseMap.get(guestPromise); + if (status === 'settled') { + return; + } + const guestReason = hostToGuest(hostReason); + status.reject(guestReason); + guestPromiseMap.delete(guestPromise); + }; + + /** + * When replaying, after the guest thinks it has called a host method, + * triggering `checkCall`, that host method emulator consumes one of + * these entries from the log to return what it is supposed to. + * It returns an Outcome describing either a throw or return, because we + * reserve the actual throw channels for replay errors and internal + * errors. + * + * @param {number} callIndex + * @param {Host} hostResult + * @returns {Outcome} + */ + const doReturn = (callIndex, hostResult) => { + unnestInterpreter(callIndex); + const guestResult = hostToGuest(hostResult); + return harden({ + kind: 'return', + result: guestResult, + }); + }; + + /** + * When replaying, after the guest thinks it has called a host method, + * triggering `checkCall`, that host method emulator consumes one of + * these entries from the log to return what it is supposed to. + * It returns an Outcome describing either a throw or return, because we + * reserve the actual throw channels for replay errors and internal + * errors. + * + * @param {number} callIndex + * @param {Host} hostProblem + * @returns {Outcome} + */ + const doThrow = (callIndex, hostProblem) => { + unnestInterpreter(callIndex); + const guestProblem = hostToGuest(hostProblem); + return harden({ + kind: 'throw', + problem: guestProblem, + }); + }; + + // ///////////// Guest to Host or consume log //////////////////////////////// + + const performCall = (hostTarget, optVerb, hostArgs, callIndex) => { + let hostResult; + try { + hostResult = optVerb + ? hostTarget[optVerb](...hostArgs) + : hostTarget(...hostArgs); + } catch (hostProblem) { + return logDo(nestDispatch, harden(['doThrow', callIndex, hostProblem])); + } + return logDo(nestDispatch, harden(['doReturn', callIndex, hostResult])); + }; + + const guestCallsHost = (guestTarget, optVerb, guestArgs, callIndex) => { + /** @type {Outcome} */ + let outcome; + try { + const guestEntry = harden([ + 'checkCall', + guestTarget, + optVerb, + guestArgs, + callIndex, + ]); + if (log.isReplaying()) { + const entry = log.nextEntry(); + equate(guestEntry, entry, `replay ${callIndex}`); + outcome = /** @type {Outcome} */ (nestInterpreter(callIndex)); + } else { + const entry = guestToHost(guestEntry); + log.pushEntry(entry); + const [_, ...args] = entry; + nestInterpreter(callIndex); + outcome = performCall(...args); + } + } catch (fatalError) { + throw panic(fatalError); + } + + if (outcome.kind === 'return') { + return outcome.result; + } else { + outcome.kind === 'throw' || + // @ts-expect-error TS correctly knows this case would be outside + // the type. But that's what we want to check. + Fail`unexpected outcome kind ${q(outcome.kind)}`; + throw outcome.problem; + } + }; + + // //////////////// Converters /////////////////////////////////////////////// + + const makeGuestForHostRemotable = hRem => { + let gRem; + /** @param {PropertyKey} [optVerb] */ + const makeGuestMethod = (optVerb = undefined) => { + const guestMethod = (...guestArgs) => { + const callIndex = log.getIndex(); + return guestCallsHost(gRem, optVerb, guestArgs, callIndex); + }; + if (optVerb) { + defineProperties(guestMethod, { + name: { value: String(optVerb) }, + length: { value: Number(hRem[optVerb].length || 0) }, + }); + } else { + defineProperties(guestMethod, { + name: { value: String(hRem.name || 'anon') }, + length: { value: Number(hRem.length || 0) }, + }); + } + return guestMethod; + }; + const iface = String(getInterfaceOf(hRem) || 'remotable'); + const guestIface = `${iface} guest wrapper`; // just for debugging clarity + if (typeof hRem === 'function') { + // NOTE: Assumes that a far function has no "static" methods. This + // is the current marshal design, but revisit this if we change our + // minds. + gRem = Remotable(guestIface, undefined, makeGuestMethod()); + // NOTE: If we ever do support that, probably all we need + // to do is remove the following `throw Fail` line. + throw Fail`host far functions not yet passable`; + } else { + const methodNames = getMethodNames(hRem); + const guestMethods = methodNames.map(name => [ + name, + makeGuestMethod(name), + ]); + // TODO in order to support E *well*, + // use HandledPromise to make gRem a remote presence for hRem + gRem = Remotable(guestIface, undefined, fromEntries(guestMethods)); + } + return gRem; + }; + harden(makeGuestForHostRemotable); + + const makeGuestForHostVow = hVow => { + // TODO in order to support E *well*, + // use HandledPromise to make `promise` a handled promise for hVow + const { promise, resolve, reject } = makePromiseKit(); + guestPromiseMap.set(promise, harden({ resolve, reject })); + + watchWake(hVow); + + void when( + hVow, + hostFulfillment => { + if (!log.isReplaying() && guestPromiseMap.get(promise) !== 'settled') { + /** @type {LogEntry} */ + const entry = harden(['doFulfill', hVow, hostFulfillment]); + log.pushEntry(entry); + interpretOne(topDispatch, entry); + } + }, + hostReason => { + if (!log.isReplaying()) { + /** @type {LogEntry} */ + const entry = harden(['doReject', hVow, hostReason]); + log.pushEntry(entry); + interpretOne(topDispatch, entry); + } + }, + ); + return promise; + }; + harden(makeGuestForHostVow); + + const { guestToHost, hostToGuest } = makeConvertKit( + bijection, + makeGuestForHostRemotable, + makeGuestForHostVow, + ); + + // /////////////////////////////// Interpreter /////////////////////////////// + + /** + * These are the only ones that are driven from the interpreter loop + */ + const topDispatch = harden({ + doFulfill, + doReject, + // doCall, // unimplemented in the current plan + }); + + /** + * These are the only ones that are driven from the interpreter loop + */ + const nestDispatch = harden({ + // doCall, // unimplemented in the current plan + doReturn, + doThrow, + }); + + const interpretOne = (dispatch, [op, ...args]) => { + try { + op in dispatch || Fail`unexpected dispatch op: ${q(op)}`; + return dispatch[op](...args); + } catch (problem) { + throw panic(problem); + } + }; + + const logDo = (dispatch, entry) => { + log.pushEntry(entry); + return interpretOne(dispatch, entry); + }; + + const callStack = []; + + let unnestFlag = false; + + /** + * @param {number} callIndex + * @returns {Outcome | undefined} + */ + const nestInterpreter = callIndex => { + callStack.push(callIndex); + while (log.isReplaying()) { + const entry = log.nextEntry(); + const optOutcome = interpretOne(nestDispatch, entry); + if (unnestFlag) { + optOutcome || Fail`only unnest with an outcome: ${q(entry[0])}`; + unnestFlag = false; + return optOutcome; + } + } + unnestFlag = false; + }; + + /** + * @param {number} callIndex + */ + const unnestInterpreter = callIndex => { + callStack.length >= 1 || Fail`Unmatched unnest: ${q(callIndex)}`; + const i = callStack.pop(); + i === callIndex || Fail`Unexpected unnest: ${q(callIndex)} vs ${q(i)}`; + unnestFlag = true; + if (callStack.length === 0) { + void E.when(undefined, replayReady); + } + }; + + const replayReady = () => { + while (log.isReplaying()) { + callStack.length === 0 || + Fail`replayReady only with empty callStack: ${q(callStack)}`; + const entry = log.peekEntry(); + const op = entry[0]; + if (!(op in topDispatch)) { + return; + } + void log.nextEntry(); + interpretOne(topDispatch, entry); + } + }; + + const replayMembrane = harden({ + hostToGuest, + guestToHost, + replayReady, + }); + return replayMembrane; +}; +harden(makeReplayMembrane); diff --git a/packages/zone/src/async-flow/type-guards.js b/packages/zone/src/async-flow/type-guards.js new file mode 100644 index 00000000000..739ec015f78 --- /dev/null +++ b/packages/zone/src/async-flow/type-guards.js @@ -0,0 +1,48 @@ +import { M } from '@endo/patterns'; +import { VowShape } from '@agoric/vow'; + +export const PropertyKeyShape = M.or(M.string(), M.symbol()); + +export const HostVowShape = M.or(VowShape, M.promise()); + +export const LogEntryShape = M.or( + // ////////////////////////////// From Host to Guest ///////////////////////// + ['doFulfill', HostVowShape, M.any()], + ['doReject', HostVowShape, M.any()], + // [ + // 'doCall', + // M.remotable('host wrapper of guest target'), + // M.opt(PropertyKeyShape), + // M.arrayOf(M.any()), + // M.number(), + // ], + // [ + // 'doSend', + // M.or(M.remotable('host wrapper of guest target'), VowShape), + // M.opt(PropertyKeyShape), + // M.arrayOf(M.any()), + // M.number(), + // ], + ['doReturn', M.number(), M.any()], + ['doThrow', M.number(), M.any()], + + // ////////////////////////////// From Guest to Host ///////////////////////// + // ['checkFulfill', HostVowShape, M.any()], + // ['checkReject', HostVowShape, M.any()], + [ + 'checkCall', + M.remotable('host target'), + M.opt(PropertyKeyShape), + M.arrayOf(M.any()), + M.number(), + ], + // [ + // 'checkSend', + // M.or(M.remotable('host target'), VowShape), + // M.opt(PropertyKeyShape), + // M.arrayOf(M.any()), + // M.number(), + // ], + // ['checkReturn', M.number(), M.any()], + // ['checkThrow', M.number(), M.any()], +); diff --git a/packages/zone/src/async-flow/types.js b/packages/zone/src/async-flow/types.js new file mode 100644 index 00000000000..f99a84d6d0b --- /dev/null +++ b/packages/zone/src/async-flow/types.js @@ -0,0 +1,150 @@ +/** + * @template {Passable} [T=Passable] + * @typedef {T} Guest + */ + +/** + * @template {Passable} [T=Passable] + * @typedef {T} Host + */ + +/** + * @template {Passable} [T=Passable] + * @typedef {import('@agoric/vow').Vow} Vow + */ + +/** + * A HostVow must be durably storable. It corresponds to an + * ephemeral guest promise. + * + * @template {Passable} [T=Passable] + * @typedef {Host | Vow | Promise>>} HostVow + */ + +/** + * @typedef {(...activationArgs: Guest[]) => Guest} GuestAsyncFunc + */ + +/** + * @typedef {(...activationArgs: Host[]) => HostVow} HostAsyncFuncWrapper + */ + +/** + * @typedef {object} PreparationOptions + * @property {import('@agoric/vow').VowTools} [vowTools] + * @property {() => import('./log-store.js').LogStore} [makeLogStore] + * @property {() => import('./weak-bijection.js').WeakBijection + * } [makeWeakBijection] + */ + +/** + * @typedef {'return'|'throw'} OutcomeKind + */ + +/** + * @typedef {{kind: 'return', result: any} + * | {kind: 'throw', problem: any} + * } Outcome + */ + +/** + * @template {WeakKey} [S=WeakKey] + * @template {any} [V=any] + * @typedef {object} Ephemera + * @property {(self: S) => V} for + * @property {(self: S) => void} resetFor + */ + +/** + * This is the typedef for the membrane log entries we currently implement. + * See comment below for the commented-out typedef for the full + * membrane log entry, which we do not yet support. + * + * @typedef {[ // ///////////////// From Host to Guest ///////////////////////// + * op: 'doFulfill', + * prom: Host, + * fulfillment: Host, + * ] | [ + * op: 'doReject', + * prom: Host, + * reason: Host, + * ] | [ + * op: 'doReturn', + * callIndex: number, + * result: Host, + * ] | [ + * op: 'doThrow', + * callIndex: number, + * problem: Host, + * ] | [ // ///////////////////// From Guest to Host ///////////////////////// + * op: 'checkCall', + * target: Host, + * optVerb: PropertyKey|undefined, + * args: Host[], + * callIndex: number + * ]} LogEntry + */ + +/** + * This would be the typedef for the full membrane log, if we supported + * the guest sending promises and remotables to the host, for the host to + * then use. + * + * at-typedef {[ // ///////////////// From Host to Guest /////////////////////// + * op: 'doFulfill', + * prom: Host, + * fulfillment: Host, + * ] | [ + * op: 'doReject', + * prom: Host, + * reason: Host, + * ] | [ + * op: 'doCall', + * target: Host, + * optVerb: PropertyKey|undefined, + * args: Host[], + * callIndex: number + * ] | [ + * op: 'doSend', + * target: Host, + * optVerb: PropertyKey|undefined, + * args: Host[], + * callIndex: number + * ] | [ + * op: 'doReturn', + * callIndex: number, + * result: Host, + * ] | [ + * op: 'doThrow', + * callIndex: number, + * problem: Host, + * ] | [ // ///////////////////// From Guest to Host ///////////////////////// + * op: 'checkFulfill', + * prom: Host, + * fulfillment: Host, + * ] | [ + * op: 'checkReject', + * prom: Host, + * reason: Host, + * ] | [ + * op: 'checkCall', + * target: Host, + * optVerb: PropertyKey|undefined, + * args: Host[], + * callIndex: number + * ] | [ + * op: 'checkSend', + * target: Host, + * optVerb: PropertyKey|undefined, + * args: Host[], + * callIndex: number + * ] | [ + * op: 'checkReturn', + * callIndex: number, + * result: Host, + * ] | [ + * op: 'checkThrow', + * callIndex: number, + * problem: Host, + * ]} LogEntry + */ diff --git a/packages/zone/src/async-flow/weak-bijection.js b/packages/zone/src/async-flow/weak-bijection.js new file mode 100644 index 00000000000..165dd17c142 --- /dev/null +++ b/packages/zone/src/async-flow/weak-bijection.js @@ -0,0 +1,122 @@ +import { Fail } from '@endo/errors'; +import { M } from '@endo/patterns'; +import { getVowPayload } from '@agoric/vow/src/vow-utils.js'; +import { Far } from '@endo/pass-style'; +import { makeEphemera } from './ephemera.js'; + +export const vowishKey = k => { + const payload = getVowPayload(k); + if (payload === undefined) { + return k; + } + const { vowV0 } = payload; + // vowMap.set(vowV0, h); + return vowV0; +}; +harden(vowishKey); + +const WeakBijectionI = M.interface('WeakBijection', { + reset: M.call().returns(), + init: M.call(M.any(), M.any()).returns(), + hasGuest: M.call(M.any()).returns(M.boolean()), + hasHost: M.call(M.any()).returns(M.boolean()), + has: M.call(M.any(), M.any()).returns(M.boolean()), + guestToHost: M.call(M.any()).returns(M.any()), + hostToGuest: M.call(M.any()).returns(M.any()), + define: M.call(M.any(), M.any()).returns(), +}); + +/** + * Makes a store like a WeakMapStore except that Promises and Vows can also be + * used as keys. + * NOTE: This depends on promise identity being stable! + * + * @param {string} name + */ +const makeVowishStore = name => { + // The vowMap would be needed if we supported enumeration, + // in order to reconstruct the original keys. + // const vowMap = new Map(); + const map = new WeakMap(); + + return Far(name, { + init: (k, v) => { + const k2 = vowishKey(k); + !map.has(k2) || Fail`key already bound: ${k} -> ${map.get(k2)} vs ${v}`; + map.set(k2, v); + }, + has: k => map.has(vowishKey(k)), + get: k => { + const k2 = vowishKey(k); + map.has(k2) || Fail`key not found: ${k}`; + return map.get(k2); + }, + }); +}; + +/** @typedef {ReturnType} VowishStore */ + +/** + * @param {import('@agoric/base-zone').Zone} zone + */ +export const prepareWeakBijection = zone => { + /** @type {Ephemera} */ + const g2h = makeEphemera(() => makeVowishStore('guestToHost')); + /** @type {Ephemera} */ + const h2g = makeEphemera(() => makeVowishStore('hostToGuest')); + + return zone.exoClass('WeakBijection', WeakBijectionI, () => ({}), { + reset() { + const { self } = this; + g2h.resetFor(self); + h2g.resetFor(self); + }, + init(g, h) { + const { self } = this; + g2h.for(self).init(g, h); + h2g.for(self).init(h, g); + self.has(g, h) || Fail`internal: ${g} <-> ${h}`; + }, + define(g, h) { + const { self } = this; + if (!self.has(g, h)) { + self.init(g, h); + } + }, + hasGuest(g) { + const { self } = this; + return g2h.for(self).has(g); + }, + hasHost(h) { + const { self } = this; + return h2g.for(self).has(h); + }, + has(g, h) { + const { self } = this; + if (g2h.for(self).has(g)) { + g2h.for(self).get(g) === h || + Fail`internal: g->h ${g} -> ${h} vs ${g2h.for(self).get(g)}`; + h2g.for(self).get(h) === g || + Fail`internal h->g: ${h} -> ${g} vs ${h2g.for(self).get(h)}`; + return true; + } else { + !h2g.for(self).has(h) || + Fail`internal: unexpected h->g ${h} -> ${g2h.for(self).get(h)}`; + return false; + } + }, + guestToHost(g) { + const { self } = this; + return g2h.for(self).get(g); + }, + hostToGuest(h) { + const { self } = this; + return h2g.for(self).get(h); + }, + }); +}; +harden(prepareWeakBijection); + +/** + * @typedef {ReturnType>} WeakBijection + */ diff --git a/packages/zone/test/async-flow/test-async-flow.js b/packages/zone/test/async-flow/test-async-flow.js new file mode 100644 index 00000000000..e2756711632 --- /dev/null +++ b/packages/zone/test/async-flow/test-async-flow.js @@ -0,0 +1,221 @@ +// eslint-disable-next-line import/order +import { + test, + getBaggage, + annihilate, + nextLife, + asyncFlowVerbose, +} from '../prepare-test-env-ava.js'; + +// import { Fail } from '@endo/errors'; +// import { E } from '@endo/far'; +// import E from '@agoric/vow/src/E.js'; +import { isVow } from '@agoric/vow/src/vow-utils.js'; +import { makePromiseKit } from '@endo/promise-kit'; +import { prepareVowTools } from '@agoric/vow'; +import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js'; +import { prepareAsyncFlow } from '../../src/async-flow/async-flow.js'; + +import { makeHeapZone } from '../../heap.js'; +import { makeVirtualZone } from '../../virtual.js'; +import { makeDurableZone } from '../../durable.js'; + +const { apply } = Reflect; + +/** + * @param {import('@agoric/base-zone').Zone} zone + * @param {number} [k] + */ +const prepareOrchestra = (zone, k = 1) => + zone.exoClass( + 'Orchestra', + undefined, + (factor, vow, resolver) => ({ factor, vow, resolver }), + { + scale(n) { + const { state } = this; + return k * state.factor * n; + }, + vow() { + const { state } = this; + return state.vow; + }, + resolve(x) { + const { state } = this; + state.resolver.resolve(x); + }, + }, + ); + +/** @typedef {ReturnType>} Orchestra */ + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + * @param {boolean} [showOnConsole] + */ +const testFirstPlay = async (t, zone, vowTools, showOnConsole = false) => { + const makeOrchestra = prepareOrchestra(zone); + const { makeVowKit } = vowTools; + + const { vow: v1, resolver: r1 } = zone.makeOnce('v1', () => makeVowKit()); + const { vow: v2, resolver: r2 } = zone.makeOnce('v2', () => makeVowKit()); + const hOrch7 = zone.makeOnce('hOrch7', () => makeOrchestra(7, v2, r2)); + + // purposely violate rule that guestMethod is closed. + const { promise: promiseTestDone, resolve: endTest } = makePromiseKit(); + + const { guestMethod } = { + async guestMethod(gOrch7, gP) { + t.is(this, 'context'); + if (showOnConsole) { + console.log('about to await gP'); + } + await gP; + const prod = gOrch7.scale(3); + t.is(prod, 21); + + let gErr; + try { + gOrch7.scale(9n); + } catch (e) { + gErr = e; + } + t.is(gErr.name, 'TypeError'); + + const g2 = gOrch7.vow(); + endTest(true); + if (showOnConsole) { + console.log('about to await g2'); + } + await g2; // awaiting a promise that won't be resolved until next turn + if (showOnConsole) { + console.log('should not happen in first incarnation'); + } + t.fail('must not reach here in first incarnation'); + }, + }; + + const makeAsyncFlow = prepareAsyncFlow(zone, 'AsyncFlow1', guestMethod, { + vowTools, + }); + + const asyncFlow = zone.makeOnce('asyncFlow', () => + apply(makeAsyncFlow, 'context', [hOrch7, v1]), + ); + const outcomeV = zone.makeOnce('outcomeV', () => asyncFlow.getOutcome()); + t.true(isVow(outcomeV)); + r1.resolve('x'); + + if (showOnConsole) { + console.log('done', await promiseTestDone); + console.log(asyncFlow.dump()); + } + return promiseTestDone; +}; + +// /** +// * @param {any} t +// * @param {import('@agoric/base-zone').Zone} zone +// * @param {import('@agoric/vow').VowTools} vowTools +// * @param {boolean} [showOnConsole] +// */ +// const testBadReplay = async (t, zone, vowTools, showOnConsole = false) => { +// if (showOnConsole) { +// console.log('badReplay started'); +// } +// prepareOrchestra(zone); +// const { when } = vowTools; +// const hOrch7 = /** @type {Orchestra} */ ( +// zone.makeOnce('hOrch7', () => Fail`hOrch7 expected`) +// ); +// // const outcomeV = /** @type {Vow} */ ( +// // zone.makeOnce('outcomeV', () => Fail`outcomeV expected`) +// // ); + +// // purposely violate rule that guestMethod is closed. +// const { promise: promiseTestDone, resolve: endTest } = makePromiseKit(); + +// const { guestMethod } = { +// async guestMethod(gOrch7, gP) { +// t.is(this, 'context'); +// if (showOnConsole) { +// console.log('about to await gP'); +// } +// await gP; +// const prod = gOrch7.scale(3); +// t.is(prod, 21); + +// let gErr; +// try { +// gOrch7.scale(9n); +// } catch (e) { +// gErr = e; +// } +// t.is(gErr.name, 'TypeError'); + +// const g2 = gOrch7.vow(); +// endTest(true); +// if (showOnConsole) { +// console.log('about to await g2'); +// } +// await g2; // awaiting a promise that won't be resolved until this turn +// if (showOnConsole) { +// console.log('I woke up!'); +// } +// endTest('done'); +// }, +// }; + +// prepareAsyncFlow(zone, 'AsyncFlow1', guestMethod, { vowTools }); + +// hOrch7.resolve('y'); +// t.is(await when(hOrch7.vow()), 'y'); + +// if (showOnConsole) { +// console.log('badReplay done', await promiseTestDone); +// } +// return promiseTestDone; +// }; + +// /** +// * @param {any} _t +// * @param {import('@agoric/base-zone').Zone} _zone +// * @param {import('@agoric/vow').VowTools} _vowTools +// */ +// const testGoodReplay = async (_t, _zone, _vowTools) => { +// // +// }; + +await test.serial('test heap async-flow', async t => { + const zone = makeHeapZone('heapRoot'); + const vowTools = prepareVowTools(zone); + return testFirstPlay(t, zone, vowTools, asyncFlowVerbose()); +}); + +await test.serial('test virtual async-flow', async t => { + annihilate(); + const zone = makeVirtualZone('virtualRoot'); + const vowTools = prepareVowTools(zone); + return testFirstPlay(t, zone, vowTools); +}); + +await test.serial('test durable async-flow', async t => { + annihilate(); + + nextLife(); + const zone1 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools1 = prepareWatchableVowTools(zone1); + await testFirstPlay(t, zone1, vowTools1); + + // nextLife(); + // const zone2 = makeDurableZone(getBaggage(), 'durableRoot'); + // const vowTools2 = prepareWatchableVowTools(zone2); + // await testBadReplay(t, zone2, vowTools2, asyncFlowVerbose()); + + // nextLife(); + // const zone3 = makeDurableZone(getBaggage(), 'durableRoot'); + // const vowTools3 = prepareWatchableVowTools(zone3); + // return testGoodReplay(t, zone3, vowTools3); +}); diff --git a/packages/zone/test/async-flow/test-convert.js b/packages/zone/test/async-flow/test-convert.js new file mode 100644 index 00000000000..b8a9a5181e7 --- /dev/null +++ b/packages/zone/test/async-flow/test-convert.js @@ -0,0 +1,120 @@ +// eslint-disable-next-line import/order +import { + test, + getBaggage, + annihilate, + nextLife, + asyncFlowVerbose, +} from '../prepare-test-env-ava.js'; + +import { X, makeError, q } from '@endo/errors'; +import { Far, getInterfaceOf, passStyleOf } from '@endo/pass-style'; +import { prepareVowTools } from '@agoric/vow'; +import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js'; +import { isVow } from '@agoric/vow/src/vow-utils.js'; +import { makeConvertKit } from '../../src/async-flow/convert.js'; +import { prepareWeakBijection } from '../../src/async-flow/weak-bijection.js'; + +import { makeHeapZone } from '../../heap.js'; +import { makeVirtualZone } from '../../virtual.js'; +import { makeDurableZone } from '../../durable.js'; + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + * @param {boolean} [showOnConsole] + */ +const testConvert = (t, zone, { makeVowKit }, showOnConsole = false) => { + const makeBijection = prepareWeakBijection(zone); + const bij = zone.makeOnce('bij', makeBijection); + + const makeGuestForHostRemotable = hRem => { + const iface = getInterfaceOf(hRem); + return Far(`${iface} guest wrapper`, {}); + }; + + const makeGuestForHostVow = _hVow => Promise.resolve('guest P'); + + const { guestToHost, hostToGuest } = makeConvertKit( + bij, + makeGuestForHostRemotable, + makeGuestForHostVow, + ); + + t.is(hostToGuest(8), 8); + const h1 = zone.exo('h1', undefined, {}); + const h2 = zone.exo('h2', undefined, {}); + const h3 = zone.makeOnce('h3', () => makeVowKit().vow); + t.true(isVow(h3)); + + const g1 = hostToGuest(h1); + const g2 = hostToGuest(h2); + const g3 = hostToGuest(h3); + t.is(passStyleOf(g1), 'remotable'); + t.is(passStyleOf(g2), 'remotable'); + t.is(passStyleOf(g3), 'promise'); + t.not(g1, g2); + t.is(hostToGuest(h1), g1); + t.is(hostToGuest(h2), g2); + t.is(hostToGuest(h3), g3); + + const h4 = harden(makeError(X`open ${'redacted'} ${q('quoted')}`)); + const g4a = hostToGuest(h4); + const g4b = hostToGuest(h4); + t.not(g4a, g4b); + t.deepEqual(g4a, g4b); + + t.is(guestToHost(g1), h1); + t.is(guestToHost(g2), h2); + t.is(guestToHost(g3), h3); + + t.deepEqual(guestToHost(harden([g1, g3])), harden([h1, h3])); + + const gErr1 = harden(makeError(X`error ${'redacted message'}`, URIError)); + const hErr1 = guestToHost(gErr1); + const gErr2 = hostToGuest(hErr1); + + t.not(gErr1, hErr1); + t.not(hErr1, gErr2); + t.not(gErr1, gErr2); + t.is(gErr1.name, 'URIError'); + t.is(hErr1.name, 'URIError'); + t.is(gErr2.name, 'URIError'); + + if (showOnConsole) { + // To see the annotation chain. Once we're synced with the next ses-ava, + // change this to a t.log, so we will see the annotation chain in context. + console.log('gErr2', gErr2); + } +}; + +test('test heap convert', t => { + const zone = makeHeapZone('heapRoot'); + const vowTools = prepareVowTools(zone); + testConvert(t, zone, vowTools, asyncFlowVerbose()); +}); + +test.serial('test virtual convert', t => { + annihilate(); + const zone = makeVirtualZone('virtualRoot'); + const vowTools = prepareVowTools(zone); + testConvert(t, zone, vowTools); +}); + +test.serial('test durable convert', t => { + annihilate(); + + nextLife(); + const zone1 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools1 = prepareWatchableVowTools(zone1); + testConvert(t, zone1, vowTools1); + + // These converters keep their state only in the bijection, + // which looses all its memory between incarnations. + + nextLife(); + const zone2 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools2 = prepareWatchableVowTools(zone2); + testConvert(t, zone2, vowTools2); +}); diff --git a/packages/zone/test/async-flow/test-equate.js b/packages/zone/test/async-flow/test-equate.js new file mode 100644 index 00000000000..e000a424616 --- /dev/null +++ b/packages/zone/test/async-flow/test-equate.js @@ -0,0 +1,121 @@ +// eslint-disable-next-line import/order +import { + test, + getBaggage, + annihilate, + nextLife, + asyncFlowVerbose, +} from '../prepare-test-env-ava.js'; + +import { X, makeError } from '@endo/errors'; +import { Far } from '@endo/pass-style'; +import { prepareVowTools } from '@agoric/vow'; +import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js'; +import { isVow } from '@agoric/vow/src/vow-utils.js'; +import { prepareWeakBijection } from '../../src/async-flow/weak-bijection.js'; +import { makeEquate } from '../../src/async-flow/equate.js'; + +import { makeHeapZone } from '../../heap.js'; +import { makeVirtualZone } from '../../virtual.js'; +import { makeDurableZone } from '../../durable.js'; + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + * @param {boolean} [showOnConsole] + */ +const testEquate = (t, zone, { makeVowKit }, showOnConsole = false) => { + const makeBijection = prepareWeakBijection(zone); + const bij = zone.makeOnce('bij', makeBijection); + + t.throws(() => zone.makeOnce('equate', () => makeEquate(bij)), { + message: 'maker return value "[Function equate]" is not storable', + }); + + const equate = makeEquate(bij); + + equate(8, 8); + t.throws(() => equate(8, 9), { + message: 'unequal primitives 8 vs 9', + }); + + const h1 = zone.exo('h1', undefined, {}); + const h2 = zone.makeOnce('h2', () => makeVowKit().vow); + t.true(isVow(h2)); + + const g1 = Far('g1', {}); + const g2 = harden(Promise.resolve('g2')); + + t.throws(() => equate(g1, h1), { + message: + 'cannot yet send guest remotables to host "[Alleged: g1]" vs "[Alleged: h1]"', + }); + bij.init(g1, h1); + equate(g1, h1); + t.throws(() => equate(g1, h2), { + message: 'internal: g->h "[Alleged: g1]" -> "[Vow]" vs "[Alleged: h1]"', + }); + t.throws(() => equate(g2, h1), { + message: 'key not found: "[Alleged: h1]"', + }); + bij.init(g2, h2); + equate(g2, h2); + + t.throws(() => equate(g1, h2), { + message: 'internal: g->h "[Alleged: g1]" -> "[Vow]" vs "[Alleged: h1]"', + }); + t.throws(() => equate(g2, h1), { + message: 'internal: g->h "[Promise]" -> "[Alleged: h1]" vs "[Vow]"', + }); + + equate(harden([g1, g2]), harden([h1, h2])); + t.throws(() => equate(harden([g1, g2]), harden([h1, h1])), { + message: '[1]: internal: g->h "[Promise]" -> "[Alleged: h1]" vs "[Vow]"', + }); + + const gErr1 = harden(makeError(X`error ${'redacted message'}`, URIError)); + const hErr1 = harden(makeError(X`another message`, URIError)); + const gErr2 = harden(makeError(X`another error`, TypeError)); + + equate(gErr1, hErr1); + t.throws(() => equate(gErr2, hErr1), { + message: 'name: unequal primitives "TypeError" vs "URIError"', + }); + + if (showOnConsole) { + // To see the annotation chain. Once we're synced with the next ses-ava, + // change this to a t.log, so we will see the annotation chain in context. + console.log('hErr1', hErr1); + } +}; + +test('test heap equate', t => { + const zone = makeHeapZone('heapRoot'); + const vowTools = prepareVowTools(zone); + testEquate(t, zone, vowTools, asyncFlowVerbose()); +}); + +test.serial('test virtual equate', t => { + annihilate(); + const zone = makeVirtualZone('virtualRoot'); + const vowTools = prepareVowTools(zone); + testEquate(t, zone, vowTools); +}); + +test.serial('test durable equate', t => { + annihilate(); + + nextLife(); + const zone1 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools1 = prepareWatchableVowTools(zone1); + testEquate(t, zone1, vowTools1); + + // equate keeps its state only in the bijection, + // which looses all its memory between incarnations. + + nextLife(); + const zone2 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools2 = prepareWatchableVowTools(zone2); + testEquate(t, zone2, vowTools2); +}); diff --git a/packages/zone/test/async-flow/test-log-store.js b/packages/zone/test/async-flow/test-log-store.js new file mode 100644 index 00000000000..e2048fed3d2 --- /dev/null +++ b/packages/zone/test/async-flow/test-log-store.js @@ -0,0 +1,113 @@ +// eslint-disable-next-line import/order +import { + test, + getBaggage, + annihilate, + nextLife, +} from '../prepare-test-env-ava.js'; + +import { Fail } from '@endo/errors'; +import { prepareVowTools } from '@agoric/vow'; +import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js'; +import { prepareLogStore } from '../../src/async-flow/log-store.js'; +import { vowishKey } from '../../src/async-flow/weak-bijection.js'; + +import { makeHeapZone } from '../../heap.js'; +import { makeVirtualZone } from '../../virtual.js'; +import { makeDurableZone } from '../../durable.js'; + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + */ +const testLogStorePlay = async (t, zone, { makeVowKit }) => { + const makeLogStore = prepareLogStore(zone); + + const log = zone.makeOnce('log', () => makeLogStore()); + const v1 = zone.makeOnce('v1', () => makeVowKit().vow); + + t.is(log.getIndex(), 0); + t.is(log.getLength(), 0); + t.throws(() => log.pushEntry(['bogus']), { + message: + /^In "pushEntry" method of \(LogStore\): arg 0: \["bogus"\] - Must match one of/, + }); + t.false(log.isReplaying()); + t.is(await log.promiseReplayDone(), undefined); + + t.is(log.pushEntry(harden(['doFulfill', v1, 'x'])), 1); + t.is(log.pushEntry(harden(['doReject', v1, 'x'])), 2); + t.deepEqual(log.dump(), [ + ['doFulfill', v1, 'x'], + ['doReject', v1, 'x'], + ]); + // Because t.deepEqual is too tolerant + t.is(vowishKey(log.dump()[0][1]), vowishKey(v1)); + t.is(vowishKey(log.dump()[1][1]), vowishKey(v1)); + + t.is(log.getIndex(), 2); + t.is(log.getLength(), 2); + t.false(log.isReplaying()); + t.is(await log.promiseReplayDone(), undefined); +}; + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} _vowTools + */ +const testLogStoreReplay = async (t, zone, _vowTools) => { + prepareLogStore(zone); + + const log = + /** @type {import('../../src/async-flow/log-store.js').LogStore} */ ( + zone.makeOnce('log', () => Fail`log expected`) + ); + const v1 = /** @type {Vow} */ (zone.makeOnce('v1', () => Fail`v1 expected`)); + + t.is(log.getIndex(), 0); + t.is(log.getLength(), 2); + t.true(log.isReplaying()); + + t.deepEqual(log.dump(), [ + ['doFulfill', v1, 'x'], + ['doReject', v1, 'x'], + ]); + // Because t.deepEqual is too tolerant + t.is(vowishKey(log.dump()[0][1]), vowishKey(v1)); + t.is(vowishKey(log.dump()[1][1]), vowishKey(v1)); + + t.deepEqual(log.nextEntry(), ['doFulfill', v1, 'x']); + t.deepEqual(log.nextEntry(), ['doReject', v1, 'x']); + t.is(log.getIndex(), 2); + t.false(log.isReplaying()); + t.is(await log.promiseReplayDone(), undefined); +}; + +await test('test heap log-store', async t => { + const zone = makeHeapZone('heapRoot'); + const vowTools = prepareVowTools(zone); + return testLogStorePlay(t, zone, vowTools); +}); + +await test.serial('test virtual log-store', async t => { + annihilate(); + const zone = makeVirtualZone('virtualRoot'); + const vowTools = prepareVowTools(zone); + return testLogStorePlay(t, zone, vowTools); +}); + +await test.serial('test durable log-store', async t => { + annihilate(); + + nextLife(); + const zone1 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools1 = prepareWatchableVowTools(zone1); + await testLogStorePlay(t, zone1, vowTools1); + + nextLife(); + const zone2 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools2 = prepareWatchableVowTools(zone2); + return testLogStoreReplay(t, zone2, vowTools2); +}); diff --git a/packages/zone/test/async-flow/test-replay-membrane.js b/packages/zone/test/async-flow/test-replay-membrane.js new file mode 100644 index 00000000000..ecbdac29c01 --- /dev/null +++ b/packages/zone/test/async-flow/test-replay-membrane.js @@ -0,0 +1,284 @@ +// eslint-disable-next-line import/order +import { + test, + getBaggage, + annihilate, + nextLife, + asyncFlowVerbose, +} from '../prepare-test-env-ava.js'; + +import { Fail } from '@endo/errors'; +// import { E } from '@endo/far'; +// import E from '@agoric/vow/src/E.js'; +import { isPromise } from '@endo/promise-kit'; +import { prepareVowTools } from '@agoric/vow'; +import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js'; +import { prepareLogStore } from '../../src/async-flow/log-store.js'; +import { prepareWeakBijection } from '../../src/async-flow/weak-bijection.js'; +import { makeReplayMembrane } from '../../src/async-flow/replay-membrane.js'; + +import { makeHeapZone } from '../../heap.js'; +import { makeVirtualZone } from '../../virtual.js'; +import { makeDurableZone } from '../../durable.js'; + +const watchWake = _vowish => {}; +const panic = problem => Fail`panic over ${problem}`; + +/** + * @param {import('@agoric/base-zone').Zone} zone + * @param {number} [k] + */ +const prepareOrchestra = (zone, k = 1) => + zone.exoClass( + 'Orchestra', + undefined, + (factor, vow, resolver) => ({ factor, vow, resolver }), + { + scale(n) { + const { state } = this; + return k * state.factor * n; + }, + vow() { + const { state } = this; + return state.vow; + }, + resolve(x) { + const { state } = this; + state.resolver.resolve(x); + }, + }, + ); + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + * @param {boolean} [showOnConsole] + */ +const testFirstPlay = async (t, zone, vowTools, showOnConsole = false) => { + const makeLogStore = prepareLogStore(zone); + const makeBijection = prepareWeakBijection(zone); + const makeOrchestra = prepareOrchestra(zone); + const { makeVowKit } = vowTools; + const { vow: v1, resolver: r1 } = makeVowKit(); + const { vow: v2, resolver: r2 } = makeVowKit(); + + const log = zone.makeOnce('log', () => makeLogStore()); + const bij = zone.makeOnce('bij', makeBijection); + + const mem = makeReplayMembrane(log, bij, vowTools, watchWake, panic); + + const gP = mem.hostToGuest(v1); + t.true(isPromise(gP)); + r1.resolve('x'); + t.is(await gP, 'x'); + + const hOrch7 = makeOrchestra(7, v2, r2); + t.false(bij.hasHost(hOrch7)); + const gOrch7 = mem.hostToGuest(hOrch7); + t.true(bij.has(gOrch7, hOrch7)); + + const prod = gOrch7.scale(3); + t.is(prod, 21); + + let gErr; + try { + gOrch7.scale(9n); + } catch (e) { + gErr = e; + } + + // TODO make E work across the membrane *well* + // TODO also try E on remote promise + // const prodP = E(gOrch7).scale(33); + // t.is(await prodP, 231); + // const badP = E(gOrch7).scale(99n); + // let gErr1; + // try { + // await badP; + // } catch (e) { + // gErr1 = e; + // } + // t.is(gErr1.name, 'TypeError'); + + t.deepEqual(log.dump(), [ + ['doFulfill', v1, 'x'], + ['checkCall', hOrch7, 'scale', [3], 1], + ['doReturn', 1, 21], + ['checkCall', hOrch7, 'scale', [9n], 3], + ['doThrow', 3, mem.guestToHost(gErr)], + ]); + + if (showOnConsole) { + // To see the annotation chain. Once we're synced with the next ses-ava, + // change this to a t.log, so we will see the annotation chain in context. + console.log('gErr', gErr); + } +}; + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + */ +const testBadReplay = async (t, zone, vowTools) => { + prepareLogStore(zone); + prepareWeakBijection(zone); + prepareOrchestra(zone); + + const log = + /** @type {import('../../src/async-flow/log-store.js').LogStore} */ ( + zone.makeOnce('log', () => Fail`log expected`) + ); + const bij = + /** @type {import('../../src/async-flow/weak-bijection.js').WeakBijection} */ ( + zone.makeOnce('bij', () => Fail`bij expected`) + ); + + const dump = log.dump(); + const v1 = dump[0][1]; + const hOrch7 = dump[1][1]; + const hErr = dump[4][2]; + + t.false(bij.hasHost(hOrch7)); + + t.deepEqual(dump, [ + ['doFulfill', v1, 'x'], + ['checkCall', hOrch7, 'scale', [3], 1], + ['doReturn', 1, 21], + ['checkCall', hOrch7, 'scale', [9n], 3], + ['doThrow', 3, hErr], + ]); + + const mem = makeReplayMembrane(log, bij, vowTools, watchWake, panic); + + const g1 = mem.hostToGuest(v1); + mem.replayReady(); + t.is(await g1, 'x'); + const gOrch7 = mem.hostToGuest(hOrch7); + t.true(bij.has(gOrch7, hOrch7)); + + // failure of guest to reproduce behavior from previous incarnations + t.throws(() => gOrch7.scale(4), { + message: + 'panic over "[Error: replay 1: [3]: [0]: unequal primitives 4 vs 3]"', + }); +}; + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + */ +const testGoodReplay = async (t, zone, vowTools) => { + prepareLogStore(zone); + prepareWeakBijection(zone); + prepareOrchestra(zone, 2); // 2 is new incarnation behavior change + + const log = + /** @type {import('../../src/async-flow/log-store.js').LogStore} */ ( + zone.makeOnce('log', () => Fail`log expected`) + ); + const bij = + /** @type {import('../../src/async-flow/weak-bijection.js').WeakBijection} */ ( + zone.makeOnce('bij', () => Fail`bij expected`) + ); + + const dump = log.dump(); + const v1 = dump[0][1]; + const hOrch7 = dump[1][1]; + const hErr = dump[4][2]; + + t.false(bij.hasHost(hOrch7)); + + t.deepEqual(dump, [ + ['doFulfill', v1, 'x'], + ['checkCall', hOrch7, 'scale', [3], 1], + ['doReturn', 1, 21], + ['checkCall', hOrch7, 'scale', [9n], 3], + ['doThrow', 3, hErr], + ]); + + const oldLogLen = dump.length; + + const mem = makeReplayMembrane(log, bij, vowTools, watchWake, panic); + + const g1 = mem.hostToGuest(v1); + mem.replayReady(); + t.is(await g1, 'x'); + const gOrch7 = mem.hostToGuest(hOrch7); + t.true(bij.has(gOrch7, hOrch7)); + + // replay + const prodA = gOrch7.scale(3); + t.is(prodA, 21); // According to log of earlier incarnations + // let gErr; + try { + gOrch7.scale(9n); + } catch (e) { + // gErr = e; + } + + // new play + const prodB = gOrch7.scale(3); + t.is(prodB, 42); // According to new incarnation behavior + + const g2 = gOrch7.vow(); + const h2 = mem.guestToHost(g2); + t.true(isPromise(g2)); + const pairA = [gOrch7, g1]; + gOrch7.resolve(pairA); + const pairB = await g2; + const [gOrchB, gB] = pairB; + t.not(pairB, pairA); + t.is(gOrchB, gOrch7); + t.is(gB, g1); + + t.deepEqual(log.dump(), [ + ['doFulfill', v1, 'x'], + ['checkCall', hOrch7, 'scale', [3], 1], + ['doReturn', 1, 21], + ['checkCall', hOrch7, 'scale', [9n], 3], + ['doThrow', 3, hErr], + + ['checkCall', hOrch7, 'scale', [3], oldLogLen], + ['doReturn', oldLogLen, 42], + ['checkCall', hOrch7, 'vow', [], oldLogLen + 2], + ['doReturn', oldLogLen + 2, h2], + ['checkCall', hOrch7, 'resolve', [[hOrch7, v1]], oldLogLen + 4], + ['doReturn', oldLogLen + 4, undefined], + ['doFulfill', h2, [hOrch7, v1]], + ]); +}; + +await test.serial('test heap replay-membrane', async t => { + const zone = makeHeapZone('heapRoot'); + const vowTools = prepareVowTools(zone); + return testFirstPlay(t, zone, vowTools, asyncFlowVerbose()); +}); + +await test.serial('test virtual replay-membrane', async t => { + annihilate(); + const zone = makeVirtualZone('virtualRoot'); + const vowTools = prepareVowTools(zone); + return testFirstPlay(t, zone, vowTools); +}); + +await test.serial('test durable replay-membrane', async t => { + annihilate(); + + nextLife(); + const zone1 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools1 = prepareWatchableVowTools(zone1); + await testFirstPlay(t, zone1, vowTools1); + + nextLife(); + const zone2 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools2 = prepareWatchableVowTools(zone2); + await testBadReplay(t, zone2, vowTools2); + + nextLife(); + const zone3 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools3 = prepareWatchableVowTools(zone3); + return testGoodReplay(t, zone3, vowTools3); +}); diff --git a/packages/zone/test/async-flow/test-weak-bijection.js b/packages/zone/test/async-flow/test-weak-bijection.js new file mode 100644 index 00000000000..beff27766cd --- /dev/null +++ b/packages/zone/test/async-flow/test-weak-bijection.js @@ -0,0 +1,88 @@ +// eslint-disable-next-line import/order +import { + test, + getBaggage, + annihilate, + nextLife, +} from '../prepare-test-env-ava.js'; + +import { Far } from '@endo/pass-style'; +import { prepareVowTools } from '@agoric/vow'; +import { prepareVowTools as prepareWatchableVowTools } from '@agoric/vat-data/vow.js'; +import { isVow } from '@agoric/vow/src/vow-utils.js'; +import { prepareWeakBijection } from '../../src/async-flow/weak-bijection.js'; + +import { makeHeapZone } from '../../heap.js'; +import { makeVirtualZone } from '../../virtual.js'; +import { makeDurableZone } from '../../durable.js'; + +/** + * @param {any} t + * @param {import('@agoric/base-zone').Zone} zone + * @param {import('@agoric/vow').VowTools} vowTools + */ +const testBijection = (t, zone, { makeVowKit }) => { + const makeBijection = prepareWeakBijection(zone); + const bij = zone.makeOnce('bij', makeBijection); + + const h1 = zone.exo('h1', undefined, {}); + const h2 = zone.exo('h2', undefined, {}); + const h3 = zone.makeOnce('h3', () => makeVowKit().vow); + t.true(isVow(h3)); + + const g1 = Far('g1', {}); + const g2 = Far('g2', {}); + const g3 = harden(Promise.resolve('g3')); + + t.false(bij.has(g1, h1)); + bij.define(g1, h1); + t.true(bij.has(g1, h1)); + t.throws(() => bij.define(g1, h2), { + message: + 'internal: g->h "[Alleged: g1]" -> "[Alleged: h2]" vs "[Alleged: h1]"', + }); + t.throws(() => bij.define(g2, h1), { + message: 'key not found: "[Alleged: h1]"', + }); + t.throws(() => bij.has(g1, h2), { + message: + 'internal: g->h "[Alleged: g1]" -> "[Alleged: h2]" vs "[Alleged: h1]"', + }); + t.false(bij.has(g2, h2)); + bij.define(g2, h2); + t.true(bij.has(g2, h2)); + + t.false(bij.has(g3, h3)); + bij.init(g3, h3); + t.true(bij.has(g3, h3)); + t.false(bij.has(h3, g3)); +}; + +test('test heap bijection', t => { + const zone = makeHeapZone('heapRoot'); + const vowTools = prepareVowTools(zone); + testBijection(t, zone, vowTools); +}); + +test.serial('test virtual bijection', t => { + annihilate(); + const zone = makeVirtualZone('virtualRoot'); + const vowTools = prepareVowTools(zone); + testBijection(t, zone, vowTools); +}); + +test.serial('test durable bijection', t => { + annihilate(); + + nextLife(); + const zone1 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools1 = prepareWatchableVowTools(zone1); + testBijection(t, zone1, vowTools1); + + // Bijections persist but revive empty since all the guests disappear anyway + + nextLife(); + const zone2 = makeDurableZone(getBaggage(), 'durableRoot'); + const vowTools2 = prepareWatchableVowTools(zone2); + testBijection(t, zone2, vowTools2); +}); diff --git a/packages/zone/test/prepare-test-env-ava.js b/packages/zone/test/prepare-test-env-ava.js index a3d049993ce..ba79b876f39 100644 --- a/packages/zone/test/prepare-test-env-ava.js +++ b/packages/zone/test/prepare-test-env-ava.js @@ -1,9 +1,11 @@ import '@agoric/swingset-liveslots/tools/prepare-test-env.js'; -import { reincarnate } from '@agoric/swingset-liveslots/tools/setup-vat-data.js'; +import { wrapTest } from '@endo/ses-ava'; +import rawTest from 'ava'; -import test from 'ava'; +import { environmentOptionsListHas } from '@endo/env-options'; +import { reincarnate } from '@agoric/swingset-liveslots/tools/setup-vat-data.js'; -export { test }; +export const test = wrapTest(rawTest); /** @type {ReturnType} */ let incarnation; @@ -19,3 +21,10 @@ export const getBaggage = () => { export const nextLife = () => { incarnation = reincarnate(incarnation); }; + +export const asyncFlowVerbose = () => { + // TODO figure out how we really want to control this + // But keep in mind that all this async-flow stuff will migrate + // into its own package + return environmentOptionsListHas('DEBUG', 'async-flow-verbose'); +}; diff --git a/packages/zone/zone-helpers.js b/packages/zone/zone-helpers.js new file mode 100644 index 00000000000..167bc8a1820 --- /dev/null +++ b/packages/zone/zone-helpers.js @@ -0,0 +1,2 @@ +export * from '@agoric/base-zone/zone-helpers.js'; +export * from './src/async-flow/async-flow.js';