Skip to content

Commit

Permalink
fix(swingset): don't deduplicate inbound mailbox messages
Browse files Browse the repository at this point in the history
The mailbox device tracked the highest inbound message number and ack for
each peer, to de-duplicate repeated messages, so it could reduce the amount
of kernel activity. Each call to `deliverInbound` would return a boolean to
indicate whether the messages/ack were new, and thus the kernel needed to be
cycled.

However, the device was holding this tracking data in non-durable state, so
if/when the kernel was restarted, the state would be lost. A duplicate
message/ack arriving in the restarted process would trigger kernel activity
that would not have run in the original process. These extra cranks caused
divergence between validators when one of them was restarted, and the client
sent a duplicate message (such as the pre-emptive `ack` all clients send at
startup). The extra crank does not get very far, because vattp does its own
deduplication, so the divergence was only visible in the slog. But when #3442
is implemented, even a single extra crank will flag the validator as out of
consensus.

The fix is to remove the mailbox device's dedup code, and rely upon vattp for
this function. The test was also updated to match, and a new test (comparing
two parallel kernels, one restarted, one not) was added.

closes #3471
  • Loading branch information
warner committed Jul 22, 2021
1 parent 72fb780 commit 3318e5b
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 125 deletions.
43 changes: 10 additions & 33 deletions packages/SwingSet/src/devices/mailbox-src.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,27 @@ import { assert, details as X } from '@agoric/assert';

