Skip to content

Commit

Permalink
fix: attestation pool for electra (#6744)
Browse files Browse the repository at this point in the history
* feat: attestationPool to group by slot by data root by committee index for electra

* fix: gossip validation and assert.notNull() util

* fix: remove light-client stats.html

* fix: lint and check-types
  • Loading branch information
twoeths authored May 8, 2024
1 parent 838add9 commit 59f3abe
Show file tree
Hide file tree
Showing 19 changed files with 149 additions and 129 deletions.
11 changes: 6 additions & 5 deletions packages/api/src/beacon/routes/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ export type Api = {
*/
getAggregatedAttestation(
attestationDataRoot: Root,
slot: Slot
slot: Slot,
index: CommitteeIndex
): Promise<
ApiClientResponse<
{[HttpStatusCode.OK]: {data: allForks.Attestation; version: ForkName}},
Expand Down Expand Up @@ -498,7 +499,7 @@ export type ReqTypes = {
produceBlindedBlock: {params: {slot: number}; query: {randao_reveal: string; graffiti: string}};
produceAttestationData: {query: {slot: number; committee_index: number}};
produceSyncCommitteeContribution: {query: {slot: number; subcommittee_index: number; beacon_block_root: string}};
getAggregatedAttestation: {query: {attestation_data_root: string; slot: number}};
getAggregatedAttestation: {query: {attestation_data_root: string; slot: number; index: number}};
publishAggregateAndProofs: {body: unknown};
publishContributionAndProofs: {body: unknown};
prepareBeaconCommitteeSubnet: {body: unknown};
Expand Down Expand Up @@ -647,10 +648,10 @@ export function getReqSerializers(): ReqSerializers<Api, ReqTypes> {
},

getAggregatedAttestation: {
writeReq: (root, slot) => ({query: {attestation_data_root: toHexString(root), slot}}),
parseReq: ({query}) => [fromHexString(query.attestation_data_root), query.slot],
writeReq: (root, slot, index) => ({query: {attestation_data_root: toHexString(root), slot, index}}),
parseReq: ({query}) => [fromHexString(query.attestation_data_root), query.slot, query.index],
schema: {
query: {attestation_data_root: Schema.StringRequired, slot: Schema.UintRequired},
query: {attestation_data_root: Schema.StringRequired, slot: Schema.UintRequired, index: Schema.UintRequired},
},
},

Expand Down
98 changes: 52 additions & 46 deletions packages/api/test/unit/beacon/testData/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@ export const eventTestData: EventData = {
block: "0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf",
executionOptimistic: false,
},
[EventType.attestation]: ssz.phase0.Attestation.fromJson({
aggregation_bits: "0x01",
signature:
"0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
data: {
slot: "1",
index: "1",
beacon_block_root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
source: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
}),
[EventType.attestation]: {
version: ForkName.altair,
data: ssz.phase0.Attestation.fromJson({
aggregation_bits: "0x01",
signature:
"0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
data: {
slot: "1",
index: "1",
beacon_block_root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
source: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
}),
},
[EventType.voluntaryExit]: ssz.phase0.SignedVoluntaryExit.fromJson({
message: {epoch: "1", validator_index: "1"},
signature:
Expand Down Expand Up @@ -72,44 +75,47 @@ export const eventTestData: EventData = {
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
}),
[EventType.attesterSlashing]: ssz.phase0.AttesterSlashing.fromJson({
attestation_1: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
[EventType.attesterSlashing]: {
version: ForkName.altair,
data: ssz.phase0.AttesterSlashing.fromJson({
attestation_1: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
attestation_2: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
attestation_2: {
attesting_indices: ["0", "1"],
data: {
slot: "0",
index: "0",
beacon_block_root: "0x0000000000000000000000000000000000000000000000000000000000000000",
source: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
target: {
epoch: "0",
root: "0x0000000000000000000000000000000000000000000000000000000000000000",
},
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
signature:
"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
},
}),
}),
},
[EventType.blsToExecutionChange]: ssz.capella.SignedBLSToExecutionChange.fromJson({
message: {
validator_index: "1",
Expand Down
7 changes: 5 additions & 2 deletions packages/api/test/unit/beacon/testData/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ export const testData: GenericServerTestCases<Api> = {
res: {data: ssz.altair.SyncCommitteeContribution.defaultValue()},
},
getAggregatedAttestation: {
args: [ZERO_HASH, 32000],
res: {data: ssz.phase0.Attestation.defaultValue()},
args: [ZERO_HASH, 32000, 2],
res: {
data: ssz.phase0.Attestation.defaultValue(),
version: ForkName.altair,
},
},
publishAggregateAndProofs: {
args: [[ssz.phase0.SignedAggregateAndProof.defaultValue()]],
Expand Down
9 changes: 6 additions & 3 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1063,16 +1063,19 @@ export function getValidatorApi({
};
},

async getAggregatedAttestation(attestationDataRoot, slot) {
async getAggregatedAttestation(attestationDataRoot, slot, committeeIndex) {
notWhileSyncing();

await waitForSlot(slot); // Must never request for a future slot > currentSlot

const dataRootHex = toHexString(attestationDataRoot);
const aggregate = chain.attestationPool.getAggregate(slot, dataRootHex);
const aggregate = chain.attestationPool.getAggregate(slot, committeeIndex, dataRootHex);

if (!aggregate) {
throw new ApiError(404, `No aggregated attestation for slot=${slot}, dataRoot=${dataRootHex}`);
throw new ApiError(
404,
`No aggregated attestation for slot=${slot} committeeIndex=${committeeIndex}, dataRoot=${dataRootHex}`
);
}

metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
getBlockRootAtSlot,
} from "@lodestar/state-transition";
import {IForkChoice, EpochDifference} from "@lodestar/fork-choice";
import {toHex, MapDef} from "@lodestar/utils";
import {toHex, MapDef, assert} from "@lodestar/utils";
import {intersectUint8Arrays, IntersectResult} from "../../util/bitArray.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
import {InsertOutcome} from "./types.js";
Expand Down Expand Up @@ -141,10 +141,8 @@ export class AggregatedAttestationPool {
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;
if (committeeIndex === null) {
// this should not happen because attestation should be validated before reaching this
throw Error(`Invalid attestation slot=${slot} committeeIndex=${committeeIndex}`);
}
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in aggregated attestation pool");
let attestationGroup = attestationGroupByIndex.get(committeeIndex);
if (!attestationGroup) {
attestationGroup = new MatchingDataAttestationGroup(committee, attestation.data);
Expand Down
79 changes: 30 additions & 49 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {PointFormat, Signature} from "@chainsafe/bls/types";
import bls from "@chainsafe/bls";
import {BitArray} from "@chainsafe/ssz";
import {Slot, RootHex, allForks} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {Slot, RootHex, allForks, isElectraAttestation} from "@lodestar/types";
import {MapDef, assert} from "@lodestar/utils";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
Expand Down Expand Up @@ -36,6 +36,8 @@ type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;
/** Hex string of DataRoot `TODO` */
type DataRootHex = string;

type CommitteeIndex = number;

/**
* A pool of `Attestation` that is specially designed to store "unaggregated" attestations from
* the native aggregation scheme.
Expand All @@ -60,8 +62,8 @@ type DataRootHex = string;
* receives and it can be triggered manually.
*/
export class AttestationPool {
private readonly attestationByRootBySlot = new MapDef<Slot, Map<DataRootHex, AggregateFast>>(
() => new Map<DataRootHex, AggregateFast>()
private readonly attestationByRootBySlot = new MapDef<Slot, Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>>(
() => new Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>()
);
private lowestPermissibleSlot = 0;

Expand Down Expand Up @@ -117,23 +119,35 @@ export class AttestationPool {
throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
}

const committeeIndex = isElectraAttestation(attestation)
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool");

// Pre-aggregate the contribution with existing items
const aggregate = aggregateByRoot.get(attDataRootHex);
let aggregateByIndex = aggregateByRoot.get(attDataRootHex);
if (aggregateByIndex === undefined) {
aggregateByIndex = new Map<CommitteeIndex, AggregateFast>();
aggregateByRoot.set(attDataRootHex, aggregateByIndex);
}
const aggregate = aggregateByIndex.get(committeeIndex);
if (aggregate) {
// Aggregate mutating
return aggregateAttestationInto(aggregate, attestation);
} else {
// Create new aggregate
aggregateByRoot.set(attDataRootHex, attestationToAggregate(attestation));
aggregateByIndex.set(committeeIndex, attestationToAggregate(attestation));
return InsertOutcome.NewData;
}
}

/**
* For validator API to get an aggregate
*/
getAggregate(slot: Slot, dataRootHex: RootHex): allForks.Attestation | null {
const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex);
getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): allForks.Attestation | null {
const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
if (!aggregate) {
// TODO: Add metric for missing aggregates
return null;
Expand Down Expand Up @@ -166,8 +180,10 @@ export class AttestationPool {

for (const aggregateByRoot of aggregateByRoots) {
if (aggregateByRoot) {
for (const aggFast of aggregateByRoot.values()) {
attestations.push(fastToAttestation(aggFast));
for (const aggFastByIndex of aggregateByRoot.values()) {
for (const aggFast of aggFastByIndex.values()) {
attestations.push(fastToAttestation(aggFast));
}
}
}
}
Expand All @@ -180,35 +196,13 @@ export class AttestationPool {
// - Insert attestations coming from gossip and API

/**
* Aggregate a new contribution into `aggregate` mutating it
* Aggregate a new attestation into `aggregate` mutating it
*/
function aggregateAttestationInto(aggregate: AggregateFast, attestation: allForks.Attestation): InsertOutcome {
const bitIndex = attestation.aggregationBits.getSingleTrueBit();

// Should never happen, attestations are verified against this exact condition before
if (bitIndex === null) {
throw Error("Invalid attestation not exactly one bit set");
}

if ("committeeBits" in attestation && !("committeeBits" in aggregate)) {
throw Error("Attempt to aggregate electra attestation into phase0 attestation");
}

if (!("committeeBits" in attestation) && "committeeBits" in aggregate) {
throw Error("Attempt to aggregate phase0 attestation into electra attestation");
}

if ("committeeBits" in attestation) {
// We assume attestation.committeeBits should already be validated in api and gossip handler and should be non-null
const attestationCommitteeIndex = attestation.committeeBits.getSingleTrueBit();
const aggregateCommitteeIndex = (aggregate as AggregateFastElectra).committeeBits.getSingleTrueBit();

if (attestationCommitteeIndex !== aggregateCommitteeIndex) {
throw Error(
`Committee index mismatched: attestation ${attestationCommitteeIndex} aggregate ${aggregateCommitteeIndex}`
);
}
}
assert.notNull(bitIndex, "Invalid attestation in pool, not exactly one bit set");

if (aggregate.aggregationBits.get(bitIndex) === true) {
return InsertOutcome.AlreadyKnown;
Expand All @@ -226,7 +220,7 @@ function aggregateAttestationInto(aggregate: AggregateFast, attestation: allFork
* Format `contribution` into an efficient `aggregate` to add more contributions in with aggregateContributionInto()
*/
function attestationToAggregate(attestation: allForks.Attestation): AggregateFast {
if ("committeeBits" in attestation) {
if (isElectraAttestation(attestation)) {
return {
data: attestation.data,
// clone because it will be mutated
Expand All @@ -247,18 +241,5 @@ function attestationToAggregate(attestation: allForks.Attestation): AggregateFas
* Unwrap AggregateFast to phase0.Attestation
*/
function fastToAttestation(aggFast: AggregateFast): allForks.Attestation {
if ("committeeBits" in aggFast) {
return {
data: aggFast.data,
aggregationBits: aggFast.aggregationBits,
committeeBits: aggFast.committeeBits,
signature: aggFast.signature.toBytes(PointFormat.compressed),
};
} else {
return {
data: aggFast.data,
aggregationBits: aggFast.aggregationBits,
signature: aggFast.signature.toBytes(PointFormat.compressed),
};
}
return {...aggFast, signature: aggFast.signature.toBytes(PointFormat.compressed)};
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async function validateAggregateAndProof(
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NOT_EXACTLY_ONE_COMMITTEE_BIT_SET});
}
// [REJECT] aggregate.data.index == 0
if (attData.index === 0) {
if (attData.index !== 0) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NON_ZERO_ATTESTATION_DATA_INDEX});
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export type AttestationValidationResult = {
export type AttestationOrBytes = ApiAttestation | GossipAttestation;

/** attestation from api */
export type ApiAttestation = {attestation: phase0.Attestation; serializedData: null}; // TODO Electra: add new attestation type
export type ApiAttestation = {attestation: phase0.Attestation; serializedData: null};

/** attestation from gossip */
export type GossipAttestation = {
Expand Down Expand Up @@ -298,7 +298,7 @@ async function validateGossipAttestationNoSignatureCheck(
}

// [REJECT] aggregate.data.index == 0
if (attData.index === 0) {
if (attData.index !== 0) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NON_ZERO_ATTESTATION_DATA_INDEX});
}
} else {
Expand Down
Loading

0 comments on commit 59f3abe

Please sign in to comment.