Skip to content

Commit

Permalink
fix: Track L1 block for last L2 block body retrieved (#7927)
Browse files Browse the repository at this point in the history
The archiver was keeping a single L1 pointer for L2 blocks, shared by
block metadata and block bodies, using the min of the two. However, if a
block metadata had no corresponding block body, then the L1 pointer
would not be advanced, leading to the same block metadata to being
downloaded again, which triggered an error of expected L2 block
mismatch.

Having no corresponding block body happens when the tx effects hash is
not submitted by the sequencer, which happens when it is detected to
have been uploaded already, which happens when the tx effects hash is
repeated across two blocks, which happens when two blocks are empty (ie
have only padding txs).

By adding separate tracking pointers, we ensure that we don't
accidentally process the same block header twice from the archiver. This
PR also adds a try/catch to the archiver loop so it doesn't bring down
the entire node if it runs into an error.

Fixes #7918
  • Loading branch information
spalladino authored Aug 13, 2024
1 parent 014b5f0 commit cd36be4
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 41 deletions.
44 changes: 32 additions & 12 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,21 @@ export class Archiver implements ArchiveSource {
await this.sync(blockUntilSynced);
}

this.runningPromise = new RunningPromise(() => this.sync(false), this.pollingIntervalMs);
this.runningPromise = new RunningPromise(() => this.safeSync(), this.pollingIntervalMs);
this.runningPromise.start();
}

/**
* Syncs and catches exceptions.
*/
private async safeSync() {
try {
await this.sync(false);
} catch (error) {
this.log.error('Error syncing archiver', error);
}
}

/**
* Fetches logs from L1 contracts and processes them.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
Expand All @@ -166,10 +177,14 @@ export class Archiver implements ArchiveSource {
*
* This code does not handle reorgs.
*/
const { blocksSynchedTo, messagesSynchedTo } = await this.store.getSynchPoint();
const { blockBodiesSynchedTo, blocksSynchedTo, messagesSynchedTo } = await this.store.getSynchPoint();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();

