Skip to content

Commit

Permalink
fix: Prevent race conditions around data pulled from L1 (#2577)
Browse files Browse the repository at this point in the history
This PR fixes race conditions in the retrieval of data from L1,
particularly around the retrieval of L1 to L2 messages.

# Checklist:
Remove the checklist to signal you've completed it. Enable auto-merge if
the PR is ready to merge.
- [ ] If the pull request requires a cryptography review (e.g.
cryptographic algorithm implementations) I have added the 'crypto' tag.
- [ ] I have reviewed my diff in github, line by line and removed
unexpected formatting changes, testing logs, or commented-out code.
- [ ] Every change is related to the PR description.
- [ ] I have
[linked](https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue)
this pull request to relevant issues (if any exist).
  • Loading branch information
PhilWindle authored Sep 28, 2023
1 parent 314e8a0 commit defea83
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 29 deletions.
91 changes: 84 additions & 7 deletions yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,27 @@ describe('Archiver', () => {
blocks[1].newL1ToL2Messages.map(key => key.toString(true)),
),
makeL1ToL2MessageAddedEvents(
1000n,
2501n,
blocks[2].newL1ToL2Messages.map(key => key.toString(true)),
),
makeL1ToL2MessageAddedEvents(102n, [
makeL1ToL2MessageAddedEvents(2502n, [
messageToCancel1,
messageToCancel2,
messageToStayPending1,
messageToStayPending2,
]),
];
publicClient.getBlockNumber.mockResolvedValueOnce(2500n).mockResolvedValueOnce(2501n).mockResolvedValueOnce(2502n);
publicClient.getBlockNumber.mockResolvedValueOnce(2500n).mockResolvedValueOnce(2600n).mockResolvedValueOnce(2700n);
// logs should be created in order of how archiver syncs.
publicClient.getLogs
.mockResolvedValueOnce(l1ToL2MessageAddedEvents.slice(0, 2).flat())
.mockResolvedValueOnce([]) // no messages to cancel
.mockResolvedValueOnce([makeL2BlockProcessedEvent(101n, 1n)])
.mockResolvedValueOnce([makeContractDeploymentEvent(103n, blocks[0])])
.mockResolvedValueOnce([makeContractDeploymentEvent(103n, blocks[0])]) // the first loop of the archiver ends here at block 2500
.mockResolvedValueOnce(l1ToL2MessageAddedEvents.slice(2, 4).flat())
.mockResolvedValueOnce(makeL1ToL2MessageCancelledEvents(1100n, l1ToL2MessagesToCancel))
.mockResolvedValueOnce([makeL2BlockProcessedEvent(1101n, 2n), makeL2BlockProcessedEvent(1150n, 3n)])
.mockResolvedValueOnce([makeContractDeploymentEvent(1102n, blocks[1])])
.mockResolvedValueOnce(makeL1ToL2MessageCancelledEvents(2503n, l1ToL2MessagesToCancel))
.mockResolvedValueOnce([makeL2BlockProcessedEvent(2510n, 2n), makeL2BlockProcessedEvent(2520n, 3n)])
.mockResolvedValueOnce([makeContractDeploymentEvent(2540n, blocks[1])])
.mockResolvedValue([]);
rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));

Expand Down Expand Up @@ -127,6 +127,83 @@ describe('Archiver', () => {

await archiver.stop();
}, 10_000);

it('does not sync past current block number', async () => {
const numL2BlocksInTest = 2;
const archiver = new Archiver(
publicClient,
EthAddress.fromString(rollupAddress),
EthAddress.fromString(inboxAddress),
EthAddress.fromString(registryAddress),
EthAddress.fromString(contractDeploymentEmitterAddress),
0,
archiverStore,
1000,
);

let latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(0);

const createL1ToL2Messages = () => {
return [Fr.random().toString(true), Fr.random().toString(true)];
};

const blocks = blockNums.map(x => L2Block.random(x, 4, x, x + 1, x * 2, x * 3));
const rollupTxs = blocks.map(makeRollupTx);
// `L2Block.random(x)` creates some l1 to l2 messages. We add those,
// since it is expected by the test that these would be consumed.
// Archiver removes such messages from pending store.
// Also create some more messages to cancel and some that will stay pending.

const additionalL1ToL2MessagesBlock102 = createL1ToL2Messages();
const additionalL1ToL2MessagesBlock103 = createL1ToL2Messages();

const l1ToL2MessageAddedEvents = [
makeL1ToL2MessageAddedEvents(
100n,
blocks[0].newL1ToL2Messages.map(key => key.toString(true)),
),
makeL1ToL2MessageAddedEvents(
101n,
blocks[1].newL1ToL2Messages.map(key => key.toString(true)),
),
makeL1ToL2MessageAddedEvents(102n, additionalL1ToL2MessagesBlock102),
makeL1ToL2MessageAddedEvents(103n, additionalL1ToL2MessagesBlock103),
];

// Here we set the current L1 block number to 102. L1 to L2 messages after this should not be read.
publicClient.getBlockNumber.mockResolvedValue(102n);
// add all of the L1 to L2 messages to the mock
publicClient.getLogs
.mockImplementationOnce((args?: any) => {
return Promise.resolve(
l1ToL2MessageAddedEvents
.flat()
.filter(x => x.blockNumber! >= args.fromBlock && x.blockNumber! < args.toBlock),
);
})
.mockResolvedValueOnce([])
.mockResolvedValueOnce([makeL2BlockProcessedEvent(70n, 1n), makeL2BlockProcessedEvent(80n, 2n)])
.mockResolvedValue([]);
rollupTxs.slice(0, numL2BlocksInTest).forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx));

await archiver.start(false);

// Wait until all L2 blocks in this test (numL2BlocksInTest) are processed. If this doesn't happen, the test will fail with a timeout.
while ((await archiver.getBlockNumber()) !== numL2BlocksInTest) {
await sleep(100);
}

latestBlockNum = await archiver.getBlockNumber();
expect(latestBlockNum).toEqual(numL2BlocksInTest);

// Check that the only pending L1 to L2 messages are those from eth block 102
const expectedPendingMessageKeys = additionalL1ToL2MessagesBlock102;
const actualPendingMessageKeys = (await archiver.getPendingL1ToL2Messages(100)).map(key => key.toString(true));
expect(actualPendingMessageKeys).toEqual(expectedPendingMessageKeys);

await archiver.stop();
}, 10_000);
});

