Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process atts and update head before proposing #10653

Merged
merged 14 commits into from
May 9, 2022
Merged
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/chain_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type ChainInfoFetcher interface {
HeadDomainFetcher
}

// HeadUpdater defines a common interface for methods in blockchain service
// which allow to update the head info
type HeadUpdater interface {
UpdateHead(context.Context) error
}

// TimeFetcher retrieves the Ethereum consensus data that's related to time.
type TimeFetcher interface {
GenesisTime() time.Time
Expand Down
66 changes: 39 additions & 27 deletions beacon-chain/blockchain/receive_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,39 +128,51 @@ func (s *Service) spawnProcessAttestationsRoutine(stateFeed *event.Feed) {
return
}

// Continue when there's no fork choice attestation, there's nothing to process and update head.
// This covers the condition when the node is still initial syncing to the head of the chain.
if s.cfg.AttPool.ForkchoiceAttestationCount() == 0 {
continue
}
s.processAttestations(s.ctx)

justified := s.store.JustifiedCheckpt()
if justified == nil {
log.WithError(errNilJustifiedInStore).Error("Could not get justified checkpoint")
continue
}
balances, err := s.justifiedBalances.get(s.ctx, bytesutil.ToBytes32(justified.Root))
if err != nil {
log.WithError(err).Errorf("Unable to get justified balances for root %v", justified.Root)
continue
}
newHeadRoot, err := s.updateHead(s.ctx, balances)
if err != nil {
log.WithError(err).Warn("Resolving fork due to new attestation")
}
if s.headRoot() != newHeadRoot {
log.WithFields(logrus.Fields{
"oldHeadRoot": fmt.Sprintf("%#x", s.headRoot()),
"newHeadRoot": fmt.Sprintf("%#x", newHeadRoot),
}).Debug("Head changed due to attestations")
if err := s.UpdateHead(s.ctx); err != nil {
log.WithError(err).Error("Could not process attestations and update head")
return
}
s.notifyEngineIfChangedHead(s.ctx, newHeadRoot)
}
}
}()
}

// UpdateHead processes fork choice attestations from the pool and updates the head.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should say that it updates the canonical head of the chain based on information from fork-choice attestations and votes. It requires no external inputs.

func (s *Service) UpdateHead(ctx context.Context) error {
// Continue when there's no fork choice attestation, there's nothing to process and update head.
// This covers the condition when the node is still initial syncing to the head of the chain.
if s.cfg.AttPool.ForkchoiceAttestationCount() == 0 {
return nil
}

// Only one process can process attestations and update head at a time.
s.processAttestationsLock.Lock()
defer s.processAttestationsLock.Unlock()
Comment on lines +149 to +151
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only addition from the refactor above


s.processAttestations(ctx)

justified := s.store.JustifiedCheckpt()
if justified == nil {
return errNilJustifiedInStore
}
balances, err := s.justifiedBalances.get(ctx, bytesutil.ToBytes32(justified.Root))
if err != nil {
return err
}
newHeadRoot, err := s.updateHead(ctx, balances)
if err != nil {
log.WithError(err).Warn("Resolving fork due to new attestation")
}
if s.headRoot() != newHeadRoot {
log.WithFields(logrus.Fields{
"oldHeadRoot": fmt.Sprintf("%#x", s.headRoot()),
"newHeadRoot": fmt.Sprintf("%#x", newHeadRoot),
}).Debug("Head changed due to attestations")
}
s.notifyEngineIfChangedHead(ctx, newHeadRoot)
return nil
}

