Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EIP-7549: attestation pool #14121

Merged
merged 8 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion beacon-chain/operations/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ go_library(
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/hash:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//proto/prysm/v1alpha1/attestation/aggregation/attestations:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/operations/attestations/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/hash:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//proto/prysm/v1alpha1/attestation/aggregation/attestations:go_default_library",
"//runtime/version:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
Expand All @@ -39,14 +40,15 @@ go_test(
embed = [":go_default_library"],
deps = [
"//config/fieldparams:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)
112 changes: 78 additions & 34 deletions beacon-chain/operations/attestations/kv/aggregated.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
attaggregation "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation/aggregation/attestations"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
Expand All @@ -32,28 +34,28 @@ func (c *AttCaches) aggregateUnaggregatedAtts(ctx context.Context, unaggregatedA
_, span := trace.StartSpan(ctx, "operations.attestations.kv.aggregateUnaggregatedAtts")
defer span.End()

attsByDataRoot := make(map[[32]byte][]ethpb.Att, len(unaggregatedAtts))
attsByVerAndDataRoot := make(map[attestation.Id][]ethpb.Att, len(unaggregatedAtts))
for _, att := range unaggregatedAtts {
attDataRoot, err := att.GetData().HashTreeRoot()
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return err
return errors.Wrap(err, "could not create attestation ID")
}
attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att)
attsByVerAndDataRoot[id] = append(attsByVerAndDataRoot[id], att)
}

// Aggregate unaggregated attestations from the pool and save them in the pool.
// Track the unaggregated attestations that aren't able to aggregate.
leftOverUnaggregatedAtt := make(map[[32]byte]bool)
leftOverUnaggregatedAtt := make(map[attestation.Id]bool)

leftOverUnaggregatedAtt = c.aggregateParallel(attsByDataRoot, leftOverUnaggregatedAtt)
leftOverUnaggregatedAtt = c.aggregateParallel(attsByVerAndDataRoot, leftOverUnaggregatedAtt)

// Remove the unaggregated attestations from the pool that were successfully aggregated.
for _, att := range unaggregatedAtts {
h, err := hashFn(att)
id, err := attestation.NewId(att, attestation.Full)
if err != nil {
return err
return errors.Wrap(err, "could not create attestation ID")
}
if leftOverUnaggregatedAtt[h] {
if leftOverUnaggregatedAtt[id] {
continue
}
if err := c.DeleteUnaggregatedAttestation(att); err != nil {
Expand All @@ -66,7 +68,7 @@ func (c *AttCaches) aggregateUnaggregatedAtts(ctx context.Context, unaggregatedA
// aggregateParallel aggregates attestations in parallel for `atts` and saves them in the pool,
// returns the unaggregated attestations that weren't able to aggregate.
// Given `n` CPU cores, it creates a channel of size `n` and spawns `n` goroutines to aggregate attestations
func (c *AttCaches) aggregateParallel(atts map[[32]byte][]ethpb.Att, leftOver map[[32]byte]bool) map[[32]byte]bool {
func (c *AttCaches) aggregateParallel(atts map[attestation.Id][]ethpb.Att, leftOver map[attestation.Id]bool) map[attestation.Id]bool {
var leftoverLock sync.Mutex
wg := sync.WaitGroup{}

Expand All @@ -92,13 +94,13 @@ func (c *AttCaches) aggregateParallel(atts map[[32]byte][]ethpb.Att, leftOver ma
continue
}
} else {
h, err := hashFn(aggregated)
id, err := attestation.NewId(aggregated, attestation.Full)
if err != nil {
log.WithError(err).Error("could not hash attestation")
log.WithError(err).Error("Could not create attestation ID")
continue
}
leftoverLock.Lock()
leftOver[h] = true
leftOver[id] = true
leftoverLock.Unlock()
}
}
Expand Down Expand Up @@ -139,25 +141,26 @@ func (c *AttCaches) SaveAggregatedAttestation(att ethpb.Att) error {
return nil
}

r, err := hashFn(att.GetData())
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
return errors.Wrap(err, "could not create attestation ID")
}
copiedAtt := att.Copy()

c.aggregatedAttLock.Lock()
defer c.aggregatedAttLock.Unlock()
atts, ok := c.aggregatedAtt[r]
atts, ok := c.aggregatedAtt[id]
if !ok {
atts := []ethpb.Att{copiedAtt}
c.aggregatedAtt[r] = atts
c.aggregatedAtt[id] = atts
return nil
}

atts, err = attaggregation.Aggregate(append(atts, copiedAtt))
if err != nil {
return err
}
c.aggregatedAtt[r] = atts
c.aggregatedAtt[id] = atts

return nil
}
Expand Down Expand Up @@ -191,17 +194,56 @@ func (c *AttCaches) AggregatedAttestations() []ethpb.Att {

// AggregatedAttestationsBySlotIndex returns the aggregated attestations in cache,
// filtered by committee index and slot.
func (c *AttCaches) AggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []ethpb.Att {
func (c *AttCaches) AggregatedAttestationsBySlotIndex(
ctx context.Context,
slot primitives.Slot,
committeeIndex primitives.CommitteeIndex,
) []*ethpb.Attestation {
_, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregatedAttestationsBySlotIndex")
defer span.End()

atts := make([]ethpb.Att, 0)
atts := make([]*ethpb.Attestation, 0)

c.aggregatedAttLock.RLock()
defer c.aggregatedAttLock.RUnlock()
for _, a := range c.aggregatedAtt {
if slot == a[0].GetData().Slot && committeeIndex == a[0].GetData().CommitteeIndex {
atts = append(atts, a...)
for _, as := range c.aggregatedAtt {
if as[0].Version() == version.Phase0 && slot == as[0].GetData().Slot && committeeIndex == as[0].GetData().CommitteeIndex {
for _, a := range as {
att, ok := a.(*ethpb.Attestation)
// This will never fail in practice because we asserted the version
if ok {
atts = append(atts, att)
}
}
}
}

return atts
}

// AggregatedAttestationsBySlotIndexElectra returns the aggregated attestations in cache,
// filtered by committee index and slot.
func (c *AttCaches) AggregatedAttestationsBySlotIndexElectra(
ctx context.Context,
slot primitives.Slot,
committeeIndex primitives.CommitteeIndex,
) []*ethpb.AttestationElectra {
_, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregatedAttestationsBySlotIndexElectra")
defer span.End()

atts := make([]*ethpb.AttestationElectra, 0)

c.aggregatedAttLock.RLock()
defer c.aggregatedAttLock.RUnlock()
for _, as := range c.aggregatedAtt {
if as[0].Version() == version.Electra && slot == as[0].GetData().Slot && as[0].CommitteeBitsVal().BitAt(uint64(committeeIndex)) {
for _, a := range as {
att, ok := a.(*ethpb.AttestationElectra)
// This will never fail in practice because we asserted the version
if ok {
atts = append(atts, att)
}
}
}
}

Expand All @@ -216,18 +258,19 @@ func (c *AttCaches) DeleteAggregatedAttestation(att ethpb.Att) error {
if !helpers.IsAggregated(att) {
return errors.New("attestation is not aggregated")
}
r, err := hashFn(att.GetData())
if err != nil {
return errors.Wrap(err, "could not tree hash attestation data")
}

if err := c.insertSeenBit(att); err != nil {
return err
}

id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return errors.Wrap(err, "could not create attestation ID")
}

c.aggregatedAttLock.Lock()
defer c.aggregatedAttLock.Unlock()
attList, ok := c.aggregatedAtt[r]
attList, ok := c.aggregatedAtt[id]
if !ok {
return nil
}
Expand All @@ -241,9 +284,9 @@ func (c *AttCaches) DeleteAggregatedAttestation(att ethpb.Att) error {
}
}
if len(filtered) == 0 {
delete(c.aggregatedAtt, r)
delete(c.aggregatedAtt, id)
} else {
c.aggregatedAtt[r] = filtered
c.aggregatedAtt[id] = filtered
}

return nil
Expand All @@ -254,14 +297,15 @@ func (c *AttCaches) HasAggregatedAttestation(att ethpb.Att) (bool, error) {
if err := helpers.ValidateNilAttestation(att); err != nil {
return false, err
}
r, err := hashFn(att.GetData())

id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return false, errors.Wrap(err, "could not tree hash attestation")
return false, errors.Wrap(err, "could not create attestation ID")
}

c.aggregatedAttLock.RLock()
defer c.aggregatedAttLock.RUnlock()
if atts, ok := c.aggregatedAtt[r]; ok {
if atts, ok := c.aggregatedAtt[id]; ok {
for _, a := range atts {
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
return false, err
Expand All @@ -273,7 +317,7 @@ func (c *AttCaches) HasAggregatedAttestation(att ethpb.Att) (bool, error) {

c.blockAttLock.RLock()
defer c.blockAttLock.RUnlock()
if atts, ok := c.blockAtt[r]; ok {
if atts, ok := c.blockAtt[id]; ok {
for _, a := range atts {
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
return false, err
Expand Down
59 changes: 52 additions & 7 deletions beacon-chain/operations/attestations/kv/aggregated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (

c "github.com/patrickmn/go-cache"
"github.com/pkg/errors"
fssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/crypto/bls"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
Expand Down Expand Up @@ -69,7 +70,7 @@ func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
}),
AggregationBits: bitfield.Bitlist{0b10111},
},
wantErrString: "could not tree hash attestation: --.BeaconBlockRoot (" + fssz.ErrBytesLength.Error() + ")",
wantErrString: "could not create attestation ID",
},
{
name: "already seen",
Expand All @@ -92,15 +93,13 @@ func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
count: 1,
},
}
r, err := hashFn(util.HydrateAttestationData(&ethpb.AttestationData{
Slot: 100,
}))
id, err := attestation.NewId(util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 100}}), attestation.Data)
require.NoError(t, err)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewAttCaches()
cache.seenAtt.Set(string(r[:]), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
cache.seenAtt.Set(id.String(), []bitfield.Bitlist{{0xff}}, c.DefaultExpiration)
assert.Equal(t, 0, len(cache.unAggregatedAtt), "Invalid start pool, atts: %d", len(cache.unAggregatedAtt))

err := cache.SaveAggregatedAttestation(tt.att)
Expand Down Expand Up @@ -230,7 +229,7 @@ func TestKV_Aggregated_DeleteAggregatedAttestation(t *testing.T) {
},
}
err := cache.DeleteAggregatedAttestation(att)
wantErr := "could not tree hash attestation data: --.BeaconBlockRoot (" + fssz.ErrBytesLength.Error() + ")"
wantErr := "could not create attestation ID"
assert.ErrorContains(t, wantErr, err)
})

Expand Down Expand Up @@ -500,3 +499,49 @@ func TestKV_Aggregated_DuplicateAggregatedAttestations(t *testing.T) {
assert.DeepSSZEqual(t, att2, returned[0], "Did not receive correct aggregated atts")
assert.Equal(t, 1, len(returned), "Did not receive correct aggregated atts")
}

func TestKV_Aggregated_AggregatedAttestationsBySlotIndex(t *testing.T) {
cache := NewAttCaches()

att1 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b1011}})
att2 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 2}, AggregationBits: bitfield.Bitlist{0b1101}})
att3 := util.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b1101}})
atts := []*ethpb.Attestation{att1, att2, att3}

