Skip to content

Commit

Permalink
Free the blobs
Browse files Browse the repository at this point in the history
fix the types

rejig the new constants in params

add comment for cleanup

update reqresp

fix api package

rename blobs repo

commit the wip modifications

further appropriate renaming

further references update

further reference updates

continue refac

fix reqresp build

further refac

further refac

fix api

fix db interface

fix beacondb alloc

build

fix api

improve blob verificaion

correct validation call

fixes

fix the produce block/blobs flow

reduce diff

blob gossip validation

update validations

cleanup block vali

reduce diff

handle gossip of block and blob

fix test for timebeing

modify publishing flow

fix import flow

onsidecarbyrange fix and some type fixes

fix sidecars by root

prune blockinput cache

fix kzg interface

small renaming

interface rename

fix fetch blockmaybeblobs by range test

fix build lint issues for now

c-kzg version fix

FullOrBlindedBlobSidecar changes

fix tests

complete the blob publishing flow

fix test

get the single node run functional

get the gossip blob flow working

fix peer syncing using req/resp

fix sidecar by root check

refactor blobsidecars hotdb and remove archive

add blob gossip validation flow

fix topic

fix the validation condition

add blob validation and test various sync modes

fix tests

rebase fixes

enable deneb spec tests

make blobsbyroot multi block

fixes

cleanup defunt builder endpoint

archive blobs post finalization uptill the blob window

serve finalized blobs within the blob prune window

fix test

fix test

lookup in archive as well

cleanup and improvements

rebase fx

Add 4844 sim test and override the field elements per blob

update image

add blob test

add test run in package

start unknown sync and range sync

finalize the sims

change the signing flow

fix test types

fix tests

fix test

lint

update tx type and corresponding ethereumjs image

update c-kzg and use blobs bundle proof

fix test

fix test

merge getblobsbundle into getpayloadv3

update images

fix genesis config

rebase fixes

fix test

update images

fix tests

lint

fix the sidecar request count limit

fix test

lint
  • Loading branch information
g11tech committed May 18, 2023
1 parent bd5ed70 commit 4d36e75
Show file tree
Hide file tree
Showing 76 changed files with 1,624 additions and 850 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/test-sim-merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ env:
NETHERMIND_IMAGE: nethermind/nethermind:1.14.3
MERGEMOCK_IMAGE: g11tech/mergemock:latest
GETH_WITHDRAWALS_IMAGE: g11tech/geth:withdrawalsfeb8
ETHEREUMJS_WITHDRAWALS_IMAGE: g11tech/ethereumjs:feb8
ETHEREUMJS_WITHDRAWALS_IMAGE: g11tech/ethereumjs:blobs-b6b63
NETHERMIND_WITHDRAWALS_IMAGE: nethermindeth/nethermind:withdrawals_yolo
ETHEREUMJS_BLOBS_IMAGE: g11tech/ethereumjs:blobs-b6b63

jobs:
sim-merge-tests:
Expand Down Expand Up @@ -122,6 +123,17 @@ jobs:
# EL_BINARY_DIR: ${{ env.NETHERMIND_WITHDRAWALS_IMAGE }}
# EL_SCRIPT_DIR: netherminddocker

- name: Pull ethereumjs blobs
run: docker pull $ETHEREUMJS_BLOBS_IMAGE

- name: Test Lodestar <> ethereumjs blobs
run: yarn test:sim:blobs
working-directory: packages/beacon-node
env:
EL_BINARY_DIR: ${{ env.ETHEREUMJS_BLOBS_IMAGE }}
EL_SCRIPT_DIR: ethereumjsdocker
DEV_RUN: true

- name: Upload debug log test files
if: ${{ always() }}
uses: actions/upload-artifact@v2
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
"test:sim:merge-interop": "mocha 'test/sim/merge-interop.test.ts'",
"test:sim:mergemock": "mocha 'test/sim/mergemock.test.ts'",
"test:sim:withdrawals": "mocha 'test/sim/withdrawal-interop.test.ts'",
"test:sim:blobs": "mocha 'test/sim/4844-interop.test.ts'",
"download-spec-tests": "node --loader=ts-node/esm test/spec/downloadTests.ts",
"check-spec-tests": "mocha test/spec/checkCoverage.ts",
"test:spec-bls-general": "mocha --config .mocharc.spec.cjs 'test/spec/bls/**/*.test.ts' 'test/spec/general/**/*.test.ts'",
Expand Down Expand Up @@ -134,7 +135,7 @@
"@multiformats/multiaddr": "^11.0.0",
"@types/datastore-level": "^3.0.0",
"buffer-xor": "^2.0.2",
"c-kzg": "^1.0.9",
"c-kzg": "^2.0.4",
"cross-fetch": "^3.1.4",
"datastore-core": "^8.0.1",
"datastore-level": "^9.0.1",
Expand Down
40 changes: 21 additions & 19 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {BlockError, BlockErrorCode} from "../../../../chain/errors/index.js";
import {OpSource} from "../../../../metrics/validatorMonitor.js";
import {NetworkEvent} from "../../../../network/index.js";
import {ApiModules} from "../../types.js";
import {ckzg} from "../../../../util/kzg.js";
import {resolveBlockId, toBeaconHeaderResponse} from "./utils.js";