/**
Expand Down
35 changes: 31 additions & 4 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,49 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
return;
}

// ********** Ensuring Consistency of data pulled from L1 **********

/**
* There are a number of calls in this sync operation to L1 for retrieving
* events and transaction data. There are a couple of things we need to bear in mind
* to ensure that data is read exactly once.
*
* The first is the problem of eventually consistent ETH service providers like Infura.
* We are not currently handling this correctly in the case of L1 to L2 messages and we will
* want to re-visit L2 Block and contract data retrieval at a later stage. This is not
* currently a problem but will need to be addressed before a mainnet release.
*
* The second is that in between the various calls to L1, the block number can move meaning some
* of the following calls will return data for blocks that were not present during earlier calls.
* This is a problem for example when setting the last block number marker for L1 to L2 messages -
* this.lastProcessedBlockNumber = currentBlockNumber;
* It's possible that we actually received messages in block currentBlockNumber + 1 meaning the next time
* we do this sync we get the same message again. Additionally, the call to get cancelled L1 to L2 messages
* could read from a block not present when retrieving pending messages. If a message was added and cancelled
* in the same eth block then we could try and cancel a non-existent pending message.
*
* To combat this for the time being we simply ensure that all data retrieval methods only retrieve
* data up to the currentBlockNumber captured at the top of this function. We might want to improve on this
* in the future, but for the time being it should give us the guarantees that we need.
*
*/

// ********** Events that are processed in between blocks **********

