Skip to content

Commit

Permalink
fix: batch validation for electra attestations (#6788)
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored and g11tech committed Aug 23, 2024
1 parent a6ba9e8 commit 23119da
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 109 deletions.
4 changes: 2 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export function getBeaconPoolApi({
// when a validator is configured with multiple beacon node urls, this attestation data may come from another beacon node
// and the block hasn't been in our forkchoice since we haven't seen / processing that block
// see https://github.com/ChainSafe/lodestar/issues/5098
const {indexedAttestation, subnet, attDataRootHex} = await validateGossipFnRetryUnknownRoot(
const {indexedAttestation, subnet, attDataRootHex, committeeIndex} = await validateGossipFnRetryUnknownRoot(
validateFn,
network,
chain,
Expand All @@ -75,7 +75,7 @@ export function getBeaconPoolApi({
);

if (network.shouldAggregate(subnet, slot)) {
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
const insertOutcome = chain.attestationPool.add(committeeIndex, attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}

Expand Down
29 changes: 14 additions & 15 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ type CommitteeIndex = number;
* receives and it can be triggered manually.
*/
export class AttestationPool {
private readonly attestationByRootBySlot = new MapDef<Slot, Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>>(
() => new Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>()
);
private readonly aggregateByIndexByRootBySlot = new MapDef<
Slot,
Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>
>(() => new Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>());
private lowestPermissibleSlot = 0;

constructor(
Expand All @@ -75,8 +76,10 @@ export class AttestationPool {
/** Returns current count of pre-aggregated attestations with unique data */
getAttestationCount(): number {
let attestationCount = 0;
for (const attestationByRoot of this.attestationByRootBySlot.values()) {
attestationCount += attestationByRoot.size;
for (const attestationByIndexByRoot of this.aggregateByIndexByRootBySlot.values()) {
for (const attestationByIndex of attestationByIndexByRoot.values()) {
attestationCount += attestationByIndex.size;
}
}
return attestationCount;
}
Expand All @@ -98,7 +101,7 @@ export class AttestationPool {
* - Valid committeeIndex
* - Valid data
*/
add(attestation: allForks.Attestation, attDataRootHex: RootHex): InsertOutcome {
add(committeeIndex: CommitteeIndex, attestation: allForks.Attestation, attDataRootHex: RootHex): InsertOutcome {
const slot = attestation.data.slot;
const lowestPermissibleSlot = this.lowestPermissibleSlot;

Expand All @@ -113,15 +116,11 @@ export class AttestationPool {
}

// Limit object per slot
const aggregateByRoot = this.attestationByRootBySlot.getOrDefault(slot);
const aggregateByRoot = this.aggregateByIndexByRootBySlot.getOrDefault(slot);
if (aggregateByRoot.size >= MAX_ATTESTATIONS_PER_SLOT) {
throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
}

const committeeIndex = isElectraAttestation(attestation)
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool");

Expand All @@ -146,7 +145,7 @@ export class AttestationPool {
* For validator API to get an aggregate
*/
getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): allForks.Attestation | null {
const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
const aggregate = this.aggregateByIndexByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
if (!aggregate) {
// TODO: Add metric for missing aggregates
return null;
Expand All @@ -160,7 +159,7 @@ export class AttestationPool {
* By default, not interested in attestations in old slots, we only preaggregate attestations for the current slot.
*/
prune(clockSlot: Slot): void {
pruneBySlot(this.attestationByRootBySlot, clockSlot, SLOTS_RETAINED);
pruneBySlot(this.aggregateByIndexByRootBySlot, clockSlot, SLOTS_RETAINED);
// by default preaggregateSlotDistance is 0, i.e only accept attestations in the same clock slot.
this.lowestPermissibleSlot = Math.max(clockSlot - this.preaggregateSlotDistance, 0);
}
Expand All @@ -174,8 +173,8 @@ export class AttestationPool {

const aggregateByRoots =
bySlot === undefined
? Array.from(this.attestationByRootBySlot.values())
: [this.attestationByRootBySlot.get(bySlot)];
? Array.from(this.aggregateByIndexByRootBySlot.values())
: [this.aggregateByIndexByRootBySlot.get(bySlot)];

for (const aggregateByRoot of aggregateByRoots) {
if (aggregateByRoot) {
Expand Down
24 changes: 15 additions & 9 deletions packages/beacon-node/src/chain/seenCache/seenAttestationData.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import {phase0, RootHex, Slot} from "@lodestar/types";
import {BitArray} from "@chainsafe/ssz";
import {CommitteeIndex, phase0, RootHex, Slot} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {Metrics} from "../../metrics/metrics.js";
import {AttDataBase64} from "../../util/sszBytes.js";
import {SeenAttDataKey} from "../../util/sszBytes.js";
import {InsertOutcome} from "../opPools/types.js";

export type AttestationDataCacheEntry = {
// part of shuffling data, so this does not take memory
committeeIndices: Uint32Array;
committeeValidatorIndices: Uint32Array;
// undefined for phase0 Attestation
committeeBits?: BitArray;
committeeIndex: CommitteeIndex;
// IndexedAttestationData signing root, 32 bytes
signingRoot: Uint8Array;
// to be consumed by forkchoice and oppool
Expand Down Expand Up @@ -38,12 +42,14 @@ const DEFAULT_MAX_CACHE_SIZE_PER_SLOT = 200;
const DEFAULT_CACHE_SLOT_DISTANCE = 2;

/**
* Cached seen AttestationData to improve gossip validation. For Electra, this still take into account attestationIndex
* even through it is moved outside of AttestationData.
* As of April 2023, validating gossip attestation takes ~12% of cpu time for a node subscribing to all subnets on mainnet.
* Having this cache help saves a lot of cpu time since most of the gossip attestations are on the same slot.
*/
export class SeenAttestationDatas {
private cacheEntryByAttDataBase64BySlot = new MapDef<Slot, Map<AttDataBase64, AttestationDataCacheEntry>>(
() => new Map<AttDataBase64, AttestationDataCacheEntry>()
private cacheEntryByAttDataBase64BySlot = new MapDef<Slot, Map<SeenAttDataKey, AttestationDataCacheEntry>>(
() => new Map<SeenAttDataKey, AttestationDataCacheEntry>()
);
private lowestPermissibleSlot = 0;

Expand All @@ -57,14 +63,14 @@ export class SeenAttestationDatas {
}

// TODO: Move InsertOutcome type definition to a common place
add(slot: Slot, attDataBase64: AttDataBase64, cacheEntry: AttestationDataCacheEntry): InsertOutcome {
add(slot: Slot, attDataKey: SeenAttDataKey, cacheEntry: AttestationDataCacheEntry): InsertOutcome {
if (slot < this.lowestPermissibleSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.too_old});
return InsertOutcome.Old;
}

const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.getOrDefault(slot);
if (cacheEntryByAttDataBase64.has(attDataBase64)) {
if (cacheEntryByAttDataBase64.has(attDataKey)) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.already_known});
return InsertOutcome.AlreadyKnown;
}
Expand All @@ -74,11 +80,11 @@ export class SeenAttestationDatas {
return InsertOutcome.ReachLimit;
}

cacheEntryByAttDataBase64.set(attDataBase64, cacheEntry);
cacheEntryByAttDataBase64.set(attDataKey, cacheEntry);
return InsertOutcome.NewData;
}

get(slot: Slot, attDataBase64: AttDataBase64): AttestationDataCacheEntry | null {
get(slot: Slot, attDataBase64: SeenAttDataKey): AttestationDataCacheEntry | null {
const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.get(slot);
const cacheEntry = cacheEntryByAttDataBase64?.get(attDataBase64);
if (cacheEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async function validateAggregateAndProof(
// [REJECT] The committee index is within the expected range
// -- i.e. data.index < get_committee_count_per_slot(state, data.target.epoch)
const committeeIndices = cachedAttData
? cachedAttData.committeeIndices
? cachedAttData.committeeValidatorIndices
: getCommitteeIndices(shuffling, attSlot, attIndex);

// [REJECT] The number of aggregation bits matches the committee size
Expand Down
Loading

0 comments on commit 23119da

Please sign in to comment.