/**
Expand Down Expand Up @@ -205,23 +204,12 @@ export function getBeaconBlockApi({
let blockForImport: BlockInput, signedBlock: allForks.SignedBeaconBlock, signedBlobs: deneb.SignedBlobSidecars;

if (isSignedBlockContents(signedBlockOrContents)) {
// Build a blockInput for post deneb, signedBlobs will be be used in followup PRs
({signedBlock, signedBlobSidecars: signedBlobs} = signedBlockOrContents as allForks.SignedBlockContents);
const beaconBlockSlot = signedBlock.message.slot;
const beaconBlockRoot = config.getForkTypes(beaconBlockSlot).BeaconBlock.hashTreeRoot(signedBlock.message);
const blobs = signedBlobs.map((sblob) => sblob.message.blob);

blockForImport = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.api,
// The blobsSidecar will be replaced in the followup PRs with just blobs
{
beaconBlockRoot,
beaconBlockSlot,
blobs,
kzgAggregatedProof: ckzg.computeAggregateKzgProof(blobs),
}
signedBlobs.map((sblob) => sblob.message)
);
} else {
signedBlock = signedBlockOrContents as allForks.SignedBeaconBlock;
Expand All @@ -231,7 +219,8 @@ export function getBeaconBlockApi({

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
// REST request promise without any extra infrastructure.
const msToBlockSlot = computeTimeAtSlot(config, signedBlock.message.slot, chain.genesisTime) * 1000 - Date.now();
const msToBlockSlot =
computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now();
if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) {
// If block is a bit early, hold it in a promise. Equivalent to a pending queue.
await sleep(msToBlockSlot);
Expand All @@ -242,7 +231,7 @@ export function getBeaconBlockApi({
const publishPromises = [
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
() => network.publishBeaconBlockMaybeBlobs(blockForImport) as Promise<unknown>,
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
() =>
// there is no rush to persist block since we published it to gossip anyway
chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => {
Expand All @@ -254,14 +243,27 @@ export function getBeaconBlockApi({
}
throw e;
}),
// TODO deneb: publish signed blobs as well
...signedBlobs.map((signedBlob) => () => network.publishBlobSidecar(signedBlob)),
];
await promiseAllMaybeAsync(publishPromises);
},

async getBlobSidecars(_blockId) {
// TODO DENEB: Add implementation on the DB structure change PR
throw Error("");
async getBlobSidecars(blockId) {
const {block, executionOptimistic} = await resolveBlockId(chain.forkChoice, db, blockId);
const blockRoot = config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);

let {blobSidecars} = (await db.blobSidecars.get(blockRoot)) ?? {};
if (!blobSidecars) {
({blobSidecars} = (await db.blobSidecarsArchive.get(block.message.slot)) ?? {});
}

if (!blobSidecars) {
throw Error("Not found in db");
}
return {
executionOptimistic,
data: blobSidecars,
};
},
};
}
22 changes: 11 additions & 11 deletions packages/beacon-node/src/chain/archiver/archiveBlocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export async function archiveBlocks(

if (finalizedPostDeneb) {
await migrateBlobsSidecarFromHotToColdDb(config, db, finalizedCanonicalBlockRoots);
logger.verbose("Migrated blobsSidecar from hot DB to cold DB");
logger.verbose("Migrated blobSidecars from hot DB to cold DB");
}
}

Expand All @@ -79,7 +79,7 @@ export async function archiveBlocks(
});

if (finalizedPostDeneb) {
await db.blobsSidecar.batchDelete(nonCanonicalBlockRoots);
await db.blobSidecars.batchDelete(nonCanonicalBlockRoots);
logger.verbose("Deleted non canonical blobsSider from hot DB");
}
}
Expand All @@ -89,14 +89,14 @@ export async function archiveBlocks(
if (finalizedPostDeneb) {
const blobsSidecarMinEpoch = currentEpoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
if (blobsSidecarMinEpoch >= config.DENEB_FORK_EPOCH) {
const slotsToDelete = await db.blobsSidecarArchive.keys({lt: computeStartSlotAtEpoch(blobsSidecarMinEpoch)});
const slotsToDelete = await db.blobSidecarsArchive.keys({lt: computeStartSlotAtEpoch(blobsSidecarMinEpoch)});
if (slotsToDelete.length > 0) {
await db.blobsSidecarArchive.batchDelete(slotsToDelete);
await db.blobSidecarsArchive.batchDelete(slotsToDelete);
logger.verbose(
`blobsSidecar prune: batchDelete range ${slotsToDelete[0]}..${slotsToDelete[slotsToDelete.length - 1]}`
`blobSidecars prune: batchDelete range ${slotsToDelete[0]}..${slotsToDelete[slotsToDelete.length - 1]}`
);
} else {
logger.verbose(`blobsSidecar prune: no entries before epoch ${blobsSidecarMinEpoch}`);
logger.verbose(`blobSidecars prune: no entries before epoch ${blobsSidecarMinEpoch}`);
}
}
}
Expand Down Expand Up @@ -176,22 +176,22 @@ async function migrateBlobsSidecarFromHotToColdDb(
if (canonicalBlocks.length === 0) return;

// load Buffer instead of ssz deserialized to improve performance
const canonicalBlobsSidecarEntries: KeyValue<Slot, Uint8Array>[] = await Promise.all(
const canonicalBlobSidecarsEntries: KeyValue<Slot, Uint8Array>[] = await Promise.all(
canonicalBlocks
.filter((block) => config.getForkSeq(block.slot) >= ForkSeq.deneb)
.map(async (block) => {
const bytes = await db.blobsSidecar.getBinary(block.root);
const bytes = await db.blobSidecars.getBinary(block.root);
if (!bytes) {
throw Error(`No blobsSidecar found for slot ${block.slot} root ${toHex(block.root)}`);
throw Error(`No blobSidecars found for slot ${block.slot} root ${toHex(block.root)}`);
}
return {key: block.slot, value: bytes};
})
);

// put to blockArchive db and delete block db
await Promise.all([
db.blobsSidecarArchive.batchPutBinary(canonicalBlobsSidecarEntries),
db.blobsSidecar.batchDelete(canonicalBlocks.map((block) => block.root)),
db.blobSidecarsArchive.batchPutBinary(canonicalBlobSidecarsEntries),
db.blobSidecars.batchDelete(canonicalBlocks.map((block) => block.root)),
]);
}
}
Expand Down
92 changes: 87 additions & 5 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot, WithOptionalBytes} from "@lodestar/types";
import {allForks, deneb, Slot, WithOptionalBytes, RootHex} from "@lodestar/types";
import {ForkSeq, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {toHexString} from "@chainsafe/ssz";
import {pruneSetToMax} from "@lodestar/utils";

export enum BlockInputType {
preDeneb = "preDeneb",
Expand All @@ -19,7 +21,7 @@ export enum BlockSource {

export type BlockInput =
| {type: BlockInputType.preDeneb; block: allForks.SignedBeaconBlock; source: BlockSource}
| {type: BlockInputType.postDeneb; block: allForks.SignedBeaconBlock; source: BlockSource; blobs: deneb.BlobsSidecar};
| {type: BlockInputType.postDeneb; block: allForks.SignedBeaconBlock; source: BlockSource; blobs: deneb.BlobSidecars};

export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean {
return (
Expand All @@ -29,7 +31,87 @@ export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clo
);
}

export enum GossipedInputType {
block = "block",
blob = "blob",
}
type GossipedBlockInput =
| {type: GossipedInputType.block; signedBlock: allForks.SignedBeaconBlock}
| {type: GossipedInputType.blob; signedBlob: deneb.SignedBlobSidecar};
type BlockInputCacheType = {block?: allForks.SignedBeaconBlock; blobs: Map<number, deneb.BlobSidecar>};

const MAX_GOSSIPINPUT_CACHE = 5;

export const getBlockInput = {
blockInputCache: new Map<RootHex, BlockInputCacheType>(),

getFullBlockInput(
config: ChainForkConfig,
gossipedInput: GossipedBlockInput
):
| {blockInput: BlockInput; blockInputMeta: {pending: null; haveBlobs: number; expectedBlobs: number}}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.blob; haveBlobs: number; expectedBlobs: number}} {
let blockHex;
let blockCache;
if (gossipedInput.type === GossipedInputType.block) {
const {signedBlock} = gossipedInput;
blockHex = toHexString(
config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message)
);
blockCache = this.blockInputCache.get(blockHex) ?? {blobs: new Map<number, deneb.BlobSidecar>()};
blockCache.block = signedBlock;
} else {
const {signedBlob} = gossipedInput;
blockHex = toHexString(signedBlob.message.blockRoot);
blockCache = this.blockInputCache.get(blockHex);

// If a new entry is going to be inserted, prune out old ones
if (blockCache === undefined) {
pruneSetToMax(this.blockInputCache, MAX_GOSSIPINPUT_CACHE);
blockCache = {blobs: new Map<number, deneb.BlobSidecar>()};
}
// TODO: freetheblobs check if its the same blob or a duplicate and throw/take actions
blockCache.blobs.set(signedBlob.message.index, signedBlob.message);
}
this.blockInputCache.set(blockHex, blockCache);
const {block: signedBlock} = blockCache;
if (signedBlock !== undefined) {
const {blobKzgCommitments} = (signedBlock as deneb.SignedBeaconBlock).message.body;
if (blobKzgCommitments.length < blockCache.blobs.size) {
throw Error(`Received more blobs=${blockCache.blobs.size} than commitments=${blobKzgCommitments.length}`);
}
if (blobKzgCommitments.length === blockCache.blobs.size) {
const blobSidecars = [];
for (let index = 0; index < blobKzgCommitments.length; index++) {
const blobSidecar = blockCache.blobs.get(index);
if (blobSidecar === undefined) {
throw Error("Missing blobSidecar");
}
blobSidecars.push(blobSidecar);
}
return {
blockInput: getBlockInput.postDeneb(config, signedBlock, BlockSource.gossip, blobSidecars),
blockInputMeta: {pending: null, haveBlobs: blockCache.blobs.size, expectedBlobs: blobKzgCommitments.length},
};
} else {
return {
blockInput: null,
blockInputMeta: {
pending: GossipedInputType.blob,
haveBlobs: blockCache.blobs.size,
expectedBlobs: blobKzgCommitments.length,
},
};
}
} else {
return {
blockInput: null,
blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blockCache.blobs.size, expectedBlobs: null},
};
}
},

preDeneb(config: ChainForkConfig, block: allForks.SignedBeaconBlock, source: BlockSource): BlockInput {
if (config.getForkSeq(block.message.slot) >= ForkSeq.deneb) {
throw Error(`Post Deneb block slot ${block.message.slot}`);
Expand All @@ -45,7 +127,7 @@ export const getBlockInput = {
config: ChainForkConfig,
block: allForks.SignedBeaconBlock,
source: BlockSource,
blobs: deneb.BlobsSidecar
blobs: deneb.BlobSidecars
): BlockInput {
if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) {
throw Error(`Pre Deneb block slot ${block.message.slot}`);
Expand Down Expand Up @@ -97,8 +179,8 @@ export type ImportBlockOpts = {
* Metadata: `true` if all the signatures including the proposer signature have been verified
*/
validSignatures?: boolean;
/** Set to true if already run `validateBlobsSidecar()` sucessfully on the blobs */
validBlobsSidecar?: boolean;
/** Set to true if already run `validateBlobSidecars()` sucessfully on the blobs */
validBlobSidecars?: boolean;
/** Seen timestamp seconds */
seenTimestampSec?: number;
/** Set to true if persist block right at verification time */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {Slot, deneb, WithOptionalBytes} from "@lodestar/types";
import {toHexString} from "@lodestar/utils";
import {IClock} from "../../util/clock.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {validateBlobsSidecar} from "../validation/blobsSidecar.js";
import {validateBlobSidecars} from "../validation/blobSidecar.js";
import {BlockInput, BlockInputType, ImportBlockOpts} from "./types.js";

/**
Expand Down Expand Up @@ -126,16 +126,19 @@ function maybeValidateBlobs(
// TODO Deneb: Make switch verify it's exhaustive
switch (blockInput.type) {
case BlockInputType.postDeneb: {
if (opts.validBlobsSidecar) {
if (opts.validBlobSidecars) {
return DataAvailableStatus.available;
}

const {block, blobs} = blockInput;
const blockSlot = block.message.slot;
const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body;
const {
blobKzgCommitments,
executionPayload: {transactions},
} = (block as deneb.SignedBeaconBlock).message.body;
const beaconBlockRoot = config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message);
// TODO Deneb: This function throws un-typed errors
validateBlobsSidecar(blockSlot, beaconBlockRoot, blobKzgCommitments, blobs);
validateBlobSidecars(blockSlot, beaconBlockRoot, transactions, blobKzgCommitments, blobs);

return DataAvailableStatus.available;
}
Expand Down
Loading

0 comments on commit 4d36e75

Please sign in to comment.