if (currentL1BlockNumber <= blocksSynchedTo && currentL1BlockNumber <= messagesSynchedTo) {
if (
currentL1BlockNumber <= blocksSynchedTo &&
currentL1BlockNumber <= messagesSynchedTo &&
currentL1BlockNumber <= blockBodiesSynchedTo
) {
// chain hasn't moved forward
// or it's been rolled back
this.log.debug(`Nothing to sync`, { currentL1BlockNumber, blocksSynchedTo, messagesSynchedTo });
Expand Down Expand Up @@ -220,23 +235,27 @@ export class Archiver implements ArchiveSource {
// Read all data from chain and then write to our stores at the end
const nextExpectedL2BlockNum = BigInt((await this.store.getSynchedL2BlockNumber()) + 1);

this.log.debug(`Retrieving block bodies from ${blockBodiesSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlockBodies = await retrieveBlockBodiesFromAvailabilityOracle(
this.publicClient,
this.availabilityOracleAddress,
blockUntilSynced,
blocksSynchedTo + 1n,
blockBodiesSynchedTo + 1n,
currentL1BlockNumber,
);

const blockBodies = retrievedBlockBodies.retrievedData.map(([blockBody]) => blockBody);
await this.store.addBlockBodies(blockBodies);
this.log.debug(
`Retrieved ${retrievedBlockBodies.retrievedData.length} block bodies up to L1 block ${retrievedBlockBodies.lastProcessedL1BlockNumber}`,
);
await this.store.addBlockBodies(retrievedBlockBodies);

// Now that we have block bodies we will retrieve block metadata and build L2 blocks from the bodies and
// the metadata
let retrievedBlocks: DataRetrieval<L2Block>;
{
// @todo @LHerskind Investigate how necessary that nextExpectedL2BlockNum really is.
// Also, I would expect it to break horribly if we have a reorg.
this.log.debug(`Retrieving block metadata from ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`);
const retrievedBlockMetadata = await retrieveBlockMetadataFromRollup(
this.publicClient,
this.rollupAddress,
Expand Down Expand Up @@ -278,17 +297,18 @@ export class Archiver implements ArchiveSource {
} and ${currentL1BlockNumber}.`,
);

// Set the `lastProcessedL1BlockNumber` to the smallest of the header and body retrieval
const min = (a: bigint, b: bigint) => (a < b ? a : b);
retrievedBlocks = {
lastProcessedL1BlockNumber: min(
retrievedBlockMetadata.lastProcessedL1BlockNumber,
retrievedBlockBodies.lastProcessedL1BlockNumber,
),
lastProcessedL1BlockNumber: retrievedBlockMetadata.lastProcessedL1BlockNumber,
retrievedData: blocks,
};
}

this.log.debug(
`Processing retrieved blocks ${retrievedBlocks.retrievedData
.map(b => b.number)
.join(',')} with last processed L1 block ${retrievedBlocks.lastProcessedL1BlockNumber}`,
);

await Promise.all(
retrievedBlocks.retrievedData.map(block => {
const noteEncryptedLogs = block.body.noteEncryptedLogs;
Expand Down
6 changes: 4 additions & 2 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import { type DataRetrieval } from './data_retrieval.js';
* Represents the latest L1 block processed by the archiver for various objects in L2.
*/
export type ArchiverL1SynchPoint = {
/** Number of the last L1 block that added a new L2 block. */
/** Number of the last L1 block that added a new L2 block metadata. */
blocksSynchedTo: bigint;
/** Number of the last L1 block that added a new L2 block body. */
blockBodiesSynchedTo: bigint;
/** Number of the last L1 block that added L1 -> L2 messages from the Inbox. */
messagesSynchedTo: bigint;
};
Expand All @@ -53,7 +55,7 @@ export interface ArchiverDataStore {
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: Body[]): Promise<boolean>;
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean>;

/**
* Gets block bodies that have the same txsEffectsHashes as we supply.
Expand Down
35 changes: 26 additions & 9 deletions yarn-project/archiver/src/archiver/archiver_store_test_suite.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InboxLeaf, L2Block, LogId, LogType, TxHash } from '@aztec/circuit-types';
import { type Body, InboxLeaf, L2Block, LogId, LogType, TxHash } from '@aztec/circuit-types';
import '@aztec/circuit-types/jest';
import { AztecAddress, Fr, INITIAL_L2_BLOCK_NUM, L1_TO_L2_MSG_SUBTREE_HEIGHT } from '@aztec/circuits.js';
import {
Expand All @@ -14,7 +14,7 @@ import {
SerializableContractInstance,
} from '@aztec/types/contracts';

import { type ArchiverDataStore } from './archiver_store.js';
import { type ArchiverDataStore, type ArchiverL1SynchPoint } from './archiver_store.js';
import { type DataRetrieval } from './data_retrieval.js';

/**
Expand All @@ -25,6 +25,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
describe(testName, () => {
let store: ArchiverDataStore;
let blocks: DataRetrieval<L2Block>;
let blockBodies: DataRetrieval<Body>;
const blockTests: [number, number, () => L2Block[]][] = [
[1, 1, () => blocks.retrievedData.slice(0, 1)],
[10, 1, () => blocks.retrievedData.slice(9, 10)],
Expand All @@ -39,11 +40,15 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
lastProcessedL1BlockNumber: 5n,
retrievedData: Array.from({ length: 10 }).map((_, i) => L2Block.random(i + 1)),
};
blockBodies = {
retrievedData: blocks.retrievedData.map(block => block.body),
lastProcessedL1BlockNumber: 4n,
};
});

describe('addBlocks', () => {
it('returns success when adding block bodies', async () => {
await expect(store.addBlockBodies(blocks.retrievedData.map(block => block.body))).resolves.toBe(true);
await expect(store.addBlockBodies(blockBodies)).resolves.toBe(true);
});

it('returns success when adding blocks', async () => {
Expand All @@ -59,7 +64,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
describe('getBlocks', () => {
beforeEach(async () => {
await store.addBlocks(blocks);
await store.addBlockBodies(blocks.retrievedData.map(block => block.body));
await store.addBlockBodies(blockBodies);
});

it.each(blockTests)('retrieves previously stored blocks', async (start, limit, getExpectedBlocks) => {
Expand Down Expand Up @@ -95,15 +100,26 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 0n,
messagesSynchedTo: 0n,
});
blockBodiesSynchedTo: 0n,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number in which the most recent L2 block was published', async () => {
await store.addBlocks(blocks);
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: blocks.lastProcessedL1BlockNumber,
messagesSynchedTo: 0n,
});
blockBodiesSynchedTo: 0n,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number in which the most recent L2 block body was published', async () => {
await store.addBlockBodies(blockBodies);
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 0n,
messagesSynchedTo: 0n,
blockBodiesSynchedTo: blockBodies.lastProcessedL1BlockNumber,
} satisfies ArchiverL1SynchPoint);
});

it('returns the L1 block number that most recently added messages from inbox', async () => {
Expand All @@ -114,7 +130,8 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
await expect(store.getSynchPoint()).resolves.toEqual({
blocksSynchedTo: 0n,
messagesSynchedTo: 1n,
});
blockBodiesSynchedTo: 0n,
} satisfies ArchiverL1SynchPoint);
});
});

Expand Down Expand Up @@ -179,7 +196,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
),
);
await store.addBlocks(blocks);
await store.addBlockBodies(blocks.retrievedData.map(block => block.body));
await store.addBlockBodies(blockBodies);
});

it.each([
Expand Down Expand Up @@ -335,7 +352,7 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
};

await store.addBlocks(blocks);
await store.addBlockBodies(blocks.retrievedData.map(block => block.body));
await store.addBlockBodies(blockBodies);

await Promise.all(
blocks.retrievedData.map(block =>
Expand Down
20 changes: 14 additions & 6 deletions yarn-project/archiver/src/archiver/data_retrieval.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type Body, type InboxLeaf } from '@aztec/circuit-types';
import { type AppendOnlyTreeSnapshot, Fr, type Header } from '@aztec/circuits.js';
import { type EthAddress } from '@aztec/foundation/eth-address';
import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { RollupAbi } from '@aztec/l1-artifacts';

import { type PublicClient, getAbiItem } from 'viem';
Expand Down Expand Up @@ -45,6 +46,7 @@ export async function retrieveBlockMetadataFromRollup(
searchStartBlock: bigint,
searchEndBlock: bigint,
expectedNextL2BlockNum: bigint,
logger: DebugLogger = createDebugLogger('aztec:archiver'),
): Promise<DataRetrieval<[Header, AppendOnlyTreeSnapshot]>> {
const retrievedBlockMetadata: [Header, AppendOnlyTreeSnapshot][] = [];
do {
Expand All @@ -61,13 +63,18 @@ export async function retrieveBlockMetadataFromRollup(
break;
}

const lastLog = l2BlockProcessedLogs[l2BlockProcessedLogs.length - 1];
logger.debug(
`Got L2 block processed logs for ${l2BlockProcessedLogs[0].blockNumber}-${lastLog.blockNumber} between ${searchStartBlock}-${searchEndBlock} L1 blocks`,
);

const newBlockMetadata = await processL2BlockProcessedLogs(
publicClient,
expectedNextL2BlockNum,
l2BlockProcessedLogs,
);
retrievedBlockMetadata.push(...newBlockMetadata);
searchStartBlock = l2BlockProcessedLogs[l2BlockProcessedLogs.length - 1].blockNumber! + 1n;
searchStartBlock = lastLog.blockNumber! + 1n;
expectedNextL2BlockNum += BigInt(newBlockMetadata.length);
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);
return { lastProcessedL1BlockNumber: searchStartBlock - 1n, retrievedData: retrievedBlockMetadata };
Expand All @@ -80,16 +87,16 @@ export async function retrieveBlockMetadataFromRollup(
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @param searchStartBlock - The block number to use for starting the search.
* @param searchEndBlock - The highest block number that we should search up to.
* @returns A array of tuples of L2 block bodies and their associated hash as well as the next eth block to search from
* @returns A array of L2 block bodies as well as the next eth block to search from
*/
export async function retrieveBlockBodiesFromAvailabilityOracle(
publicClient: PublicClient,
availabilityOracleAddress: EthAddress,
blockUntilSynced: boolean,
searchStartBlock: bigint,
searchEndBlock: bigint,
): Promise<DataRetrieval<[Body, Buffer]>> {
const retrievedBlockBodies: [Body, Buffer][] = [];
): Promise<DataRetrieval<Body>> {
const retrievedBlockBodies: Body[] = [];

do {
if (searchStartBlock > searchEndBlock) {
Expand All @@ -106,9 +113,10 @@ export async function retrieveBlockBodiesFromAvailabilityOracle(
}

const newBlockBodies = await processTxsPublishedLogs(publicClient, l2TxsPublishedLogs);
retrievedBlockBodies.push(...newBlockBodies);
searchStartBlock = l2TxsPublishedLogs[l2TxsPublishedLogs.length - 1].blockNumber! + 1n;
retrievedBlockBodies.push(...newBlockBodies.map(([body]) => body));
searchStartBlock = l2TxsPublishedLogs[l2TxsPublishedLogs.length - 1].blockNumber + 1n;
} while (blockUntilSynced && searchStartBlock <= searchEndBlock);

return { lastProcessedL1BlockNumber: searchStartBlock - 1n, retrievedData: retrievedBlockBodies };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ describe('Block Body Store', () => {
it('Should add and return block bodies', async () => {
const body = Body.random(1);

await archiverStore.addBlockBodies([body]);
await archiverStore.addBlockBodies({ retrievedData: [body], lastProcessedL1BlockNumber: 5n });

const txsEffectsHash = body.getTxsEffectsHash();

const [returnedBody] = await archiverStore.getBlockBodies([txsEffectsHash]);

expect(body).toStrictEqual(returnedBody);

const { blockBodiesSynchedTo } = await archiverStore.getSynchPoint();
expect(blockBodiesSynchedTo).toEqual(5n);
});
});
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
import { Body } from '@aztec/circuit-types';
import { createDebugLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecMap } from '@aztec/kv-store';
import { type AztecKVStore, type AztecMap, type AztecSingleton } from '@aztec/kv-store';

import { type DataRetrieval } from '../data_retrieval.js';

export class BlockBodyStore {
/** Map block body hash to block body */
#blockBodies: AztecMap<string, Buffer>;

/** Stores L1 block number in which the last processed L2 block body was included */
#lastSynchedL1Block: AztecSingleton<bigint>;

constructor(private db: AztecKVStore, private log = createDebugLogger('aztec:archiver:block_body_store')) {
this.#blockBodies = db.openMap('archiver_block_bodies');
this.#lastSynchedL1Block = db.openSingleton('archiver_block_bodies_last_synched_l1_block');
}

/**
* Append new block bodies to the store's map.
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: Body[]): Promise<boolean> {
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean> {
return this.db.transaction(() => {
for (const body of blockBodies) {
for (const body of blockBodies.retrievedData) {
void this.#blockBodies.set(body.getTxsEffectsHash().toString('hex'), body.toBuffer());
}

void this.#lastSynchedL1Block.set(blockBodies.lastProcessedL1BlockNumber);
return true;
});
}
Expand Down Expand Up @@ -57,4 +63,12 @@ export class BlockBodyStore {

return blockBody && Body.fromBuffer(blockBody);
}

/**
* Gets the last L1 block number in which a L2 block body was included
* @returns The L1 block number
*/
getSynchedL1BlockNumber(): bigint {
return this.#lastSynchedL1Block.get() ?? 0n;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class KVArchiverDataStore implements ArchiverDataStore {
* @param blockBodies - The L2 block bodies to be added to the store.
* @returns True if the operation is successful.
*/
addBlockBodies(blockBodies: Body[]): Promise<boolean> {
addBlockBodies(blockBodies: DataRetrieval<Body>): Promise<boolean> {
return this.#blockBodyStore.addBlockBodies(blockBodies);
}

Expand Down Expand Up @@ -260,6 +260,7 @@ export class KVArchiverDataStore implements ArchiverDataStore {
getSynchPoint(): Promise<ArchiverL1SynchPoint> {
return Promise.resolve({
blocksSynchedTo: this.#blockStore.getSynchedL1BlockNumber(),
blockBodiesSynchedTo: this.#blockBodyStore.getSynchedL1BlockNumber(),
messagesSynchedTo: this.#messageStore.getSynchedL1BlockNumber(),
});
}
Expand Down
Loading

0 comments on commit cd36be4

Please sign in to comment.