// Process l1ToL2Messages, these are consumed as time passes, not each block
const retrievedPendingL1ToL2Messages = await retrieveNewPendingL1ToL2Messages(
this.publicClient,
this.inboxAddress,
blockUntilSynced,
currentBlockNumber,
this.lastProcessedBlockNumber + 1n, // + 1 to prevent re-including messages from the last processed block
currentBlockNumber,
);
const retrievedCancelledL1ToL2Messages = await retrieveNewCancelledL1ToL2Messages(
this.publicClient,
this.inboxAddress,
blockUntilSynced,
currentBlockNumber,
this.lastProcessedBlockNumber + 1n,
currentBlockNumber,
);

// TODO (#717): optimise this - there could be messages in confirmed that are also in pending.
Expand All @@ -188,8 +215,8 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
this.publicClient,
this.rollupAddress,
blockUntilSynced,
currentBlockNumber,
this.nextL2BlockFromBlock,
currentBlockNumber,
nextExpectedL2BlockNum,
);

Expand All @@ -202,8 +229,8 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
this.publicClient,
this.contractDeploymentEmitterAddress,
blockUntilSynced,
currentBlockNumber,
this.nextL2BlockFromBlock,
currentBlockNumber,
blockHashMapping,
);
if (retrievedBlocks.retrievedData.length === 0) {
Expand Down
48 changes: 30 additions & 18 deletions yarn-project/archiver/src/archiver/data_retrieval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,30 @@ type DataRetrieval<T> = {
* @param publicClient - The viem public client to use for transaction retrieval.
* @param rollupAddress - The address of the rollup contract.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param currentL1BlockNum - Latest available block number in the ETH node.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @param expectedNextL2BlockNum - The next L2 block number that we expect to find.
* @returns An array of L2 Blocks and the next eth block to search from
*/
export async function retrieveBlocks(
publicClient: PublicClient,
rollupAddress: EthAddress,
blockUntilSynced: boolean,
currentL1BlockNum: bigint,
searchStartBlock: bigint,
searchEndBlock: bigint,
expectedNextL2BlockNum: bigint,
): Promise<DataRetrieval<L2Block>> {
const retrievedBlocks: L2Block[] = [];
do {
if (searchStartBlock > currentL1BlockNum) {
if (searchStartBlock > searchEndBlock) {
break;
}
const l2BlockProcessedLogs = await getL2BlockProcessedLogs(publicClient, rollupAddress, searchStartBlock);
const l2BlockProcessedLogs = await getL2BlockProcessedLogs(
publicClient,
rollupAddress,
searchStartBlock,
searchEndBlock,
);
if (l2BlockProcessedLogs.length === 0) {
break;
}
Expand All @@ -61,7 +66,7 @@ export async function retrieveBlocks(
retrievedBlocks.push(...newBlocks);
searchStartBlock = l2BlockProcessedLogs[l2BlockProcessedLogs.length - 1].blockNumber! + 1n;
expectedNextL2BlockNum += BigInt(newBlocks.length);
} while (blockUntilSynced && searchStartBlock <= currentL1BlockNum);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { nextEthBlockNumber: searchStartBlock, retrievedData: retrievedBlocks };
}

Expand All @@ -70,36 +75,37 @@ export async function retrieveBlocks(
* @param publicClient - The viem public client to use for transaction retrieval.
* @param contractDeploymentEmitterAddress - The address of the contract deployment emitter contract.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param currentBlockNumber - Latest available block number in the ETH node.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @param blockHashMapping - A mapping from block number to relevant block hash.
* @returns An array of ExtendedContractData and their equivalent L2 Block number along with the next eth block to search from.
*/
export async function retrieveNewContractData(
publicClient: PublicClient,
contractDeploymentEmitterAddress: EthAddress,
blockUntilSynced: boolean,
currentBlockNumber: bigint,
searchStartBlock: bigint,
searchEndBlock: bigint,
blockHashMapping: { [key: number]: Buffer | undefined },
): Promise<DataRetrieval<[ExtendedContractData[], number]>> {
let retrievedNewContracts: [ExtendedContractData[], number][] = [];
do {
if (searchStartBlock > currentBlockNumber) {
if (searchStartBlock > searchEndBlock) {
break;
}
const contractDataLogs = await getContractDeploymentLogs(
publicClient,
contractDeploymentEmitterAddress,
searchStartBlock,
searchEndBlock,
);
if (contractDataLogs.length === 0) {
break;
}
const newContracts = processContractDeploymentLogs(blockHashMapping, contractDataLogs);
retrievedNewContracts = retrievedNewContracts.concat(newContracts);
searchStartBlock = (contractDataLogs.findLast(cd => !!cd)?.blockNumber || searchStartBlock) + 1n;
} while (blockUntilSynced && searchStartBlock <= currentBlockNumber);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { nextEthBlockNumber: searchStartBlock, retrievedData: retrievedNewContracts };
}

Expand All @@ -108,28 +114,33 @@ export async function retrieveNewContractData(
* @param publicClient - The viem public client to use for transaction retrieval.
* @param inboxAddress - The address of the inbox contract to fetch messages from.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param currentBlockNumber - Latest available block number in the ETH node.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @returns An array of L1ToL2Message and next eth block to search from.
*/
export async function retrieveNewPendingL1ToL2Messages(
publicClient: PublicClient,
inboxAddress: EthAddress,
blockUntilSynced: boolean,
currentBlockNumber: bigint,
searchStartBlock: bigint,
searchEndBlock: bigint,
): Promise<DataRetrieval<L1ToL2Message>> {
const retrievedNewL1ToL2Messages: L1ToL2Message[] = [];
do {
if (searchStartBlock > currentBlockNumber) {
if (searchStartBlock > searchEndBlock) {
break;
}
const newL1ToL2MessageLogs = await getPendingL1ToL2MessageLogs(publicClient, inboxAddress, searchStartBlock);
const newL1ToL2MessageLogs = await getPendingL1ToL2MessageLogs(
publicClient,
inboxAddress,
searchStartBlock,
searchEndBlock,
);
const newL1ToL2Messages = processPendingL1ToL2MessageAddedLogs(newL1ToL2MessageLogs);
retrievedNewL1ToL2Messages.push(...newL1ToL2Messages);
// handles the case when there are no new messages:
searchStartBlock = (newL1ToL2MessageLogs.findLast(msgLog => !!msgLog)?.blockNumber || searchStartBlock) + 1n;
} while (blockUntilSynced && searchStartBlock <= currentBlockNumber);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { nextEthBlockNumber: searchStartBlock, retrievedData: retrievedNewL1ToL2Messages };
}

Expand All @@ -138,32 +149,33 @@ export async function retrieveNewPendingL1ToL2Messages(
* @param publicClient - The viem public client to use for transaction retrieval.
* @param inboxAddress - The address of the inbox contract to fetch messages from.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param currentBlockNumber - Latest available block number in the ETH node.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @returns An array of message keys that were cancelled and next eth block to search from.
*/
export async function retrieveNewCancelledL1ToL2Messages(
publicClient: PublicClient,
inboxAddress: EthAddress,
blockUntilSynced: boolean,
currentBlockNumber: bigint,
searchStartBlock: bigint,
searchEndBlock: bigint,
): Promise<DataRetrieval<Fr>> {
const retrievedNewCancelledL1ToL2Messages: Fr[] = [];
do {
if (searchStartBlock > currentBlockNumber) {
if (searchStartBlock > searchEndBlock) {
break;
}
const newL1ToL2MessageCancelledLogs = await getL1ToL2MessageCancelledLogs(
publicClient,
inboxAddress,
searchStartBlock,
searchEndBlock,
);
const newCancelledL1ToL2Messages = processCancelledL1ToL2MessagesLogs(newL1ToL2MessageCancelledLogs);
retrievedNewCancelledL1ToL2Messages.push(...newCancelledL1ToL2Messages);
// handles the case when there are no new messages:
searchStartBlock =
(newL1ToL2MessageCancelledLogs.findLast(msgLog => !!msgLog)?.blockNumber || searchStartBlock) + 1n;
} while (blockUntilSynced && searchStartBlock <= currentBlockNumber);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { nextEthBlockNumber: searchStartBlock, retrievedData: retrievedNewCancelledL1ToL2Messages };
}
Loading

0 comments on commit defea83

Please sign in to comment.