// This calls notify Forkchoice Update in the event that the head has changed
func (s *Service) notifyEngineIfChangedHead(ctx context.Context, newHeadRoot [32]byte) {
if s.headRoot() == newHeadRoot {
Expand Down
34 changes: 34 additions & 0 deletions beacon-chain/blockchain/receive_attestation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,37 @@ func TestNotifyEngineIfChangedHead(t *testing.T) {
require.Equal(t, types.ValidatorIndex(1), vId)
require.Equal(t, [8]byte{1}, payloadID)
}

func TestService_ProcessAttestationsAndUpdateHead(t *testing.T) {
ctx := context.Background()
opts := testServiceOptsWithDB(t)
opts = append(opts, WithAttestationPool(attestations.NewPool()), WithStateNotifier(&mockBeaconNode{}))

service, err := NewService(ctx, opts...)
require.NoError(t, err)
service.genesisTime = prysmTime.Now().Add(-1 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
genesisState, pks := util.DeterministicGenesisState(t, 64)
require.NoError(t, genesisState.SetGenesisTime(uint64(prysmTime.Now().Unix())-params.BeaconConfig().SecondsPerSlot))
require.NoError(t, service.saveGenesisData(ctx, genesisState))
atts, err := util.GenerateAttestations(genesisState, pks, 1, 0, false)
require.NoError(t, err)
tRoot := bytesutil.ToBytes32(atts[0].Data.Target.Root)
copied := genesisState.Copy()
copied, err = transition.ProcessSlots(ctx, copied, 1)
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, copied, tRoot))
require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{Root: tRoot[:]}))
require.NoError(t, service.cfg.ForkChoiceStore.InsertOptimisticBlock(ctx, 0, tRoot, tRoot, params.BeaconConfig().ZeroHash, 1, 1))
require.NoError(t, service.cfg.AttPool.SaveForkchoiceAttestations(atts))
b := util.NewBeaconBlock()
wb, err := wrapper.WrappedSignedBeaconBlock(b)
require.NoError(t, err)
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wb))
service.head.root = r // Old head
require.Equal(t, 1, len(service.cfg.AttPool.ForkchoiceAttestations()))
require.NoError(t, err, service.UpdateHead(ctx))
require.Equal(t, tRoot, service.head.root) // Validate head is the new one
require.Equal(t, 0, len(service.cfg.AttPool.ForkchoiceAttestations())) // Validate att pool is empty
}
31 changes: 16 additions & 15 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,22 @@ const headSyncMinEpochsAfterCheckpoint = 128
// Service represents a service that handles the internal
// logic of managing the full PoS beacon chain.
type Service struct {
cfg *config
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
head *head
headLock sync.RWMutex
originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized
nextEpochBoundarySlot types.Slot
boundaryRoots [][32]byte
checkpointStateCache *cache.CheckpointStateCache
initSyncBlocks map[[32]byte]interfaces.SignedBeaconBlock
initSyncBlocksLock sync.RWMutex
justifiedBalances *stateBalanceCache
wsVerifier *WeakSubjectivityVerifier
store *store.Store
cfg *config
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
head *head
headLock sync.RWMutex
originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized
nextEpochBoundarySlot types.Slot
boundaryRoots [][32]byte
checkpointStateCache *cache.CheckpointStateCache
initSyncBlocks map[[32]byte]interfaces.SignedBeaconBlock
initSyncBlocksLock sync.RWMutex
justifiedBalances *stateBalanceCache
wsVerifier *WeakSubjectivityVerifier
store *store.Store
processAttestationsLock sync.Mutex
}

// config options for the service.
Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ func (s *ChainService) IsOptimisticForRoot(_ context.Context, _ [32]byte) (bool,
return s.Optimistic, nil
}

// ProcessAttestationsAndUpdateHead mocks the same method in the chain service.
func (s *ChainService) UpdateHead(_ context.Context) error { return nil }

