Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: beacon committee subscription api #9721

Merged
merged 40 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
6947535
api handler & type
domiwei Mar 15, 2024
9deec09
gossip & sentinel & attestation store
domiwei Mar 15, 2024
b39d20a
rename
domiwei Mar 15, 2024
0b30062
comment
domiwei Mar 15, 2024
1605488
udpate
domiwei Mar 20, 2024
9cc6adf
unneccesary func
domiwei Mar 21, 2024
f268752
publish data
domiwei Mar 21, 2024
833b433
on receive att
domiwei Mar 21, 2024
89705ac
update
domiwei Mar 22, 2024
2e58773
subscription done
domiwei Mar 22, 2024
87c38ad
mv pkg
domiwei Mar 22, 2024
b201adb
error handle
domiwei Mar 22, 2024
05bcbdc
update
domiwei Mar 25, 2024
75881f0
update
domiwei Mar 25, 2024
842dfce
udpate
domiwei Mar 25, 2024
a40a571
update
domiwei Mar 25, 2024
7397641
update
domiwei Mar 25, 2024
99502e5
aggregation pool
domiwei Mar 27, 2024
28a2b95
refactor
domiwei Mar 27, 2024
2e4ec86
aggr pool with test
domiwei Mar 27, 2024
b6f2603
sweeper
domiwei Mar 27, 2024
bafda80
update
domiwei Mar 27, 2024
f060135
move
domiwei Mar 27, 2024
ecb0dea
lint
domiwei Mar 27, 2024
8818ab3
synced data mgr
domiwei Mar 29, 2024
f73503e
update
domiwei Mar 29, 2024
a79805e
mock pool
domiwei Mar 29, 2024
0662704
update aggregation
domiwei Apr 1, 2024
34d06ce
conflict
domiwei Apr 1, 2024
8968c8b
udpate test
domiwei Apr 1, 2024
c36c11f
move check to forkstore
domiwei Apr 1, 2024
495416d
update
domiwei Apr 1, 2024
d10cb07
update
domiwei Apr 1, 2024
d7c21ed
add to aggr pool in OnAggregateAndProof
domiwei Apr 1, 2024
3a61f4c
update
domiwei Apr 1, 2024
73d66e4
update
domiwei Apr 1, 2024
bfa0f88
update
domiwei Apr 1, 2024
0d9aebc
stupid err
domiwei Apr 2, 2024
1bd8fbb
update
domiwei Apr 4, 2024
da6f9bc
update
domiwei Apr 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions cl/aggregation/mocks/AggregationPool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions cl/aggregation/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package aggregation

import "github.com/ledgerwatch/erigon/cl/cltypes/solid"

type AggregationPool interface {
AddAttestation(att *solid.Attestation) error
//GetAggregatations(slot uint64, committeeIndex uint64) ([]*solid.Attestation, error)
GetAggregatationByRoot(root [32]byte) *solid.Attestation
domiwei marked this conversation as resolved.
Show resolved Hide resolved
}
229 changes: 229 additions & 0 deletions cl/aggregation/pool_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package aggregation

import (
"context"
"fmt"
"sync"
"time"

"github.com/Giulio2002/bls"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/utils"
)

var (
blsAggregate = bls.AggregateSignatures
)

type aggregationPoolImpl struct {
// config
genesisConfig *clparams.GenesisConfig
beaconConfig *clparams.BeaconChainConfig
netConfig *clparams.NetworkConfig
aggregatesLock sync.RWMutex
aggregates map[[32]byte][]*attestation
}

type attestation struct {
bitCount int
att *solid.Attestation
}

func NewAggregationPool(
ctx context.Context,
genesisConfig *clparams.GenesisConfig,
beaconConfig *clparams.BeaconChainConfig,
netConfig *clparams.NetworkConfig,
) AggregationPool {
p := &aggregationPoolImpl{
genesisConfig: genesisConfig,
beaconConfig: beaconConfig,
netConfig: netConfig,
aggregatesLock: sync.RWMutex{},
aggregates: make(map[[32]byte][]*attestation),
}
go p.sweepStaleAtt(ctx)
return p
}

