diff --git a/cl/aggregation/pool_impl.go b/cl/aggregation/pool_impl.go index b44e96bdcbd..aa3c97fecb7 100644 --- a/cl/aggregation/pool_impl.go +++ b/cl/aggregation/pool_impl.go @@ -61,7 +61,7 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error { return nil } - if utils.IsSupersetBitlist(att.AggregationBits(), inAtt.AggregationBits()) { + if utils.IsNonStrictSupersetBitlist(att.AggregationBits(), inAtt.AggregationBits()) { return ErrIsSuperset } diff --git a/cl/beacon/handler/block_production.go b/cl/beacon/handler/block_production.go index 381c3c00eb8..dcf70b756c6 100644 --- a/cl/beacon/handler/block_production.go +++ b/cl/beacon/handler/block_production.go @@ -11,7 +11,6 @@ import ( "sync" "time" - "github.com/Giulio2002/bls" "github.com/go-chi/chi/v5" "github.com/ledgerwatch/erigon-lib/common" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -170,10 +169,6 @@ func (a *ApiHandler) produceBeaconBody(ctx context.Context, apiVersion int, base beaconBody.RandaoReveal = randaoReveal beaconBody.Graffiti = graffiti beaconBody.Version = stateVersion - // Sync aggregate is empty for now. - beaconBody.SyncAggregate = &cltypes.SyncAggregate{ - SyncCommiteeSignature: bls.InfiniteSignature, - } // Build execution payload latestExecutionPayload := baseState.LatestExecutionPayloadHeader() @@ -308,6 +303,15 @@ func (a *ApiHandler) produceBeaconBody(ctx context.Context, apiVersion int, base } } }() + // process the sync aggregate in parallel + wg.Add(1) + go func() { + defer wg.Done() + beaconBody.SyncAggregate, err = a.syncMessagePool.GetSyncAggregate(targetSlot-1, blockRoot) + if err != nil { + log.Error("BlockProduction: Failed to get sync aggregate", "err", err) + } + }() wg.Wait() if executionPayload == nil { return nil, 0, fmt.Errorf("failed to produce execution payload") diff --git a/cl/beacon/handler/utils_test.go b/cl/beacon/handler/utils_test.go index dd939d1a529..2073db99692 100644 --- a/cl/beacon/handler/utils_test.go +++ b/cl/beacon/handler/utils_test.go @@ -45,7 +45,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge bcfg.CapellaForkEpoch = 1 blocks, preState, postState = tests.GetCapellaRandom() } - fcu = forkchoice.NewForkChoiceStorageMock() + fcu = forkchoice.NewForkChoiceStorageMock(t) db = memdb.NewTestDB(t) blobDb := memdb.NewTestDB(t) var reader *tests.MockBlockReader diff --git a/cl/phase1/forkchoice/forkchoice_mock.go b/cl/phase1/forkchoice/forkchoice_mock.go index da7a78a53d5..57069c9727f 100644 --- a/cl/phase1/forkchoice/forkchoice_mock.go +++ b/cl/phase1/forkchoice/forkchoice_mock.go @@ -2,6 +2,7 @@ package forkchoice import ( "context" + "testing" "github.com/ledgerwatch/erigon-lib/common" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -12,6 +13,7 @@ import ( "github.com/ledgerwatch/erigon/cl/pool" "github.com/ledgerwatch/erigon/cl/transition/impl/eth2" "github.com/ledgerwatch/erigon/cl/validator/sync_contribution_pool" + "go.uber.org/mock/gomock" ) // Make mocks with maps and simple setters and getters, panic on methods from ForkChoiceStorageWriter @@ -47,7 +49,51 @@ type ForkChoiceStorageMock struct { Pool pool.OperationsPool } -func NewForkChoiceStorageMock() *ForkChoiceStorageMock { +func makeSyncContributionPoolMock(t *testing.T) sync_contribution_pool.SyncContributionPool { + ctrl := gomock.NewController(t) + type syncContributionKey struct { + slot uint64 + subcommitteeIndex uint64 + beaconBlockRoot common.Hash + } + u := map[syncContributionKey]*cltypes.Contribution{} + pool := sync_contribution_pool.NewMockSyncContributionPool(ctrl) + pool.EXPECT().AddSyncContribution(gomock.Any(), gomock.Any()).DoAndReturn(func(headState *state.CachingBeaconState, contribution *cltypes.Contribution) error { + key := syncContributionKey{ + slot: contribution.Slot, + subcommitteeIndex: contribution.SubcommitteeIndex, + beaconBlockRoot: contribution.BeaconBlockRoot, + } + u[key] = contribution + return nil + }).AnyTimes() + pool.EXPECT().GetSyncContribution(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(slot uint64, subcommitteeIndex uint64, beaconBlockRoot common.Hash) (*cltypes.Contribution, bool) { + key := syncContributionKey{ + slot: slot, + subcommitteeIndex: subcommitteeIndex, + beaconBlockRoot: beaconBlockRoot, + } + v, ok := u[key] + return v, ok + }).AnyTimes() + pool.EXPECT().AddSyncCommitteeMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(headState *state.CachingBeaconState, subCommitee uint64, message *cltypes.SyncCommitteeMessage) error { + key := syncContributionKey{ + slot: message.Slot, + subcommitteeIndex: subCommitee, + beaconBlockRoot: message.BeaconBlockRoot, + } + u[key] = &cltypes.Contribution{ + Slot: message.Slot, + SubcommitteeIndex: subCommitee, + BeaconBlockRoot: message.BeaconBlockRoot, + AggregationBits: make([]byte, cltypes.SyncCommitteeAggregationBitsSize), + } + return nil + }).AnyTimes() + return pool +} + +func NewForkChoiceStorageMock(t *testing.T) *ForkChoiceStorageMock { return &ForkChoiceStorageMock{ Ancestors: make(map[uint64]common.Hash), AnchorSlotVal: 0, @@ -66,9 +112,9 @@ func NewForkChoiceStorageMock() *ForkChoiceStorageMock { GetFinalityCheckpointsVal: make(map[common.Hash][3]solid.Checkpoint), LightClientBootstraps: make(map[common.Hash]*cltypes.LightClientBootstrap), LCUpdates: make(map[uint64]*cltypes.LightClientUpdate), - SyncContributionPool: sync_contribution_pool.NewSyncContributionPoolMock(), Headers: make(map[common.Hash]*cltypes.BeaconBlockHeader), GetBeaconCommitteeMock: nil, + SyncContributionPool: makeSyncContributionPoolMock(t), } } diff --git a/cl/phase1/network/services/aggregate_and_proof_service_test.go b/cl/phase1/network/services/aggregate_and_proof_service_test.go index b32df91d3f0..91cd37395be 100644 --- a/cl/phase1/network/services/aggregate_and_proof_service_test.go +++ b/cl/phase1/network/services/aggregate_and_proof_service_test.go @@ -43,7 +43,7 @@ func setupAggregateAndProofTest(t *testing.T) (AggregateAndProofService, *synced cn() cfg := &clparams.MainnetBeaconConfig syncedDataManager := synced_data.NewSyncedDataManager(true, cfg) - forkchoiceMock := forkchoice.NewForkChoiceStorageMock() + forkchoiceMock := forkchoice.NewForkChoiceStorageMock(t) blockService := NewAggregateAndProofService(ctx, syncedDataManager, forkchoiceMock, cfg, nil, true) return blockService, syncedDataManager, forkchoiceMock } diff --git a/cl/phase1/network/services/blob_sidecar_service_test.go b/cl/phase1/network/services/blob_sidecar_service_test.go index 28f4d69c0d9..680832a089e 100644 --- a/cl/phase1/network/services/blob_sidecar_service_test.go +++ b/cl/phase1/network/services/blob_sidecar_service_test.go @@ -56,7 +56,7 @@ func setupBlobSidecarService(t *testing.T, ctrl *gomock.Controller, test bool) ( cfg := &clparams.MainnetBeaconConfig syncedDataManager := synced_data.NewSyncedDataManager(true, cfg) ethClock := eth_clock.NewMockEthereumClock(ctrl) - forkchoiceMock := forkchoice.NewForkChoiceStorageMock() + forkchoiceMock := forkchoice.NewForkChoiceStorageMock(t) blockService := NewBlobSidecarService(ctx2, cfg, forkchoiceMock, syncedDataManager, ethClock, test) return blockService, syncedDataManager, ethClock, forkchoiceMock } diff --git a/cl/phase1/network/services/block_service_test.go b/cl/phase1/network/services/block_service_test.go index 6ef5f7dc02e..f0eeb2ae39a 100644 --- a/cl/phase1/network/services/block_service_test.go +++ b/cl/phase1/network/services/block_service_test.go @@ -21,7 +21,7 @@ func setupBlockService(t *testing.T, ctrl *gomock.Controller) (BlockService, *sy cfg := &clparams.MainnetBeaconConfig syncedDataManager := synced_data.NewSyncedDataManager(true, cfg) ethClock := eth_clock.NewMockEthereumClock(ctrl) - forkchoiceMock := forkchoice.NewForkChoiceStorageMock() + forkchoiceMock := forkchoice.NewForkChoiceStorageMock(t) blockService := NewBlockService(context.Background(), db, forkchoiceMock, syncedDataManager, ethClock, cfg, nil) return blockService, syncedDataManager, ethClock, forkchoiceMock } diff --git a/cl/phase1/network/services/sync_committee_messages_service.go b/cl/phase1/network/services/sync_committee_messages_service.go index 66999bcc89b..eadcadf8543 100644 --- a/cl/phase1/network/services/sync_committee_messages_service.go +++ b/cl/phase1/network/services/sync_committee_messages_service.go @@ -99,7 +99,7 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne func (s *syncCommitteeMessagesService) cleanupOldSyncCommitteeMessages() { headSlot := s.syncedDataManager.HeadSlot() for k := range s.seenSyncCommitteeMessages { - if headSlot != k.slot { + if headSlot > k.slot+1 { delete(s.seenSyncCommitteeMessages, k) } } diff --git a/cl/sentinel/handlers/heartbeats_test.go b/cl/sentinel/handlers/heartbeats_test.go index 8428af926f5..17bc598dee1 100644 --- a/cl/sentinel/handlers/heartbeats_test.go +++ b/cl/sentinel/handlers/heartbeats_test.go @@ -68,7 +68,7 @@ func TestPing(t *testing.T) { peersPool := peers.NewPool() beaconDB, indiciesDB := setupStore(t) - f := forkchoice.NewForkChoiceStorageMock() + f := forkchoice.NewForkChoiceStorageMock(t) ethClock := getEthClock(t) _, beaconCfg := clparams.GetConfigsByNetwork(1) @@ -123,7 +123,7 @@ func TestGoodbye(t *testing.T) { peersPool := peers.NewPool() beaconDB, indiciesDB := setupStore(t) - f := forkchoice.NewForkChoiceStorageMock() + f := forkchoice.NewForkChoiceStorageMock(t) ethClock := getEthClock(t) _, beaconCfg := clparams.GetConfigsByNetwork(1) c := NewConsensusHandlers( @@ -183,7 +183,7 @@ func TestMetadataV2(t *testing.T) { peersPool := peers.NewPool() beaconDB, indiciesDB := setupStore(t) - f := forkchoice.NewForkChoiceStorageMock() + f := forkchoice.NewForkChoiceStorageMock(t) ethClock := getEthClock(t) nc := clparams.NetworkConfigs[clparams.MainnetNetwork] _, beaconCfg := clparams.GetConfigsByNetwork(1) @@ -241,7 +241,7 @@ func TestMetadataV1(t *testing.T) { peersPool := peers.NewPool() beaconDB, indiciesDB := setupStore(t) - f := forkchoice.NewForkChoiceStorageMock() + f := forkchoice.NewForkChoiceStorageMock(t) nc := clparams.NetworkConfigs[clparams.MainnetNetwork] ethClock := getEthClock(t) @@ -299,7 +299,7 @@ func TestStatus(t *testing.T) { peersPool := peers.NewPool() beaconDB, indiciesDB := setupStore(t) - f := forkchoice.NewForkChoiceStorageMock() + f := forkchoice.NewForkChoiceStorageMock(t) hs := handshake.New(ctx, getEthClock(t), &clparams.MainnetBeaconConfig, nil) s := &cltypes.Status{ diff --git a/cl/sentinel/handlers/light_client_test.go b/cl/sentinel/handlers/light_client_test.go index 29c36ee5973..39a46209903 100644 --- a/cl/sentinel/handlers/light_client_test.go +++ b/cl/sentinel/handlers/light_client_test.go @@ -45,7 +45,7 @@ func TestLightClientOptimistic(t *testing.T) { peersPool := peers.NewPool() beaconDB, indiciesDB := setupStore(t) - f := forkchoice.NewForkChoiceStorageMock() + f := forkchoice.NewForkChoiceStorageMock(t) f.NewestLCUpdate = &cltypes.LightClientUpdate{ AttestedHeader: cltypes.NewLightClientHeader(clparams.AltairVersion), @@ -115,7 +115,7 @@ func TestLightClientFinality(t *testing.T) { peersPool := peers.NewPool() beaconDB, indiciesDB := setupStore(t) - f := forkchoice.NewForkChoiceStorageMock() + f := forkchoice.NewForkChoiceStorageMock(t) f.NewestLCUpdate = &cltypes.LightClientUpdate{ AttestedHeader: cltypes.NewLightClientHeader(clparams.AltairVersion), @@ -188,7 +188,7 @@ func TestLightClientBootstrap(t *testing.T) { peersPool := peers.NewPool() beaconDB, indiciesDB := setupStore(t) - f := forkchoice.NewForkChoiceStorageMock() + f := forkchoice.NewForkChoiceStorageMock(t) f.NewestLCUpdate = &cltypes.LightClientUpdate{ AttestedHeader: cltypes.NewLightClientHeader(clparams.AltairVersion), @@ -270,7 +270,7 @@ func TestLightClientUpdates(t *testing.T) { peersPool := peers.NewPool() beaconDB, indiciesDB := setupStore(t) - f := forkchoice.NewForkChoiceStorageMock() + f := forkchoice.NewForkChoiceStorageMock(t) ethClock := getEthClock(t) up := &cltypes.LightClientUpdate{ diff --git a/cl/utils/bytes.go b/cl/utils/bytes.go index 66ce500c837..9284df09f93 100644 --- a/cl/utils/bytes.go +++ b/cl/utils/bytes.go @@ -121,18 +121,32 @@ func IsBitOn(b []byte, idx int) bool { return b[idx/8]&i == i } -func IsSupersetBitlist(a, b []byte) bool { +// IsNonStrictSupersetBitlist checks if bitlist 'a' is a non-strict superset of bitlist 'b' +func IsNonStrictSupersetBitlist(a, b []byte) bool { + // Ensure 'a' is at least as long as 'b' if len(a) < len(b) { return false } - for i := range b { - if a[i]&b[i] != b[i] { + + // Check each bit in 'b' to ensure it is also set in 'a' + for i := 0; i < len(b); i++ { + if (a[i] & b[i]) != b[i] { return false } } + + // If all bits required by 'b' are present in 'a', return true return true } +func BitsOnCount(b []byte) int { + count := 0 + for _, v := range b { + count += bits.OnesCount8(v) + } + return count +} + func MergeBitlists(a, b []byte) { for i := range b { a[i] |= b[i] diff --git a/cl/validator/sync_contribution_pool/interface.go b/cl/validator/sync_contribution_pool/interface.go index 2dcd4478eaa..6dc7f69fa50 100644 --- a/cl/validator/sync_contribution_pool/interface.go +++ b/cl/validator/sync_contribution_pool/interface.go @@ -18,4 +18,6 @@ type SyncContributionPool interface { // GetSyncContribution retrieves a sync contribution from the pool. GetSyncContribution(slot, subcommitteeIndex uint64, beaconBlockRoot common.Hash) *cltypes.Contribution + // Obtain the sync aggregate for the sync messages pointing to a given beacon block root. + GetSyncAggregate(slot uint64, beaconBlockRoot common.Hash) (*cltypes.SyncAggregate, error) } diff --git a/cl/validator/sync_contribution_pool/mock.go b/cl/validator/sync_contribution_pool/mock.go index 045dcb1dff1..be51057a910 100644 --- a/cl/validator/sync_contribution_pool/mock.go +++ b/cl/validator/sync_contribution_pool/mock.go @@ -69,6 +69,21 @@ func (mr *MockSyncContributionPoolMockRecorder) AddSyncContribution(headState, c return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSyncContribution", reflect.TypeOf((*MockSyncContributionPool)(nil).AddSyncContribution), headState, contribution) } +// GetSyncAggregate mocks base method. +func (m *MockSyncContributionPool) GetSyncAggregate(slot uint64, beaconBlockRoot common.Hash) (*cltypes.SyncAggregate, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSyncAggregate", slot, beaconBlockRoot) + ret0, _ := ret[0].(*cltypes.SyncAggregate) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSyncAggregate indicates an expected call of GetSyncAggregate. +func (mr *MockSyncContributionPoolMockRecorder) GetSyncAggregate(slot, beaconBlockRoot any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSyncAggregate", reflect.TypeOf((*MockSyncContributionPool)(nil).GetSyncAggregate), slot, beaconBlockRoot) +} + // GetSyncContribution mocks base method. func (m *MockSyncContributionPool) GetSyncContribution(slot, subcommitteeIndex uint64, beaconBlockRoot common.Hash) *cltypes.Contribution { m.ctrl.T.Helper() diff --git a/cl/validator/sync_contribution_pool/sync_contribution_pool.go b/cl/validator/sync_contribution_pool/sync_contribution_pool.go index a1ccca452fb..e3e59f13eb6 100644 --- a/cl/validator/sync_contribution_pool/sync_contribution_pool.go +++ b/cl/validator/sync_contribution_pool/sync_contribution_pool.go @@ -1,11 +1,13 @@ package sync_contribution_pool import ( + "bytes" "errors" "sync" "github.com/Giulio2002/bls" "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/cltypes/solid" "github.com/ledgerwatch/erigon/cl/phase1/core/state" @@ -20,16 +22,20 @@ type syncContributionKey struct { type syncContributionPoolImpl struct { // syncContributionPool is a map of sync contributions, indexed by slot, subcommittee index and block root. - syncContributionPool map[syncContributionKey]*cltypes.Contribution + syncContributionPoolForBlocks map[syncContributionKey]*cltypes.Contribution // Used for block publishing. + syncContributionPoolForAggregates map[syncContributionKey]*cltypes.Contribution // Used for sync committee messages aggregation. + beaconCfg *clparams.BeaconChainConfig mu sync.Mutex } var ErrIsSuperset = errors.New("sync contribution is a superset of existing attestation") -func NewSyncContributionPool() SyncContributionPool { +func NewSyncContributionPool(beaconCfg *clparams.BeaconChainConfig) SyncContributionPool { return &syncContributionPoolImpl{ - syncContributionPool: make(map[syncContributionKey]*cltypes.Contribution), + syncContributionPoolForBlocks: make(map[syncContributionKey]*cltypes.Contribution), + syncContributionPoolForAggregates: make(map[syncContributionKey]*cltypes.Contribution), + beaconCfg: beaconCfg, } } @@ -51,36 +57,16 @@ func (s *syncContributionPoolImpl) AddSyncContribution(headState *state.CachingB subcommitteeIndex: contribution.SubcommitteeIndex, beaconBlockRoot: contribution.BeaconBlockRoot, } - baseContribution := &cltypes.Contribution{ - Slot: contribution.Slot, - SubcommitteeIndex: contribution.SubcommitteeIndex, - BeaconBlockRoot: contribution.BeaconBlockRoot, - AggregationBits: make([]byte, cltypes.SyncCommitteeAggregationBitsSize), - Signature: bls.InfiniteSignature, - } - if val, ok := s.syncContributionPool[key]; ok { - baseContribution = val.Copy() - } - // Time to aggregate the giga aggregatable. - if utils.IsSupersetBitlist(baseContribution.AggregationBits, contribution.AggregationBits) { - return ErrIsSuperset // Skip it if it is just a superset. + baseContribution, ok := s.syncContributionPoolForBlocks[key] + if !ok { + s.syncContributionPoolForBlocks[key] = contribution.Copy() + return nil } - // Aggregate the bits. - utils.MergeBitlists(baseContribution.AggregationBits, contribution.AggregationBits) - // Aggregate the signature. - aggregatedSignature, err := bls.AggregateSignatures([][]byte{ - baseContribution.Signature[:], - contribution.Signature[:], - }) - if err != nil { - return err + if utils.BitsOnCount(baseContribution.AggregationBits) >= utils.BitsOnCount(contribution.AggregationBits) { + return ErrIsSuperset } - copy(baseContribution.Signature[:], aggregatedSignature) - - // Make a copy. - s.syncContributionPool[key] = baseContribution.Copy() - s.cleanupOldContributions(headState) + s.syncContributionPoolForBlocks[key] = contribution.Copy() return nil } @@ -94,7 +80,7 @@ func (s *syncContributionPoolImpl) GetSyncContribution(slot, subcommitteeIndex u beaconBlockRoot: beaconBlockRoot, } - contribution, ok := s.syncContributionPool[key] + contribution, ok := s.syncContributionPoolForAggregates[key] // this should be exposed to the outside world. through Beacon API. // Return a copies. if !ok { // if we dont have it return an empty contribution (no aggregation bits). @@ -110,10 +96,14 @@ func (s *syncContributionPoolImpl) GetSyncContribution(slot, subcommitteeIndex u } func (s *syncContributionPoolImpl) cleanupOldContributions(headState *state.CachingBeaconState) { - - for key := range s.syncContributionPool { + for key := range s.syncContributionPoolForAggregates { + if headState.Slot() != key.slot { + delete(s.syncContributionPoolForAggregates, key) + } + } + for key := range s.syncContributionPoolForBlocks { if headState.Slot() != key.slot { - delete(s.syncContributionPool, key) + delete(s.syncContributionPoolForBlocks, key) } } } @@ -132,7 +122,7 @@ func (s *syncContributionPoolImpl) AddSyncCommitteeMessage(headState *state.Cach } // We retrieve a base contribution - contribution, ok := s.syncContributionPool[key] + contribution, ok := s.syncContributionPoolForAggregates[key] if !ok { contribution = &cltypes.Contribution{ Slot: message.Slot, @@ -155,9 +145,13 @@ func (s *syncContributionPoolImpl) AddSyncCommitteeMessage(headState *state.Cach startSubCommittee := subCommittee * subCommitteeSize for i := startSubCommittee; i < startSubCommittee+subCommitteeSize; i++ { if committee[i] == publicKey { // turn on this bit + if utils.IsBitOn(contribution.AggregationBits, int(i-startSubCommittee)) { + return nil + } utils.FlipBitOn(contribution.AggregationBits, int(i-startSubCommittee)) } } + // Compute the aggregated signature. aggregatedSignature, err := bls.AggregateSignatures([][]byte{ contribution.Signature[:], @@ -167,7 +161,77 @@ func (s *syncContributionPoolImpl) AddSyncCommitteeMessage(headState *state.Cach return err } copy(contribution.Signature[:], aggregatedSignature) - s.syncContributionPool[key] = contribution + s.syncContributionPoolForAggregates[key] = contribution s.cleanupOldContributions(headState) return nil } + +// GetSyncAggregate computes and returns the sync aggregate for the sync messages pointing to a given beacon block root. +/* +def process_sync_committee_contributions(block: BeaconBlock, + contributions: Set[SyncCommitteeContribution]) -> None: + sync_aggregate = SyncAggregate() + signatures = [] + sync_subcommittee_size = SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT + + for contribution in contributions: + subcommittee_index = contribution.subcommittee_index + for index, participated in enumerate(contribution.aggregation_bits): + if participated: + participant_index = sync_subcommittee_size * subcommittee_index + index + sync_aggregate.sync_committee_bits[participant_index] = True + signatures.append(contribution.signature) + + sync_aggregate.sync_committee_signature = bls.Aggregate(signatures) + block.body.sync_aggregate = sync_aggregate +*/ +func (s *syncContributionPoolImpl) GetSyncAggregate(slot uint64, beaconBlockRoot common.Hash) (*cltypes.SyncAggregate, error) { + s.mu.Lock() + defer s.mu.Unlock() + // find all contributions for the given beacon block root. + contributions := []*cltypes.Contribution{} + for key, contribution := range s.syncContributionPoolForBlocks { + if key.beaconBlockRoot == beaconBlockRoot && slot == key.slot { + contributions = append(contributions, contribution) + } + } + if len(contributions) == 0 { + return &cltypes.SyncAggregate{ // return an empty aggregate. + SyncCommiteeSignature: bls.InfiniteSignature, + }, nil + } + aggregate := &cltypes.SyncAggregate{} + signatures := [][]byte{} + syncSubCommitteeSize := s.beaconCfg.SyncCommitteeSize / s.beaconCfg.SyncCommitteeSubnetCount + // triple for-loop for the win. + for _, contribution := range contributions { + if bytes.Equal(contribution.AggregationBits, make([]byte, cltypes.SyncCommitteeAggregationBitsSize)) { + continue + } + for i := range contribution.AggregationBits { + for j := 0; j < 8; j++ { + bitIndex := i*8 + j + partecipated := utils.IsBitOn(contribution.AggregationBits, bitIndex) + if partecipated { + participantIndex := syncSubCommitteeSize*contribution.SubcommitteeIndex + uint64(bitIndex) + utils.FlipBitOn(aggregate.SyncCommiteeBits[:], int(participantIndex)) + } + } + } + signatures = append(signatures, contribution.Signature[:]) + } + if len(signatures) == 0 { + return &cltypes.SyncAggregate{ // return an empty aggregate. + SyncCommiteeSignature: bls.InfiniteSignature, + }, nil + } + // Aggregate the signatures. + aggregateSignature, err := bls.AggregateSignatures(signatures) + if err != nil { + return &cltypes.SyncAggregate{ // return an empty aggregate. + SyncCommiteeSignature: bls.InfiniteSignature, + }, err + } + copy(aggregate.SyncCommiteeSignature[:], aggregateSignature) + return aggregate, nil +} diff --git a/cl/validator/sync_contribution_pool/sync_contribution_pool_mock.go b/cl/validator/sync_contribution_pool/sync_contribution_pool_mock.go deleted file mode 100644 index a268e4b4c53..00000000000 --- a/cl/validator/sync_contribution_pool/sync_contribution_pool_mock.go +++ /dev/null @@ -1,56 +0,0 @@ -package sync_contribution_pool - -import ( - "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon/cl/cltypes" - "github.com/ledgerwatch/erigon/cl/phase1/core/state" -) - -type syncContributionPoolMock struct { - // syncContributionPool is a map of sync contributions, indexed by slot, subcommittee index and block root. - syncContributionPool map[syncContributionKey]*cltypes.Contribution -} - -func NewSyncContributionPoolMock() SyncContributionPool { - return &syncContributionPoolMock{ - syncContributionPool: make(map[syncContributionKey]*cltypes.Contribution), - } -} - -// AddSyncContribution adds a sync committee contribution to the pool. -func (s *syncContributionPoolMock) AddSyncContribution(headState *state.CachingBeaconState, contribution *cltypes.Contribution) error { - - key := syncContributionKey{ - slot: contribution.Slot, - subcommitteeIndex: contribution.SubcommitteeIndex, - beaconBlockRoot: contribution.BeaconBlockRoot, - } - s.syncContributionPool[key] = contribution - return nil -} - -// AddSyncCommitteeMessage aggretates a sync committee message to a contribution to the pool. -func (s *syncContributionPoolMock) AddSyncCommitteeMessage(headState *state.CachingBeaconState, subCommitee uint64, message *cltypes.SyncCommitteeMessage) error { - key := syncContributionKey{ - slot: message.Slot, - subcommitteeIndex: subCommitee, - beaconBlockRoot: message.BeaconBlockRoot, - } - s.syncContributionPool[key] = &cltypes.Contribution{ - Slot: message.Slot, - SubcommitteeIndex: subCommitee, - BeaconBlockRoot: message.BeaconBlockRoot, - AggregationBits: make([]byte, cltypes.SyncCommitteeAggregationBitsSize), - } - return nil -} - -// GetSyncContribution retrieves a sync contribution from the pool. -func (s *syncContributionPoolMock) GetSyncContribution(slot, subcommitteeIndex uint64, beaconBlockRoot common.Hash) *cltypes.Contribution { - key := syncContributionKey{ - slot: slot, - subcommitteeIndex: subcommitteeIndex, - beaconBlockRoot: beaconBlockRoot, - } - return s.syncContributionPool[key] -} diff --git a/cl/validator/sync_contribution_pool/sync_contribution_pool_test.go b/cl/validator/sync_contribution_pool/sync_contribution_pool_test.go index 1dbabd0ecfd..7def33ed56c 100644 --- a/cl/validator/sync_contribution_pool/sync_contribution_pool_test.go +++ b/cl/validator/sync_contribution_pool/sync_contribution_pool_test.go @@ -49,7 +49,7 @@ func getTestCommitteesMessages(n int) (privateKeys [][]byte, messages []cltypes. func TestSyncContributionPool(t *testing.T) { _, msgs, s := getTestCommitteesMessages(16) - pool := NewSyncContributionPool() + pool := NewSyncContributionPool(&clparams.MainnetBeaconConfig) require.NoError(t, pool.AddSyncCommitteeMessage(s, 0, &msgs[0])) require.NoError(t, pool.AddSyncCommitteeMessage(s, 0, &msgs[1])) require.NoError(t, pool.AddSyncCommitteeMessage(s, 0, &msgs[2])) @@ -59,9 +59,4 @@ func TestSyncContributionPool(t *testing.T) { contribution.SubcommitteeIndex = 1 require.NoError(t, pool.AddSyncContribution(s, contribution)) - - contribution2 := pool.GetSyncContribution(0, 1, testHash) - require.Equal(t, contribution2.Signature, contribution.Signature) - require.Equal(t, contribution, contribution2) - } diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index 7da2c1cc017..f0efb639aca 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -127,7 +127,7 @@ func RunCaplinPhase1(ctx context.Context, engine execution_client.ExecutionEngin fcuFs := afero.NewBasePathFs(afero.NewOsFs(), caplinFcuPath) syncedDataManager := synced_data.NewSyncedDataManager(true, beaconConfig) - syncContributionPool := sync_contribution_pool.NewSyncContributionPool() + syncContributionPool := sync_contribution_pool.NewSyncContributionPool(beaconConfig) emitters := beaconevents.NewEmitters() aggregationPool := aggregation.NewAggregationPool(ctx, beaconConfig, networkConfig, ethClock) forkChoice, err := forkchoice.NewForkChoiceStore(ethClock, state, engine, pool, fork_graph.NewForkGraphDisk(state, fcuFs, config.BeaconRouter), emitters, syncedDataManager, blobStorage) diff --git a/cmd/sentinel/main.go b/cmd/sentinel/main.go index 469ffd1b61b..0b24b7be268 100644 --- a/cmd/sentinel/main.go +++ b/cmd/sentinel/main.go @@ -20,7 +20,6 @@ import ( "github.com/ledgerwatch/erigon-lib/common/mem" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/phase1/core" - "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" "github.com/ledgerwatch/erigon/cl/sentinel" "github.com/ledgerwatch/erigon/cl/sentinel/service" "github.com/ledgerwatch/erigon/cl/utils/eth_clock" @@ -69,7 +68,7 @@ func runSentinelNode(cliCtx *cli.Context) error { NoDiscovery: cfg.NoDiscovery, LocalDiscovery: cfg.LocalDiscovery, EnableBlocks: false, - }, nil, nil, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, eth_clock.NewEthereumClock(bs.GenesisTime(), bs.GenesisValidatorsRoot(), cfg.BeaconCfg), forkchoice.NewForkChoiceStorageMock(), log.Root()) + }, nil, nil, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, eth_clock.NewEthereumClock(bs.GenesisTime(), bs.GenesisValidatorsRoot(), cfg.BeaconCfg), nil, log.Root()) if err != nil { log.Error("[Sentinel] Could not start sentinel", "err", err) return err