From 3318e5b6a906202411ea983b3c6285067a0cd76f Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 18 Jul 2021 23:23:56 -0700 Subject: [PATCH] fix(swingset): don't deduplicate inbound mailbox messages The mailbox device tracking the highest inbound message number and ack for each peer, to de-duplicate repeated messages, so it could reduce the amount of kernel activity. Each call to `deliverInbound` would return a boolean to indicate whether the messages/ack were new, and thus the kernel needed to be cycled. However, the device was holding this tracking data in non-durable state, so if/when the kernel was restarted, the state would be lost. A duplicate message/ack arriving in the restarted process would trigger kernel activity that would not have run in the original process. These extra cranks caused diverge between validators when one of them was restarted, and the client sent a duplicate message (such as the pre-emptive `ack` all clients send at startup). The extra crank does not get very far, because vattp does its own deduplication, so the divergence was only visible in the slog. But when #3442 is implemented, even a single extra crank will flag the validator as out of consensus. The fix is to remove the mailbox device's dedup code, and rely upon vattp for this function. The test was also updated to match, and a new test (comparing two parallel kernels, one restarted, one not) was added. closes #3471 --- packages/SwingSet/src/devices/mailbox-src.js | 43 +--- .../device-mailbox/test-device-mailbox.js | 218 ++++++++++-------- 2 files changed, 136 insertions(+), 125 deletions(-) diff --git a/packages/SwingSet/src/devices/mailbox-src.js b/packages/SwingSet/src/devices/mailbox-src.js index 69c1a3421402..0cebf9760833 100644 --- a/packages/SwingSet/src/devices/mailbox-src.js +++ b/packages/SwingSet/src/devices/mailbox-src.js @@ -5,50 +5,27 @@ import { assert, details as X } from '@agoric/assert'; export function buildRootDeviceNode(tools) { const { SO, getDeviceState, setDeviceState, endowments } = tools; - const highestInboundDelivered = harden(new Map()); - const highestInboundAck = harden(new Map()); let deliverInboundMessages; let deliverInboundAck; - function inboundCallback(hPeer, hMessages, hAck) { - const peer = `${hPeer}`; + function inboundCallback(peer, messages, ack) { if (!deliverInboundMessages) { throw new Error( `mailbox.inboundCallback(${peer}) called before handler was registered`, ); } - const ack = Nat(hAck); - let didSomething = false; - - let latestMsg = 0; - if (highestInboundDelivered.has(peer)) { - latestMsg = highestInboundDelivered.get(peer); - } - const newMessages = []; - hMessages.forEach(m => { - const [hNum, hMsg] = m; - const num = Nat(hNum); - if (num > latestMsg) { - newMessages.push([num, `${hMsg}`]); - latestMsg = num; - highestInboundDelivered.set(peer, latestMsg); - } + assert.typeof(peer, 'string'); + messages.forEach(m => { + Nat(m[0]); + assert.typeof(m[1], 'string'); }); - if (newMessages.length) { - deliverInboundMessages(peer, harden(newMessages)); - didSomething = true; - } - let latestAck = 0; - if (highestInboundAck.has(peer)) { - latestAck = highestInboundAck.get(peer); - } - if (ack > latestAck) { - highestInboundAck.set(peer, ack); - deliverInboundAck(peer, ack); - didSomething = true; + Nat(ack); + if (messages.length) { + deliverInboundMessages(peer, harden(messages)); } - return didSomething; + deliverInboundAck(peer, ack); + return true; // always didSomething } endowments.registerInboundCallback(inboundCallback); diff --git a/packages/SwingSet/test/device-mailbox/test-device-mailbox.js b/packages/SwingSet/test/device-mailbox/test-device-mailbox.js index c278b89cff67..239f7e3889b9 100644 --- a/packages/SwingSet/test/device-mailbox/test-device-mailbox.js +++ b/packages/SwingSet/test/device-mailbox/test-device-mailbox.js @@ -6,7 +6,6 @@ import path from 'path'; import bundleSource from '@agoric/bundle-source'; import { getAllState } from '@agoric/swing-store-simple'; import { provideHostStorage } from '../../src/hostStorage.js'; - import { initializeSwingset, makeSwingsetController, @@ -16,6 +15,7 @@ import { buildMailboxStateMap, buildMailbox, } from '../../src/devices/mailbox.js'; +import { capargs } from '../util.js'; test.before(async t => { const kernelBundles = await buildKernelBundles(); @@ -92,106 +92,140 @@ test('mailbox inbound', async t => { mailbox: { ...mb.endowments }, }; - let rc; - const hostStorage = provideHostStorage(); await initializeSwingset(config, ['mailbox2'], hostStorage, t.context.data); const c = await makeSwingsetController(hostStorage, deviceEndowments); await c.run(); - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - ], - 0, - ); - t.truthy(rc); + const m1 = [1, 'msg1']; + const m2 = [2, 'msg2']; + const m3 = [3, 'msg3']; + t.true(mb.deliverInbound('peer1', [m1, m2], 0)); await c.run(); - t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2']); - - // delivering the same messages should not trigger sends, but the ack is new - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - ], - 3, - ); - t.truthy(rc); + const expected = ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-0']; + t.deepEqual(c.dump().log, expected); + + // all messages/acks should be delivered, even duplicates + t.true(mb.deliverInbound('peer1', [m1, m2], 0)); await c.run(); - t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-3']); - - // no new messages/acks makes deliverInbound return 'false' - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - ], - 3, - ); - t.falsy(rc); + expected.push(...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-0']); + t.deepEqual(c.dump().log, expected); + + // new messages too + t.true(mb.deliverInbound('peer1', [m1, m2, m3], 0)); await c.run(); - t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-3']); - - // but new messages should be sent - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - [3, 'msg3'], - ], - 3, + expected.push( + ...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'm-3-msg3', 'da-peer1-0'], ); - t.truthy(rc); + t.deepEqual(c.dump().log, expected); + + // and new ack + t.true(mb.deliverInbound('peer1', [m1, m2, m3], 6)); await c.run(); - t.deepEqual(c.dump().log, [ - 'dm-peer1', - 'm-1-msg1', - 'm-2-msg2', - 'da-peer1-3', - 'dm-peer1', - 'm-3-msg3', - ]); - - // and a higher ack should be sent - rc = mb.deliverInbound( - 'peer1', - [ - [1, 'msg1'], - [2, 'msg2'], - [3, 'msg3'], - ], - 4, + expected.push( + ...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'm-3-msg3', 'da-peer1-6'], ); - t.truthy(rc); - await c.run(); - t.deepEqual(c.dump().log, [ - 'dm-peer1', - 'm-1-msg1', - 'm-2-msg2', - 'da-peer1-3', - 'dm-peer1', - 'm-3-msg3', - 'da-peer1-4', - ]); - - rc = mb.deliverInbound('peer2', [[4, 'msg4']], 5); - t.truthy(rc); + t.deepEqual(c.dump().log, expected); +}); + +async function initializeMailboxKernel(t) { + const s = buildMailboxStateMap(); + const mb = buildMailbox(s); + const config = { + bootstrap: 'bootstrap', + vats: { + bootstrap: { + bundle: t.context.data.bootstrap, + }, + }, + devices: { + mailbox: { + sourceSpec: require.resolve(mb.srcPath), + }, + }, + }; + const hostStorage = provideHostStorage(); + await initializeSwingset( + config, + ['mailbox-determinism'], + hostStorage, + t.context.data, + ); + return hostStorage; +} + +async function makeMailboxKernel(hostStorage) { + const s = buildMailboxStateMap(); + const mb = buildMailbox(s); + const deviceEndowments = { + mailbox: { ...mb.endowments }, + }; + const c = await makeSwingsetController(hostStorage, deviceEndowments); + c.pinVatRoot('bootstrap'); await c.run(); - t.deepEqual(c.dump().log, [ - 'dm-peer1', - 'm-1-msg1', - 'm-2-msg2', - 'da-peer1-3', - 'dm-peer1', - 'm-3-msg3', - 'da-peer1-4', - 'dm-peer2', - 'm-4-msg4', - 'da-peer2-5', - ]); + return [c, mb]; +} + +test('mailbox determinism', async t => { + // we run two kernels in parallel + const hostStorage1 = await initializeMailboxKernel(t); + const hostStorage2 = await initializeMailboxKernel(t); + const [c1a, mb1a] = await makeMailboxKernel(hostStorage1); + const [c2, mb2] = await makeMailboxKernel(hostStorage2); + + // they get the same inbound message + const msg1 = [[1, 'msg1']]; + t.true(mb1a.deliverInbound('peer1', msg1, 0)); + await c1a.run(); + t.deepEqual(c1a.dump().log, ['comms receive msg1']); + const kp1 = c1a.queueToVatRoot('bootstrap', 'getNumReceived', capargs([])); + await c1a.run(); + t.deepEqual(JSON.parse(c1a.kpResolution(kp1).body), 1); + + t.true(mb2.deliverInbound('peer1', msg1, 0)); + await c2.run(); + t.deepEqual(c2.dump().log, ['comms receive msg1']); + const kp2 = c2.queueToVatRoot('bootstrap', 'getNumReceived', capargs([])); + await c2.run(); + t.deepEqual(JSON.parse(c2.kpResolution(kp1).body), 1); + + // both should have the same number of cranks + t.is( + hostStorage1.kvStore.get('crankNumber'), + hostStorage2.kvStore.get('crankNumber'), + ); + + // then one is restarted, but the other keeps running + const [c1b, mb1b] = await makeMailboxKernel(hostStorage1); + + // Now we repeat delivery of that message to both. The mailbox should send + // it to vattp, even though it's duplicate, because the mailbox doesn't + // have durable state, and cannot correctly (deterministically) tell that + // it's a duplicate. + t.true(mb1b.deliverInbound('peer1', msg1, 0)); + await c1b.run(); + // the testlog is part of the ephemeral kernel state, so it will only have + // a record of messages in the second run, however the vat is replayed + // during the second-run startup, so we expect to see one copy of the + // original message, delivered during the second run + t.deepEqual(c1b.dump().log, ['comms receive msg1']); + // but vattp dedups, so only one message should be delivered to comms + const kp3 = c1b.queueToVatRoot('bootstrap', 'getNumReceived', capargs([])); + await c1b.run(); + t.deepEqual(JSON.parse(c1b.kpResolution(kp3).body), 1); + + t.true(mb2.deliverInbound('peer1', msg1, 0)); + await c2.run(); + // the second kernel still has that ephemeral testlog, however the vat is + // still running, so we only see the original message from the first run + t.deepEqual(c2.dump().log, ['comms receive msg1']); + const kp4 = c2.queueToVatRoot('bootstrap', 'getNumReceived', capargs([])); + await c2.run(); + t.deepEqual(JSON.parse(c2.kpResolution(kp4).body), 1); + + // Both should *still* have the same number of cranks. This is what bug + // #3471 exposed. + t.is( + hostStorage1.kvStore.get('crankNumber'), + hostStorage2.kvStore.get('crankNumber'), + ); });