func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error {
// use hash of attestation data as key
hashRoot, err := inAtt.AttestantionData().HashSSZ()
if err != nil {
return err
}

p.aggregatesLock.Lock()
defer p.aggregatesLock.Unlock()
if _, ok := p.aggregates[hashRoot]; !ok {
p.aggregates[hashRoot] = []*attestation{
{
bitCount: countBit(inAtt),
att: inAtt,
},
}
return nil
}

// NOTE: naive merge attestation for each existing attestation in the pool,
// but it's not optimal. it's kind of a maximum coverage problem.
mergeCount := 0
alreadyContain := false
after := []*attestation{}
for _, curAtt := range p.aggregates[hashRoot] {
if isSubset(curAtt.att.AggregationBits(), inAtt.AggregationBits()) {
// in this case, the new attestation is already contained in the existing attestation, so do not need
// to add it again no matter it's merged or not.
alreadyContain = true
after = append(after, curAtt)
continue
}

if overlap, err := checkOverlap(curAtt.att.AggregationBits(), inAtt.AggregationBits()); err != nil {
return err
} else if overlap {
// do nothing but just append the original attestation
after = append(after, curAtt)
} else {
// merge attestation
mergedAtt, err := mergeAttestationNoOverlap(inAtt, curAtt.att)
if err != nil {
return err
}
after = append(after, &attestation{
bitCount: countBit(mergedAtt),
att: mergedAtt,
})
mergeCount++
}
}
if mergeCount == 0 && !alreadyContain {
// no merge and no contain, add new attestation
after = append(after, &attestation{
bitCount: countBit(inAtt),
att: inAtt,
})
}
p.aggregates[hashRoot] = after
return nil
}

func mergeAttestationNoOverlap(a, b *solid.Attestation) (*solid.Attestation, error) {
// merge bit
newBits := make([]byte, len(a.AggregationBits()))
for i := range a.AggregationBits() {
newBits[i] = a.AggregationBits()[i] | b.AggregationBits()[i]
}
// merge sig
aSig := a.Signature()
bSig := b.Signature()
sig1 := make([]byte, len(aSig))
sig2 := make([]byte, len(bSig))
copy(sig1, aSig[:])
copy(sig2, bSig[:])
mergedSig, err := blsAggregate([][]byte{sig1, sig2})
domiwei marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
if len(mergedSig) > 96 {
return nil, fmt.Errorf("merged signature is too long")
}
var mergedResult [96]byte
copy(mergedResult[:], mergedSig)
merge := solid.NewAttestionFromParameters(
newBits,
a.AttestantionData(),
mergedResult,
)
return merge, nil
}

func isSubset(a, b []byte) bool {
domiwei marked this conversation as resolved.
Show resolved Hide resolved
if len(a) != len(b) {
return false
}
count := 0
for i := range a {
if a[i]&b[i] == b[i] {
count++
}
}
return count == len(b)
}

func checkOverlap(a, b []byte) (bool, error) {
domiwei marked this conversation as resolved.
Show resolved Hide resolved
if len(a) != len(b) {
return false, fmt.Errorf("different lengths")
}
for i := range a {
if a[i]&b[i] != 0 {
return true, nil
}
}
return false, nil
}

func countBit(att *solid.Attestation) int {
count := 0
for _, b := range att.AggregationBits() {
for i := 0; i < 8; i++ {
count += int(b >> i & 1)
}
}
return count
}

func (p *aggregationPoolImpl) GetAggregatationByRoot(root [32]byte) *solid.Attestation {
p.aggregatesLock.RLock()
defer p.aggregatesLock.RUnlock()
atts, ok := p.aggregates[root]
if !ok || atts == nil {
return nil
}

// find the attestation with the most bits set
maxBits := 0
var maxAtt *solid.Attestation
for _, att := range atts {
if att.bitCount > maxBits {
maxBits = att.bitCount
maxAtt = att.att
}
}
return maxAtt
}

func (p *aggregationPoolImpl) sweepStaleAtt(ctx context.Context) {
ticker := time.NewTicker(time.Minute)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
p.aggregatesLock.Lock()
toRemoves := make([][32]byte, 0)
for hashRoot := range p.aggregates {
if len(p.aggregates[hashRoot]) == 0 {
toRemoves = append(toRemoves, hashRoot)
continue
} else {
slot := p.aggregates[hashRoot][0].att.AttestantionData().Slot()
if p.slotIsStale(slot) {
toRemoves = append(toRemoves, hashRoot)
}
}
}
// remove stale attestation
for _, hashRoot := range toRemoves {
delete(p.aggregates, hashRoot)
}
p.aggregatesLock.Unlock()
}
}
}

func (p *aggregationPoolImpl) slotIsStale(targetSlot uint64) bool {
curSlot := utils.GetCurrentSlot(p.genesisConfig.GenesisTime, p.beaconConfig.SecondsPerSlot)
return curSlot-targetSlot > p.netConfig.AttestationPropagationSlotRange
}
Loading
Loading