Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pre-electra support from attestation pool #6998

Merged
merged 14 commits into from
Aug 16, 2024
Merged
20 changes: 17 additions & 3 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {routes} from "@lodestar/api";
import {ApplicationMethods} from "@lodestar/api/server";
import {Epoch, ssz} from "@lodestar/types";
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {Attestation, Epoch, isElectraAttestation, ssz} from "@lodestar/types";
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE, isForkPostElectra} from "@lodestar/params";
import {validateApiAttestation} from "../../../../chain/validation/index.js";
import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
import {validateApiProposerSlashing} from "../../../../chain/validation/proposerSlashing.js";
Expand All @@ -16,6 +16,7 @@ import {
SyncCommitteeError,
} from "../../../../chain/errors/index.js";
import {validateGossipFnRetryUnknownRoot} from "../../../../network/processor/gossipHandlers.js";
import {ApiError} from "../../errors.js";

export function getBeaconPoolApi({
chain,
Expand All @@ -26,7 +27,15 @@ export function getBeaconPoolApi({
return {
async getPoolAttestations({slot, committeeIndex}) {
// Already filtered by slot
let attestations = chain.aggregatedAttestationPool.getAll(slot);
let attestations: Attestation[] = chain.aggregatedAttestationPool.getAll(slot);
const fork = chain.config.getForkName(slot ?? chain.clock.currentSlot);

if (isForkPostElectra(fork)) {
throw new ApiError(
400,
`Use getPoolAttestationsV2 to retrieve pool attestations for post-electra fork=${fork}`
);
}

if (committeeIndex !== undefined) {
attestations = attestations.filter((attestation) => committeeIndex === attestation.data.index);
Expand All @@ -39,6 +48,11 @@ export function getBeaconPoolApi({
// Already filtered by slot
let attestations = chain.aggregatedAttestationPool.getAll(slot);
const fork = chain.config.getForkName(slot ?? attestations[0]?.data.slot ?? chain.clock.currentSlot);
const isPostElectra = isForkPostElectra(fork);

attestations = attestations.filter((attestation) =>
isPostElectra ? isElectraAttestation(attestation) : !isElectraAttestation(attestation)
);

if (committeeIndex !== undefined) {
attestations = attestations.filter((attestation) => committeeIndex === attestation.data.index);
Expand Down
29 changes: 26 additions & 3 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
ForkPreBlobs,
ForkBlobs,
ForkExecution,
isForkPostElectra,
} from "@lodestar/params";
import {MAX_BUILDER_BOOST_FACTOR} from "@lodestar/validator";
import {
Expand Down Expand Up @@ -1067,9 +1068,31 @@ export function getValidatorApi(
};
},

// TODO Electra: Implement getAggregatedAttestation to properly handle pre-electra
async getAggregatedAttestation() {
throw new Error("Not implemented. Use getAggregatedAttestationV2 for now.");
async getAggregatedAttestation({attestationDataRoot, slot}) {
ensi321 marked this conversation as resolved.
Show resolved Hide resolved
notWhileSyncing();

await waitForSlot(slot); // Must never request for a future slot > currentSlot

const dataRootHex = toHex(attestationDataRoot);
const aggregate = chain.attestationPool.getAggregate(slot, null, dataRootHex);
const fork = chain.config.getForkName(slot);

if (isForkPostElectra(fork)) {
throw new ApiError(
400,
`Use getAggregatedAttestationV2 to retrieve aggregated attestations for post-electra fork=${fork}`
);
}

if (!aggregate) {
throw new ApiError(404, `No aggregated attestation for slot=${slot}, dataRoot=${dataRootHex}`);
}

metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);

return {
data: aggregate,
};
},

async getAggregatedAttestationV2({attestationDataRoot, slot, committeeIndex}) {
Expand Down
10 changes: 8 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export class BeaconChain implements IBeaconChain {

// Ops pool
readonly attestationPool: AttestationPool;
readonly aggregatedAttestationPool = new AggregatedAttestationPool();
readonly aggregatedAttestationPool: AggregatedAttestationPool;
readonly syncCommitteeMessagePool: SyncCommitteeMessagePool;
readonly syncContributionAndProofPool = new SyncContributionAndProofPool();
readonly opPool = new OpPool();
Expand Down Expand Up @@ -226,7 +226,13 @@ export class BeaconChain implements IBeaconChain {
if (!clock) clock = new Clock({config, genesisTime: this.genesisTime, signal});

const preAggregateCutOffTime = (2 / 3) * this.config.SECONDS_PER_SLOT;
this.attestationPool = new AttestationPool(clock, preAggregateCutOffTime, this.opts?.preaggregateSlotDistance);
this.attestationPool = new AttestationPool(
config,
clock,
preAggregateCutOffTime,
this.opts?.preaggregateSlotDistance
);
this.aggregatedAttestationPool = new AggregatedAttestationPool(this.config);
this.syncCommitteeMessagePool = new SyncCommitteeMessagePool(
clock,
preAggregateCutOffTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {BitArray, toHexString} from "@chainsafe/ssz";
import {
ForkName,
ForkSeq,
isForkPostElectra,
MAX_ATTESTATIONS,
MAX_ATTESTATIONS_ELECTRA,
MAX_COMMITTEES_PER_SLOT,
Expand Down Expand Up @@ -30,6 +31,7 @@ import {
} from "@lodestar/state-transition";
import {IForkChoice, EpochDifference} from "@lodestar/fork-choice";
import {toHex, MapDef, assert} from "@lodestar/utils";
import {ChainForkConfig} from "@lodestar/config";
import {intersectUint8Arrays, IntersectResult} from "../../util/bitArray.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
import {InsertOutcome} from "./types.js";
Expand Down Expand Up @@ -101,6 +103,8 @@ export class AggregatedAttestationPool {
>(() => new Map<DataRootHex, Map<CommitteeIndex, MatchingDataAttestationGroup>>());
private lowestPermissibleSlot = 0;

constructor(private readonly config: ChainForkConfig) {}

/** For metrics to track size of the pool */
getAttestationCount(): {attestationCount: number; attestationDataCount: number} {
let attestationCount = 0;
Expand Down Expand Up @@ -136,10 +140,20 @@ export class AggregatedAttestationPool {
attestationGroupByIndex = new Map<CommitteeIndex, MatchingDataAttestationGroup>();
attestationGroupByIndexByDataHash.set(dataRootHex, attestationGroupByIndex);
}
const committeeIndex = isElectraAttestation(attestation)
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;

let committeeIndex;

if (isForkPostElectra(this.config.getForkName(slot))) {
if (!isElectraAttestation(attestation)) {
throw Error(`Attestation should be type electra.Attestation for slot ${slot}`);
}
committeeIndex = attestation.committeeBits.getSingleTrueBit();
} else {
if (isElectraAttestation(attestation)) {
throw Error(`Attestation should be type phase0.Attestation for slot ${slot}`);
}
committeeIndex = attestation.data.index;
}
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in aggregated attestation pool");
let attestationGroup = attestationGroupByIndex.get(committeeIndex);
Expand Down Expand Up @@ -390,6 +404,10 @@ export class AggregatedAttestationPool {

/**
* Get all attestations optionally filtered by `attestation.data.slot`
* Note this function is not fork aware and can potentially return a mix
* of phase0.Attestations and electra.Attestations.
* Caller of this function is expected to filtered result if they desire
* a homogenous array.
* @param bySlot slot to filter, `bySlot === attestation.data.slot`
*/
getAll(bySlot?: Slot): Attestation[] {
Expand Down Expand Up @@ -504,8 +522,15 @@ export class MatchingDataAttestationGroup {
*/
getAttestationsForBlock(fork: ForkName, notSeenAttestingIndices: Set<number>): AttestationNonParticipant[] {
const attestations: AttestationNonParticipant[] = [];
const forkSeq = ForkSeq[fork];
const isPostElectra = isForkPostElectra(fork);
for (const {attestation} of this.attestations) {
if (
(isPostElectra && !isElectraAttestation(attestation)) ||
(!isPostElectra && isElectraAttestation(attestation))
) {
continue;
}

let notSeenAttesterCount = 0;
const {aggregationBits} = attestation;
for (const notSeenIndex of notSeenAttestingIndices) {
Expand All @@ -514,13 +539,12 @@ export class MatchingDataAttestationGroup {
}
}

// if fork >= electra, should return electra-only attestations
if (notSeenAttesterCount > 0 && (forkSeq < ForkSeq.electra || isElectraAttestation(attestation))) {
if (notSeenAttesterCount > 0) {
attestations.push({attestation, notSeenAttesterCount});
}
}

const maxAttestation = forkSeq >= ForkSeq.electra ? MAX_ATTESTATIONS_PER_GROUP_ELECTRA : MAX_ATTESTATIONS_PER_GROUP;
const maxAttestation = isPostElectra ? MAX_ATTESTATIONS_PER_GROUP_ELECTRA : MAX_ATTESTATIONS_PER_GROUP;
if (attestations.length <= maxAttestation) {
return attestations;
} else {
Expand Down
34 changes: 27 additions & 7 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import {BitArray} from "@chainsafe/ssz";
import {Signature, aggregateSignatures} from "@chainsafe/blst";
import {Slot, RootHex, isElectraAttestation, Attestation} from "@lodestar/types";
import {MapDef, assert} from "@lodestar/utils";
import {isForkPostElectra} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
import {isElectraAggregate, pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

/**
* The number of slots that will be stored in the pool.
Expand All @@ -28,14 +30,15 @@ type AggregateFastPhase0 = {
signature: Signature;
};

type AggregateFastElectra = AggregateFastPhase0 & {committeeBits: BitArray};
export type AggregateFastElectra = AggregateFastPhase0 & {committeeBits: BitArray};

type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;
export type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;

/** Hex string of DataRoot `TODO` */
type DataRootHex = string;

type CommitteeIndex = number;
/** CommitteeIndex must be null for pre-electra. Must not be null post-electra */
type CommitteeIndex = number | null;
ensi321 marked this conversation as resolved.
Show resolved Hide resolved

/**
* A pool of `Attestation` that is specially designed to store "unaggregated" attestations from
Expand Down Expand Up @@ -68,6 +71,7 @@ export class AttestationPool {
private lowestPermissibleSlot = 0;

constructor(
private readonly config: ChainForkConfig,
private readonly clock: IClock,
private readonly cutOffSecFromSlot: number,
private readonly preaggregateSlotDistance = 0
Expand Down Expand Up @@ -103,6 +107,7 @@ export class AttestationPool {
*/
add(committeeIndex: CommitteeIndex, attestation: Attestation, attDataRootHex: RootHex): InsertOutcome {
const slot = attestation.data.slot;
const fork = this.config.getForkName(slot);
const lowestPermissibleSlot = this.lowestPermissibleSlot;

// Reject any attestations that are too old.
Expand All @@ -121,8 +126,14 @@ export class AttestationPool {
throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
}

// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool");
if (isForkPostElectra(fork)) {
// Electra only: this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool post-electra");
assert.true(isElectraAttestation(attestation), "Attestation should be type electra.Attestation");
ensi321 marked this conversation as resolved.
Show resolved Hide resolved
} else {
assert.true(!isElectraAttestation(attestation), "Attestation should be type phase0.Attestation");
committeeIndex = null; // For pre-electra, committee index info is encoded in attDataRootIndex
}

// Pre-aggregate the contribution with existing items
let aggregateByIndex = aggregateByRoot.get(attDataRootHex);
Expand All @@ -144,14 +155,23 @@ export class AttestationPool {
/**
* For validator API to get an aggregate
*/
// TODO Electra: Change attestation pool to accomodate pre-electra request
getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): Attestation | null {
ensi321 marked this conversation as resolved.
Show resolved Hide resolved
const fork = this.config.getForkName(slot);
const isPostElectra = isForkPostElectra(fork);
committeeIndex = isPostElectra ? committeeIndex : null;

const aggregate = this.aggregateByIndexByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
if (!aggregate) {
// TODO: Add metric for missing aggregates
return null;
}

if (isPostElectra) {
assert.true(isElectraAggregate(aggregate), "Aggregate should be type AggregateFastElectra");
} else {
assert.true(!isElectraAggregate(aggregate), "Aggregate should be type AggregateFastPhase0");
}

return fastToAttestation(aggregate);
}

Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/opPools/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Signature} from "@chainsafe/blst";
import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Slot, capella} from "@lodestar/types";
import {AggregateFast, AggregateFastElectra} from "./attestationPool.js";

/**
* Prune a Map indexed by slot to keep the most recent slots, up to `slotsRetained`
Expand Down Expand Up @@ -58,3 +59,7 @@ export function isValidBlsToExecutionChangeForBlockInclusion(

return true;
}

export function isElectraAggregate(aggregate: AggregateFast): aggregate is AggregateFastElectra {
return (aggregate as AggregateFastElectra).committeeBits !== undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ import {BitArray} from "@chainsafe/ssz";
import {CommitteeIndex, phase0, RootHex, Slot} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {Metrics} from "../../metrics/metrics.js";
import {SeenAttDataKey} from "../../util/sszBytes.js";
import {InsertOutcome} from "../opPools/types.js";

export type SeenAttDataKey = AttDataBase64 | AttDataCommitteeBitsBase64;
// pre-electra, AttestationData is used to cache attestations
type AttDataBase64 = string;
// electra, AttestationData + CommitteeBits are used to cache attestations
type AttDataCommitteeBitsBase64 = string;

export type AttestationDataCacheEntry = {
// part of shuffling data, so this does not take memory
committeeValidatorIndices: Uint32Array;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import {
import {IBeaconChain} from "..";
import {AttestationError, AttestationErrorCode, GossipAction} from "../errors/index.js";
import {RegenCaller} from "../regen/index.js";
import {getSeenAttDataKeyFromSignedAggregateAndProof} from "../../util/sszBytes.js";
import {getSelectionProofSignatureSet, getAggregateAndProofSignatureSet} from "./signatureSets/index.js";
import {
getAttestationDataSigningRoot,
getCommitteeIndices,
getSeenAttDataKeyFromSignedAggregateAndProof,
getShufflingForAttestationVerification,
verifyHeadBlockAndTargetRoot,
verifyPropagationSlotRange,
Expand Down Expand Up @@ -71,9 +71,7 @@ async function validateAggregateAndProof(
const attData = aggregate.data;
const attSlot = attData.slot;

const seenAttDataKey = serializedData
? getSeenAttDataKeyFromSignedAggregateAndProof(ForkSeq[fork], serializedData)
: null;
const seenAttDataKey = serializedData ? getSeenAttDataKeyFromSignedAggregateAndProof(fork, serializedData) : null;
const cachedAttData = seenAttDataKey ? chain.seenAttestationDatas.get(attSlot, seenAttDataKey) : null;

let attIndex;
Expand Down
Loading
Loading