// ReceiveAttesterSlashing mocks the same method in the chain service.
func (s *ChainService) ReceiveAttesterSlashing(context.Context, *ethpb.AttesterSlashing) {
}
func (s *ChainService) ReceiveAttesterSlashing(context.Context, *ethpb.AttesterSlashing) {}
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ func (b *BeaconNode) registerRPCService() error {
PeerManager: p2pService,
MetadataProvider: p2pService,
ChainInfoFetcher: chainService,
HeadUpdater: chainService,
HeadFetcher: chainService,
CanonicalFetcher: chainService,
ForkFetcher: chainService,
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/eth/beacon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ type Server struct {
V1Alpha1ValidatorServer *v1alpha1validator.Server
SyncChecker sync.Checker
CanonicalHistory *stategen.CanonicalHistory
HeadUpdater blockchain.HeadUpdater
}
1 change: 1 addition & 0 deletions beacon-chain/rpc/eth/validator/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// providing RPC endpoints intended for validator clients.
type Server struct {
HeadFetcher blockchain.HeadFetcher
HeadUpdater blockchain.HeadUpdater
TimeFetcher blockchain.TimeFetcher
SyncChecker sync.Checker
AttestationsPool attestations.Pool
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/rpc/eth/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ func TestProduceBlock(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down Expand Up @@ -774,6 +775,7 @@ func TestProduceBlockV2(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down Expand Up @@ -879,6 +881,7 @@ func TestProduceBlockV2(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down Expand Up @@ -1028,6 +1031,7 @@ func TestProduceBlockV2(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down Expand Up @@ -1174,6 +1178,7 @@ func TestProduceBlindedBlock(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down Expand Up @@ -1280,6 +1285,7 @@ func TestProduceBlindedBlock(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down Expand Up @@ -1429,6 +1435,7 @@ func TestProduceBlindedBlock(t *testing.T) {
HeadFetcher: &mockChain.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mockChain.ChainService{},
HeadUpdater: &mockChain.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/prysm/v1alpha1/beacon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ type Server struct {
StateGen stategen.StateManager
SyncChecker sync.Checker
ReplayerBuilder stategen.ReplayerBuilder
HeadUpdater blockchain.HeadUpdater
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (vs *Server) buildPhase0BlockData(ctx context.Context, req *ethpb.BlockRequ
return nil, fmt.Errorf("syncing to latest head, not ready to respond")
}

if err := vs.HeadUpdater.UpdateHead(ctx); err != nil {
log.WithError(err).Error("Could not process attestations and update head")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opt it to log an error instead of return error. Error from this call should not prevent block proposal

}

// Retrieve the parent block as the current head of the canonical chain.
parentRoot, err := vs.HeadFetcher.HeadRoot(ctx)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func TestProposer_GetBlock_OK(t *testing.T) {
HeadFetcher: &mock.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mock.ChainService{},
HeadUpdater: &mock.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down Expand Up @@ -158,6 +159,7 @@ func TestProposer_GetBlock_AddsUnaggregatedAtts(t *testing.T) {
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
HeadUpdater: &mock.ChainService{},
MockEth1Votes: true,
SlashingsPool: slashings.NewPool(),
AttPool: attestations.NewPool(),
Expand Down Expand Up @@ -2105,6 +2107,7 @@ func TestProposer_GetBeaconBlock_PreForkEpoch(t *testing.T) {
HeadFetcher: &mock.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mock.ChainService{},
HeadUpdater: &mock.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down Expand Up @@ -2217,6 +2220,7 @@ func TestProposer_GetBeaconBlock_PostForkEpoch(t *testing.T) {
HeadFetcher: &mock.ChainService{State: beaconState, Root: parentRoot[:]},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mock.ChainService{},
HeadUpdater: &mock.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: &mockPOW.POWChain{},
Expand Down Expand Up @@ -2370,6 +2374,7 @@ func TestProposer_GetBeaconBlock_BellatrixEpoch(t *testing.T) {
TimeFetcher: &mock.ChainService{Genesis: time.Now()},
SyncChecker: &mockSync.Sync{IsSyncing: false},
BlockReceiver: &mock.ChainService{},
HeadUpdater: &mock.ChainService{},
ChainStartFetcher: &mockPOW.POWChain{},
Eth1InfoFetcher: &mockPOW.POWChain{},
Eth1BlockFetcher: c,
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/prysm/v1alpha1/validator/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Server struct {
AttestationCache *cache.AttestationCache
ProposerSlotIndexCache *cache.ProposerPayloadIDsCache
HeadFetcher blockchain.HeadFetcher
HeadUpdater blockchain.HeadUpdater
ForkFetcher blockchain.ForkFetcher
FinalizationFetcher blockchain.FinalizationFetcher
TimeFetcher blockchain.TimeFetcher
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Config struct {
BeaconMonitoringPort int
BeaconDB db.HeadAccessDatabase
ChainInfoFetcher blockchain.ChainInfoFetcher
HeadUpdater blockchain.HeadUpdater
HeadFetcher blockchain.HeadFetcher
CanonicalFetcher blockchain.CanonicalFetcher
ForkFetcher blockchain.ForkFetcher
Expand Down Expand Up @@ -189,6 +190,7 @@ func (s *Service) Start() {
AttPool: s.cfg.AttestationsPool,
ExitPool: s.cfg.ExitPool,
HeadFetcher: s.cfg.HeadFetcher,
HeadUpdater: s.cfg.HeadUpdater,
ForkFetcher: s.cfg.ForkFetcher,
FinalizationFetcher: s.cfg.FinalizationFetcher,
TimeFetcher: s.cfg.GenesisTimeFetcher,
Expand All @@ -215,6 +217,7 @@ func (s *Service) Start() {
}
validatorServerV1 := &validator.Server{
HeadFetcher: s.cfg.HeadFetcher,
HeadUpdater: s.cfg.HeadUpdater,
TimeFetcher: s.cfg.GenesisTimeFetcher,
SyncChecker: s.cfg.SyncService,
AttestationsPool: s.cfg.AttestationsPool,
Expand Down Expand Up @@ -261,6 +264,7 @@ func (s *Service) Start() {
BeaconDB: s.cfg.BeaconDB,
AttestationsPool: s.cfg.AttestationsPool,
SlashingsPool: s.cfg.SlashingsPool,
HeadUpdater: s.cfg.HeadUpdater,
HeadFetcher: s.cfg.HeadFetcher,
FinalizationFetcher: s.cfg.FinalizationFetcher,
CanonicalFetcher: s.cfg.CanonicalFetcher,
Expand Down