for _, att := range atts {
require.NoError(t, cache.SaveAggregatedAttestation(att))
}
ctx := context.Background()
returned := cache.AggregatedAttestationsBySlotIndex(ctx, 1, 1)
assert.DeepEqual(t, []*ethpb.Attestation{att1}, returned)
returned = cache.AggregatedAttestationsBySlotIndex(ctx, 1, 2)
assert.DeepEqual(t, []*ethpb.Attestation{att2}, returned)
returned = cache.AggregatedAttestationsBySlotIndex(ctx, 2, 1)
assert.DeepEqual(t, []*ethpb.Attestation{att3}, returned)
}

func TestKV_Aggregated_AggregatedAttestationsBySlotIndexElectra(t *testing.T) {
cache := NewAttCaches()

committeeBits := primitives.NewAttestationCommitteeBits()
committeeBits.SetBitAt(1, true)
att1 := util.HydrateAttestationElectra(&ethpb.AttestationElectra{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1011}, CommitteeBits: committeeBits})
committeeBits = primitives.NewAttestationCommitteeBits()
committeeBits.SetBitAt(2, true)
att2 := util.HydrateAttestationElectra(&ethpb.AttestationElectra{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}, CommitteeBits: committeeBits})
committeeBits = primitives.NewAttestationCommitteeBits()
committeeBits.SetBitAt(1, true)
att3 := util.HydrateAttestationElectra(&ethpb.AttestationElectra{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}, CommitteeBits: committeeBits})
atts := []*ethpb.AttestationElectra{att1, att2, att3}

for _, att := range atts {
require.NoError(t, cache.SaveAggregatedAttestation(att))
}
ctx := context.Background()
returned := cache.AggregatedAttestationsBySlotIndexElectra(ctx, 1, 1)
assert.DeepEqual(t, []*ethpb.AttestationElectra{att1}, returned)
returned = cache.AggregatedAttestationsBySlotIndexElectra(ctx, 1, 2)
assert.DeepEqual(t, []*ethpb.AttestationElectra{att2}, returned)
returned = cache.AggregatedAttestationsBySlotIndexElectra(ctx, 2, 1)
assert.DeepEqual(t, []*ethpb.AttestationElectra{att3}, returned)
}
Loading
Loading