Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized attestation processing #10020

Merged
merged 12 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

// GetEthV1BeaconPoolVoluntaryExits serves the beacon-node pool endpoint and
// returns the raw contents of the voluntary-exits operations pool.
// NOTE(review): the diff scrape showed both the pre-rename (VoluntaryExitPool)
// and post-rename (VoluntaryExitsPool) return lines; only the renamed pool
// field exists after this change, so a single return is kept.
func (a *ApiHandler) GetEthV1BeaconPoolVoluntaryExits(w http.ResponseWriter, r *http.Request) (*beaconhttp.BeaconResponse, error) {
	return newBeaconResponse(a.operationsPool.VoluntaryExitsPool.Raw()), nil
}

func (a *ApiHandler) GetEthV1BeaconPoolAttesterSlashings(w http.ResponseWriter, r *http.Request) (*beaconhttp.BeaconResponse, error) {
Expand Down Expand Up @@ -144,7 +144,7 @@ func (a *ApiHandler) PostEthV1BeaconPoolVoluntaryExits(w http.ResponseWriter, r
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
a.operationsPool.VoluntaryExitPool.Insert(req.VoluntaryExit.ValidatorIndex, &req)
a.operationsPool.VoluntaryExitsPool.Insert(req.VoluntaryExit.ValidatorIndex, &req)
}
// Only write 200
w.WriteHeader(http.StatusOK)
Expand Down
2 changes: 1 addition & 1 deletion cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
return nil
}).AnyTimes()
voluntaryExitService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedVoluntaryExit) error {
opPool.VoluntaryExitPool.Insert(msg.VoluntaryExit.ValidatorIndex, msg)
opPool.VoluntaryExitsPool.Insert(msg.VoluntaryExit.ValidatorIndex, msg)
return nil
}).AnyTimes()
blsToExecutionChangeService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedBLSToExecutionChange) error {
Expand Down
72 changes: 53 additions & 19 deletions cl/phase1/forkchoice/on_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,13 @@ package forkchoice

import (
"fmt"
"time"

"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"

libcommon "github.com/ledgerwatch/erigon-lib/common"
)

const (
maxAttestationJobLifetime = 30 * time.Minute
maxBlockJobLifetime = 36 * time.Second // 3 mainnet slots
)

var (
ErrIgnore = fmt.Errorf("ignore")
)
Expand Down Expand Up @@ -42,42 +36,82 @@ func (f *ForkChoiceStore) OnAttestation(attestation *solid.Attestation, fromBloc
return err
}
}
headState := f.syncedDataManager.HeadState()
var attestationIndicies []uint64
var err error
target := data.Target()

if headState == nil {
attestationIndicies, err = f.verifyAttestationWithCheckpointState(target, attestation, fromBlock)
} else {
attestationIndicies, err = f.verifyAttestationWithState(headState, attestation, fromBlock)
}
if err != nil {
return err
}

// Lastly update latest messages.
f.processAttestingIndicies(attestation, attestationIndicies)

return nil
}