export function buildRootDeviceNode(tools) {
const { SO, getDeviceState, setDeviceState, endowments } = tools;
const highestInboundDelivered = harden(new Map());
const highestInboundAck = harden(new Map());

let deliverInboundMessages;
let deliverInboundAck;

function inboundCallback(hPeer, hMessages, hAck) {
const peer = `${hPeer}`;
function inboundCallback(peer, messages, ack) {
if (!deliverInboundMessages) {
throw new Error(
`mailbox.inboundCallback(${peer}) called before handler was registered`,
);
}
const ack = Nat(hAck);
let didSomething = false;

let latestMsg = 0;
if (highestInboundDelivered.has(peer)) {
latestMsg = highestInboundDelivered.get(peer);
}
const newMessages = [];
hMessages.forEach(m => {
const [hNum, hMsg] = m;
const num = Nat(hNum);
if (num > latestMsg) {
newMessages.push([num, `${hMsg}`]);
latestMsg = num;
highestInboundDelivered.set(peer, latestMsg);
}
assert.typeof(peer, 'string');
messages.forEach(m => {
Nat(m[0]);
assert.typeof(m[1], 'string');
});
if (newMessages.length) {
deliverInboundMessages(peer, harden(newMessages));
didSomething = true;
}
let latestAck = 0;
if (highestInboundAck.has(peer)) {
latestAck = highestInboundAck.get(peer);
}
if (ack > latestAck) {
highestInboundAck.set(peer, ack);
deliverInboundAck(peer, ack);
didSomething = true;
Nat(ack);
if (messages.length) {
deliverInboundMessages(peer, harden(messages));
}
return didSomething;
deliverInboundAck(peer, ack);
return true; // always didSomething
}
endowments.registerInboundCallback(inboundCallback);

Expand Down
218 changes: 126 additions & 92 deletions packages/SwingSet/test/device-mailbox/test-device-mailbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import path from 'path';
import bundleSource from '@agoric/bundle-source';
import { getAllState } from '@agoric/swing-store-simple';
import { provideHostStorage } from '../../src/hostStorage.js';

import {
initializeSwingset,
makeSwingsetController,
Expand All @@ -16,6 +15,7 @@ import {
buildMailboxStateMap,
buildMailbox,
} from '../../src/devices/mailbox.js';
import { capargs } from '../util.js';

test.before(async t => {
const kernelBundles = await buildKernelBundles();
Expand Down Expand Up @@ -92,106 +92,140 @@ test('mailbox inbound', async t => {
mailbox: { ...mb.endowments },
};

let rc;

const hostStorage = provideHostStorage();
await initializeSwingset(config, ['mailbox2'], hostStorage, t.context.data);
const c = await makeSwingsetController(hostStorage, deviceEndowments);
await c.run();
rc = mb.deliverInbound(
'peer1',
[
[1, 'msg1'],
[2, 'msg2'],
],
0,
);
t.truthy(rc);
const m1 = [1, 'msg1'];
const m2 = [2, 'msg2'];
const m3 = [3, 'msg3'];
t.true(mb.deliverInbound('peer1', [m1, m2], 0));
await c.run();
t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2']);

// delivering the same messages should not trigger sends, but the ack is new
rc = mb.deliverInbound(
'peer1',
[
[1, 'msg1'],
[2, 'msg2'],
],
3,
);
t.truthy(rc);
const expected = ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-0'];
t.deepEqual(c.dump().log, expected);

// all messages/acks should be delivered, even duplicates
t.true(mb.deliverInbound('peer1', [m1, m2], 0));
await c.run();
t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-3']);

// no new messages/acks makes deliverInbound return 'false'
rc = mb.deliverInbound(
'peer1',
[
[1, 'msg1'],
[2, 'msg2'],
],
3,
);
t.falsy(rc);
expected.push(...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-0']);
t.deepEqual(c.dump().log, expected);

// new messages too
t.true(mb.deliverInbound('peer1', [m1, m2, m3], 0));
await c.run();
t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-3']);

// but new messages should be sent
rc = mb.deliverInbound(
'peer1',
[
[1, 'msg1'],
[2, 'msg2'],
[3, 'msg3'],
],
3,
expected.push(
...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'm-3-msg3', 'da-peer1-0'],
);
t.truthy(rc);
t.deepEqual(c.dump().log, expected);

// and new ack
t.true(mb.deliverInbound('peer1', [m1, m2, m3], 6));
await c.run();
t.deepEqual(c.dump().log, [
'dm-peer1',
'm-1-msg1',
'm-2-msg2',
'da-peer1-3',
'dm-peer1',
'm-3-msg3',
]);

// and a higher ack should be sent
rc = mb.deliverInbound(
'peer1',
[
[1, 'msg1'],
[2, 'msg2'],
[3, 'msg3'],
],
4,
expected.push(
...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'm-3-msg3', 'da-peer1-6'],
);
t.truthy(rc);
await c.run();
t.deepEqual(c.dump().log, [
'dm-peer1',
'm-1-msg1',
'm-2-msg2',
'da-peer1-3',
'dm-peer1',
'm-3-msg3',
'da-peer1-4',
]);

rc = mb.deliverInbound('peer2', [[4, 'msg4']], 5);
t.truthy(rc);
t.deepEqual(c.dump().log, expected);
});

/**
 * Initialize a brand-new swingset whose config has a single `bootstrap` vat
 * and a `mailbox` device, and hand back the host storage it was written to.
 *
 * Only the mailbox's `srcPath` is used here; the mailbox object itself is
 * discarded, so callers build their own mailbox when they start a controller.
 *
 * @param {object} t  test context; `t.context.data` must carry the prebuilt
 *   bundles, including `bootstrap`
 * @returns {Promise<object>} the freshly initialized host storage
 */
async function initializeMailboxKernel(t) {
  const mailbox = buildMailbox(buildMailboxStateMap());

  const vats = {
    bootstrap: {
      bundle: t.context.data.bootstrap,
    },
  };
  const devices = {
    mailbox: {
      sourceSpec: require.resolve(mailbox.srcPath),
    },
  };
  const swingsetConfig = { bootstrap: 'bootstrap', vats, devices };

  const storage = provideHostStorage();
  // presumably the second argument is the argv handed to the bootstrap vat --
  // TODO confirm against initializeSwingset
  const bootstrapArgs = ['mailbox-determinism'];
  await initializeSwingset(
    swingsetConfig,
    bootstrapArgs,
    storage,
    t.context.data,
  );
  return storage;
}

/**
 * Start a controller on top of already-initialized host storage, wiring in a
 * newly built mailbox device, pin the bootstrap vat root, and run the kernel
 * once.
 *
 * @param {object} hostStorage  storage previously passed to initializeSwingset
 * @returns {Promise<Array>} a `[controller, mailbox]` pair
 */
async function makeMailboxKernel(hostStorage) {
const s = buildMailboxStateMap();
const mb = buildMailbox(s);
const deviceEndowments = {
mailbox: { ...mb.endowments },
};
const c = await makeSwingsetController(hostStorage, deviceEndowments);
// pin the root so later queueToVatRoot('bootstrap', ...) calls can target it
// (presumably this keeps the root object from being collected -- TODO confirm)
c.pinVatRoot('bootstrap');
await c.run();
// NOTE(review): the t.deepEqual(...) call below references `t`, which is not
// a parameter or local of this function. It looks like removed-line residue
// from the previous 'mailbox inbound' test in this diff view rather than live
// code -- confirm against the actual file before relying on it.
t.deepEqual(c.dump().log, [
'dm-peer1',
'm-1-msg1',
'm-2-msg2',
'da-peer1-3',
'dm-peer1',
'm-3-msg3',
'da-peer1-4',
'dm-peer2',
'm-4-msg4',
'da-peer2-5',
]);
return [c, mb];
}

// Regression test for #3471: two kernels that receive the same inbound
// mailbox traffic must execute the same number of cranks, even when one of
// them is restarted in between and then sees a duplicate message.
test('mailbox determinism', async t => {
  // we run two kernels in parallel
  const hostStorage1 = await initializeMailboxKernel(t);
  const hostStorage2 = await initializeMailboxKernel(t);
  const [c1a, mb1a] = await makeMailboxKernel(hostStorage1);
  const [c2, mb2] = await makeMailboxKernel(hostStorage2);

  // they get the same inbound message
  const msg1 = [[1, 'msg1']];
  t.true(mb1a.deliverInbound('peer1', msg1, 0));
  await c1a.run();
  t.deepEqual(c1a.dump().log, ['comms receive msg1']);
  const kp1 = c1a.queueToVatRoot('bootstrap', 'getNumReceived', capargs([]));
  await c1a.run();
  t.deepEqual(JSON.parse(c1a.kpResolution(kp1).body), 1);

  t.true(mb2.deliverInbound('peer1', msg1, 0));
  await c2.run();
  t.deepEqual(c2.dump().log, ['comms receive msg1']);
  const kp2 = c2.queueToVatRoot('bootstrap', 'getNumReceived', capargs([]));
  await c2.run();
  // resolve the promise that was queued on *this* kernel (kp2); kp1 belongs
  // to the first kernel and must not be looked up here
  t.deepEqual(JSON.parse(c2.kpResolution(kp2).body), 1);

  // both should have the same number of cranks
  t.is(
    hostStorage1.kvStore.get('crankNumber'),
    hostStorage2.kvStore.get('crankNumber'),
  );

  // then one is restarted, but the other keeps running
  const [c1b, mb1b] = await makeMailboxKernel(hostStorage1);

  // Now we repeat delivery of that message to both. The mailbox should send
  // it to vattp, even though it's duplicate, because the mailbox doesn't
  // have durable state, and cannot correctly (deterministically) tell that
  // it's a duplicate.
  t.true(mb1b.deliverInbound('peer1', msg1, 0));
  await c1b.run();
  // the testlog is part of the ephemeral kernel state, so it will only have
  // a record of messages in the second run, however the vat is replayed
  // during the second-run startup, so we expect to see one copy of the
  // original message, delivered during the second run
  t.deepEqual(c1b.dump().log, ['comms receive msg1']);
  // but vattp dedups, so only one message should be delivered to comms
  const kp3 = c1b.queueToVatRoot('bootstrap', 'getNumReceived', capargs([]));
  await c1b.run();
  t.deepEqual(JSON.parse(c1b.kpResolution(kp3).body), 1);

  t.true(mb2.deliverInbound('peer1', msg1, 0));
  await c2.run();
  // the second kernel still has that ephemeral testlog, however the vat is
  // still running, so we only see the original message from the first run
  t.deepEqual(c2.dump().log, ['comms receive msg1']);
  const kp4 = c2.queueToVatRoot('bootstrap', 'getNumReceived', capargs([]));
  await c2.run();
  t.deepEqual(JSON.parse(c2.kpResolution(kp4).body), 1);

  // Both should *still* have the same number of cranks. This is what bug
  // #3471 exposed.
  t.is(
    hostStorage1.kvStore.get('crankNumber'),
    hostStorage2.kvStore.get('crankNumber'),
  );
});

0 comments on commit 3318e5b

Please sign in to comment.