func (f *ForkChoiceStore) verifyAttestationWithCheckpointState(target solid.Checkpoint, attestation *solid.Attestation, fromBlock bool) (attestationIndicies []uint64, err error) {
data := attestation.AttestantionData()
targetState, err := f.getCheckpointState(target)
if err != nil {
return nil
return nil, err
}
// Verify attestation signature.
if targetState == nil {
return fmt.Errorf("target state does not exist")
return nil, fmt.Errorf("target state does not exist")
}
// Now we need to find the attesting indicies.
attestationIndicies, err := targetState.getAttestingIndicies(&data, attestation.AggregationBits())
attestationIndicies, err = targetState.getAttestingIndicies(&data, attestation.AggregationBits())
if err != nil {
return err
return nil, err
}
if !fromBlock {
indexedAttestation := state.GetIndexedAttestation(attestation, attestationIndicies)
if err != nil {
return err
return nil, err
}

valid, err := targetState.isValidIndexedAttestation(indexedAttestation)
if err != nil {
return err
return nil, err
}
if !valid {
return fmt.Errorf("invalid attestation")
return nil, fmt.Errorf("invalid attestation")
}
}
// Lastly update latest messages.
f.processAttestingIndicies(attestation, attestationIndicies)
if !fromBlock && insert {
// Add to the pool when verified.
f.operationsPool.AttestationsPool.Insert(attestation.Signature(), attestation)
return attestationIndicies, nil
}

// verifyAttestationWithState validates an attestation against the given
// cached head state and returns the indices of the attesting validators.
//
// When fromBlock is true the attestation arrived inside a block (whose
// aggregate signatures are verified elsewhere), so the per-attestation
// indexed-signature check is skipped; gossip attestations
// (fromBlock == false) are fully verified here.
func (f *ForkChoiceStore) verifyAttestationWithState(s *state.CachingBeaconState, attestation *solid.Attestation, fromBlock bool) (attestationIndicies []uint64, err error) {
	data := attestation.AttestantionData()

	attestationIndicies, err = s.GetAttestingIndicies(data, attestation.AggregationBits(), true)
	if err != nil {
		return nil, err
	}
	if !fromBlock {
		// Gossip path: rebuild the indexed attestation and check its signature.
		// state.GetIndexedAttestation returns no error, so no err check is
		// needed here (the original had a dead `if err != nil` after it, and
		// another one right after the data assignment above; both removed).
		indexedAttestation := state.GetIndexedAttestation(attestation, attestationIndicies)
		valid, err := state.IsValidIndexedAttestation(s, indexedAttestation)
		if err != nil {
			return nil, err
		}
		if !valid {
			return nil, fmt.Errorf("invalid attestation")
		}
	}
	return attestationIndicies, nil
}

func (f *ForkChoiceStore) setLatestMessage(index uint64, message LatestMessage) {
Expand Down
14 changes: 9 additions & 5 deletions cl/phase1/forkchoice/on_attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ func (f *ForkChoiceStore) OnAttesterSlashing(attesterSlashing *cltypes.AttesterS
if !cltypes.IsSlashableAttestationData(attestation1.Data, attestation2.Data) {
return fmt.Errorf("attestation data is not slashable")
}
// Retrieve justified state
s, err := f.forkGraph.GetState(f.justifiedCheckpoint.Load().(solid.Checkpoint).BlockRoot(), false)
if err != nil {
return err
var err error
s := f.syncedDataManager.HeadState()
if s == nil {
// Retrieve justified state
s, err = f.forkGraph.GetState(f.justifiedCheckpoint.Load().(solid.Checkpoint).BlockRoot(), false)
if err != nil {
return err
}
}
if s == nil {
return fmt.Errorf("justified checkpoint state not accessible")
return fmt.Errorf("no state accessible")
}
attestation1PublicKeys, err := getIndexedAttestationPublicKeys(s, attestation1)
if err != nil {
Expand Down
14 changes: 11 additions & 3 deletions cl/phase1/network/services/block_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,28 +176,36 @@ func (b *blockService) processAndStoreBlock(ctx context.Context, block *cltypes.
if err := b.forkchoiceStore.OnBlock(ctx, block, true, true, true); err != nil {
return err
}
go b.importBlockAttestations(block)
go b.importBlockOperations(block)
return b.db.Update(ctx, func(tx kv.RwTx) error {
return beacon_indicies.WriteHighestFinalized(tx, b.forkchoiceStore.FinalizedSlot())
})

}

// importBlockAttestationsInParallel imports block attestations in parallel
func (b *blockService) importBlockAttestations(block *cltypes.SignedBeaconBlock) {
// importBlockOperations feeds a block's attestations and attester slashings
// into the forkchoice store. It is launched on its own goroutine, so any
// panic is recovered and logged rather than taking down the process.
func (b *blockService) importBlockOperations(block *cltypes.SignedBeaconBlock) {
	// Would prefer this not to crash but rather log the error.
	defer func() {
		if r := recover(); r != nil {
			log.Warn("recovered from panic", "err", r)
		}
	}()
	start := time.Now()

	body := block.Block.Body
	// Attestations: fromBlock=true, so signature checks done at block level apply.
	body.Attestations.Range(func(_ int, att *solid.Attestation, _ int) bool {
		if err := b.forkchoiceStore.OnAttestation(att, true, false); err != nil {
			log.Debug("bad attestation received", "err", err)
		}
		return true
	})
	// Attester slashings from the block body.
	body.AttesterSlashings.Range(func(_ int, slashing *cltypes.AttesterSlashing, _ int) bool {
		if err := b.forkchoiceStore.OnAttesterSlashing(slashing, false); err != nil {
			log.Debug("bad attester slashing received", "err", err)
		}
		return true
	})
	log.Debug("import operations", "time", time.Since(start))
}

// loop is the main loop of the block service
Expand Down
4 changes: 2 additions & 2 deletions cl/phase1/network/services/voluntary_exit_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *voluntaryExitService) ProcessMessage(ctx context.Context, subnet *uint6
defer s.emitters.Publish("voluntary_exit", voluntaryExit)

// [IGNORE] The voluntary exit is the first valid voluntary exit received for the validator with index signed_voluntary_exit.message.validator_index.
if s.operationsPool.VoluntaryExitPool.Has(voluntaryExit.ValidatorIndex) {
if s.operationsPool.VoluntaryExitsPool.Has(voluntaryExit.ValidatorIndex) {
return ErrIgnore
}

Expand Down Expand Up @@ -111,7 +111,7 @@ func (s *voluntaryExitService) ProcessMessage(ctx context.Context, subnet *uint6
return errors.New("ProcessVoluntaryExit: BLS verification failed")
}

s.operationsPool.VoluntaryExitPool.Insert(voluntaryExit.ValidatorIndex, msg)
s.operationsPool.VoluntaryExitsPool.Insert(voluntaryExit.ValidatorIndex, msg)

return nil
}
28 changes: 13 additions & 15 deletions cl/pool/operations_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
)

const operationsPerPool = 512

// DoubleSignatureKey uses blake2b algorithm to merge two signatures together. blake2 is faster than sha3.
func doubleSignatureKey(one, two libcommon.Bytes96) (out libcommon.Bytes96) {
res := blake2b.Sum256(append(one[:], two[:]...))
Expand All @@ -25,30 +27,26 @@ func ComputeKeyForAttesterSlashing(slashing *cltypes.AttesterSlashing) libcommon

// OperationsPool is the collection of all gossip-collectable operations.
type OperationsPool struct {
AttestationsPool *OperationPool[libcommon.Bytes96, *solid.Attestation]
AttesterSlashingsPool *OperationPool[libcommon.Bytes96, *cltypes.AttesterSlashing]
ProposerSlashingsPool *OperationPool[libcommon.Bytes96, *cltypes.ProposerSlashing]
BLSToExecutionChangesPool *OperationPool[libcommon.Bytes96, *cltypes.SignedBLSToExecutionChange]
SignedContributionAndProofPool *OperationPool[libcommon.Bytes96, *cltypes.SignedContributionAndProof]
VoluntaryExitPool *OperationPool[uint64, *cltypes.SignedVoluntaryExit]
ContributionCache *OperationPool[cltypes.ContributionKey, [][]byte]
AttestationsPool *OperationPool[libcommon.Bytes96, *solid.Attestation]
AttesterSlashingsPool *OperationPool[libcommon.Bytes96, *cltypes.AttesterSlashing]
ProposerSlashingsPool *OperationPool[libcommon.Bytes96, *cltypes.ProposerSlashing]
BLSToExecutionChangesPool *OperationPool[libcommon.Bytes96, *cltypes.SignedBLSToExecutionChange]
VoluntaryExitsPool *OperationPool[uint64, *cltypes.SignedVoluntaryExit]
}

func NewOperationsPool(beaconCfg *clparams.BeaconChainConfig) OperationsPool {
return OperationsPool{
AttestationsPool: NewOperationPool[libcommon.Bytes96, *solid.Attestation](int(beaconCfg.MaxAttestations), "attestationsPool"),
AttesterSlashingsPool: NewOperationPool[libcommon.Bytes96, *cltypes.AttesterSlashing](int(beaconCfg.MaxAttestations), "attesterSlashingsPool"),
ProposerSlashingsPool: NewOperationPool[libcommon.Bytes96, *cltypes.ProposerSlashing](int(beaconCfg.MaxAttestations), "proposerSlashingsPool"),
BLSToExecutionChangesPool: NewOperationPool[libcommon.Bytes96, *cltypes.SignedBLSToExecutionChange](int(beaconCfg.MaxBlsToExecutionChanges), "blsExecutionChangesPool"),
SignedContributionAndProofPool: NewOperationPool[libcommon.Bytes96, *cltypes.SignedContributionAndProof](int(beaconCfg.MaxAttestations), "signedContributionAndProof"),
VoluntaryExitPool: NewOperationPool[uint64, *cltypes.SignedVoluntaryExit](int(beaconCfg.MaxBlsToExecutionChanges), "voluntaryExitsPool"),
ContributionCache: NewOperationPool[cltypes.ContributionKey, [][]byte](int(beaconCfg.MaxAttestations), "contributionCache"),
AttestationsPool: NewOperationPool[libcommon.Bytes96, *solid.Attestation](operationsPerPool, "attestationsPool"),
AttesterSlashingsPool: NewOperationPool[libcommon.Bytes96, *cltypes.AttesterSlashing](operationsPerPool, "attesterSlashingsPool"),
ProposerSlashingsPool: NewOperationPool[libcommon.Bytes96, *cltypes.ProposerSlashing](operationsPerPool, "proposerSlashingsPool"),
BLSToExecutionChangesPool: NewOperationPool[libcommon.Bytes96, *cltypes.SignedBLSToExecutionChange](operationsPerPool, "blsExecutionChangesPool"),
VoluntaryExitsPool: NewOperationPool[uint64, *cltypes.SignedVoluntaryExit](operationsPerPool, "voluntaryExitsPool"),
}
}

func (o *OperationsPool) NotifyBlock(blk *cltypes.BeaconBlock) {
blk.Body.VoluntaryExits.Range(func(_ int, exit *cltypes.SignedVoluntaryExit, _ int) bool {
o.VoluntaryExitPool.DeleteIfExist(exit.VoluntaryExit.ValidatorIndex)
o.VoluntaryExitsPool.DeleteIfExist(exit.VoluntaryExit.ValidatorIndex)
return true
})
blk.Body.AttesterSlashings.Range(func(_ int, att *cltypes.AttesterSlashing, _ int) bool {
Expand Down
2 changes: 1 addition & 1 deletion cl/spectest/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ clean:
rm -rf tests

mainnet:
CGO_CFLAGS=-D__BLST_PORTABLE__ go test -tags=spectest -run=/mainnet/deneb -failfast -v --timeout 30m
CGO_CFLAGS=-D__BLST_PORTABLE__ go test -tags=spectest -run=/mainnet -failfast -v --timeout 30m
3 changes: 2 additions & 1 deletion cl/spectest/consensus_tests/fork_choice.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ledgerwatch/erigon/cl/abstract"
"github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration"
"github.com/ledgerwatch/erigon/cl/beacon/beaconevents"
"github.com/ledgerwatch/erigon/cl/beacon/synced_data"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/clparams/initial_state"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
Expand Down Expand Up @@ -187,7 +188,7 @@ func (b *ForkChoice) Run(t *testing.T, root fs.FS, c spectest.TestCase) (err err
ethClock := eth_clock.NewEthereumClock(genesisState.GenesisTime(), genesisState.GenesisValidatorsRoot(), beaconConfig)
blobStorage := blob_storage.NewBlobStore(memdb.New("/tmp"), afero.NewMemMapFs(), math.MaxUint64, &clparams.MainnetBeaconConfig, ethClock)

forkStore, err := forkchoice.NewForkChoiceStore(ethClock, anchorState, nil, pool.NewOperationsPool(&clparams.MainnetBeaconConfig), fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}), emitters, nil, blobStorage)
forkStore, err := forkchoice.NewForkChoiceStore(ethClock, anchorState, nil, pool.NewOperationsPool(&clparams.MainnetBeaconConfig), fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}), emitters, synced_data.NewSyncedDataManager(true, &clparams.MainnetBeaconConfig), blobStorage)
require.NoError(t, err)
forkStore.SetSynced(true)

Expand Down
Loading