Skip to content

Commit

Permalink
more cl state events (#11527)
Browse files Browse the repository at this point in the history
part of #11149
All events are supported now
  • Loading branch information
domiwei authored Aug 11, 2024
1 parent da2f260 commit 647a72f
Show file tree
Hide file tree
Showing 18 changed files with 281 additions and 135 deletions.
43 changes: 22 additions & 21 deletions cl/beacon/beaconevents/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/turbo/engineapi/engine_types"
)

type EventStream struct {
Expand Down Expand Up @@ -37,14 +38,14 @@ type (

// State event topics
const (
StateHead EventTopic = "head"
StateBlock EventTopic = "block"
StateBlockGossip EventTopic = "block_gossip"
StateFinalizedCheckpoint EventTopic = "finalized_checkpoint"
StateChainReorg EventTopic = "chain_reorg"
StateFinalityUpdate EventTopic = "light_client_finality_update"
StateOptimisticUpdate EventTopic = "light_client_optimistic_update"
StatePayloadAttributes EventTopic = "payload_attributes"
StateHead EventTopic = "head"
StateBlock EventTopic = "block"
StateBlockGossip EventTopic = "block_gossip"
StateFinalizedCheckpoint EventTopic = "finalized_checkpoint"
StateChainReorg EventTopic = "chain_reorg"
StateLightClientFinalityUpdate EventTopic = "light_client_finality_update"
StateLightClientOptimisticUpdate EventTopic = "light_client_optimistic_update"
StatePayloadAttributes EventTopic = "payload_attributes"
)

// State event data types
Expand Down Expand Up @@ -103,17 +104,17 @@ type PayloadAttributesData struct {
}

type PayloadAttributesContent struct {
ProposerIndex uint64 `json:"proposer_index,string"`
ProposalSlot uint64 `json:"proposal_slot,string"`
ParentBlockNumber uint64 `json:"parent_block_number,string"`
ParentBlockRoot common.Hash `json:"parent_block_root"`
ParentBlockHash common.Hash `json:"parent_block_hash"`
PayloadAttributes PayloadAttributes `json:"payload_attributes"`
}

type PayloadAttributes struct {
Timestamp uint64 `json:"timestamp,string"`
PrevRandao common.Hash `json:"prev_randao"`
SuggestedFeeRecipient common.Address `json:"suggested_fee_recipient"`
Withdrawals *solid.ListSSZ[*cltypes.Withdrawal] `json:"withdrawals,omitempty"`
/*
proposal_slot: the slot at which a block using these payload attributes may be built.
parent_block_root: the beacon block root of the parent block to be built upon.
parent_block_number: the execution block number of the parent block.
parent_block_hash: the execution block hash of the parent block.
proposer_index: the validator index of the proposer at proposal_slot on the chain identified by parent_block_root.
*/
ProposerIndex uint64 `json:"proposer_index,string"`
ProposalSlot uint64 `json:"proposal_slot,string"`
ParentBlockNumber uint64 `json:"parent_block_number,string"`
ParentBlockRoot common.Hash `json:"parent_block_root"`
ParentBlockHash common.Hash `json:"parent_block_hash"`
PayloadAttributes engine_types.PayloadAttributes `json:"payload_attributes"`
}
37 changes: 37 additions & 0 deletions cl/beacon/beaconevents/state_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,53 @@ func (f *stateFeed) SendHead(value *HeadData) int {
})
}

// The node has received a block (from P2P or API) that is successfully imported on the fork-choice on_block handler
func (f *stateFeed) SendBlock(value *BlockData) int {
return f.feed.Send(&EventStream{
Event: StateBlock,
Data: value,
})
}

// The node has received a block (from P2P or API) that passes validation rules of the beacon_block topic
func (f *stateFeed) SendBlockGossip(value *BlockGossipData) int {
return f.feed.Send(&EventStream{
Event: StateBlockGossip,
Data: value,
})
}

func (f *stateFeed) SendFinalizedCheckpoint(value *FinalizedCheckpointData) int {
return f.feed.Send(&EventStream{
Event: StateFinalizedCheckpoint,
Data: value,
})
}

func (f *stateFeed) SendLightClientFinalityUpdate(value *LightClientFinalityUpdateData) int {
return f.feed.Send(&EventStream{
Event: StateLightClientFinalityUpdate,
Data: value,
})
}

func (f *stateFeed) SendLightClientOptimisticUpdate(value *LightClientOptimisticUpdateData) int {
return f.feed.Send(&EventStream{
Event: StateLightClientOptimisticUpdate,
Data: value,
})
}

func (f *stateFeed) SendChainReorg(value *ChainReorgData) int {
return f.feed.Send(&EventStream{
Event: StateChainReorg,
Data: value,
})
}

func (f *stateFeed) SendPayloadAttributes(value *PayloadAttributesData) int {
return f.feed.Send(&EventStream{
Event: StatePayloadAttributes,
Data: value,
})
}
1 change: 0 additions & 1 deletion cl/beacon/handler/block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,6 @@ func (a *ApiHandler) storeBlockAndBlobs(
if _, err := a.engine.ForkChoiceUpdate(ctx, a.forkchoiceStore.GetEth1Hash(finalizedBlockRoot), a.forkchoiceStore.GetEth1Hash(blockRoot), nil); err != nil {
return err
}
a.validatorsMonitor.OnNewBlock(block.Block)
return nil
}

Expand Down
35 changes: 18 additions & 17 deletions cl/beacon/handler/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ var validTopics = map[event.EventTopic]struct{}{
event.OpProposerSlashing: {},
event.OpVoluntaryExit: {},
// state events
event.StateBlock: {},
event.StateBlockGossip: {},
event.StateChainReorg: {},
event.StateFinalityUpdate: {},
event.StateFinalizedCheckpoint: {},
event.StateHead: {},
event.StateOptimisticUpdate: {},
event.StatePayloadAttributes: {},
event.StateBlock: {},
event.StateBlockGossip: {},
event.StateChainReorg: {},
event.StateLightClientFinalityUpdate: {},
event.StateFinalizedCheckpoint: {},
event.StateHead: {},
event.StateLightClientOptimisticUpdate: {},
event.StatePayloadAttributes: {},
}

func (a *ApiHandler) EventSourceGetV1Events(w http.ResponseWriter, r *http.Request) {
Expand All @@ -53,6 +53,7 @@ func (a *ApiHandler) EventSourceGetV1Events(w http.ResponseWriter, r *http.Reque
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

topics := r.URL.Query()["topics"]
Expand All @@ -65,7 +66,7 @@ func (a *ApiHandler) EventSourceGetV1Events(w http.ResponseWriter, r *http.Reque
}
subscribeTopics.Add(topic)
}
log.Info("Subscribed to topics", "topics", subscribeTopics)
log.Info("Subscribed to event stream topics", "topics", subscribeTopics)

eventCh := make(chan *event.EventStream, 128)
opSub := a.emitters.Operation().Subscribe(eventCh)
Expand All @@ -78,21 +79,21 @@ func (a *ApiHandler) EventSourceGetV1Events(w http.ResponseWriter, r *http.Reque

for {
select {
case event := <-eventCh:
if !subscribeTopics.Contains(event.Event) {
case e := <-eventCh:
if !subscribeTopics.Contains(e.Event) {
continue
}
if event.Data == nil {
log.Warn("event data is nil", "event", event)
if e.Data == nil {
log.Warn("event data is nil", "event", e)
continue
}
// marshal and send
buf, err := json.Marshal(event.Data)
buf, err := json.Marshal(e.Data)
if err != nil {
log.Warn("failed to encode data", "err", err, "topic", event.Event)
log.Warn("failed to encode data", "err", err, "topic", e.Event)
continue
}
if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.Event, string(buf)); err != nil {
if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", e.Event, string(buf)); err != nil {
log.Warn("failed to write event", "err", err)
continue
}
Expand All @@ -112,7 +113,7 @@ func (a *ApiHandler) EventSourceGetV1Events(w http.ResponseWriter, r *http.Reque
http.Error(w, fmt.Sprintf("event error %v", err), http.StatusInternalServerError)
return
case <-r.Context().Done():
log.Info("Client disconnected")
log.Info("Client disconnected from event stream")
return
}
}
Expand Down
5 changes: 3 additions & 2 deletions cl/monitor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package monitor

import (
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/phase1/core/state"
)

//go:generate mockgen -typed=true -destination=mock_services/validator_monitor_mock.go -package=mock_services . ValidatorMonitor
type ValidatorMonitor interface {
ObserveValidator(vid uint64)
RemoveValidator(vid uint64)
OnNewBlock(block *cltypes.BeaconBlock) error
OnNewBlock(state *state.CachingBeaconState, block *cltypes.BeaconBlock) error
}

type dummyValdatorMonitor struct{}
Expand All @@ -17,6 +18,6 @@ func (d *dummyValdatorMonitor) ObserveValidator(vid uint64) {}

func (d *dummyValdatorMonitor) RemoveValidator(vid uint64) {}

func (d *dummyValdatorMonitor) OnNewBlock(block *cltypes.BeaconBlock) error {
func (d *dummyValdatorMonitor) OnNewBlock(_ *state.CachingBeaconState, _ *cltypes.BeaconBlock) error {
return nil
}
13 changes: 7 additions & 6 deletions cl/monitor/mock_services/validator_monitor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 8 additions & 26 deletions cl/monitor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import (
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/phase1/forkchoice"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/utils/eth_clock"
)

type ValidatorMonitorImpl struct {
fc forkchoice.ForkChoiceStorageReader
type validatorMonitorImpl struct {
syncedData *synced_data.SyncedDataManager
ethClock eth_clock.EthereumClock
beaconCfg *clparams.BeaconChainConfig
Expand All @@ -25,7 +24,6 @@ type ValidatorMonitorImpl struct {

func NewValidatorMonitor(
enableMonitor bool,
fc forkchoice.ForkChoiceStorageReader,
ethClock eth_clock.EthereumClock,
beaconConfig *clparams.BeaconChainConfig,
syncedData *synced_data.SyncedDataManager,
Expand All @@ -34,8 +32,7 @@ func NewValidatorMonitor(
return &dummyValdatorMonitor{}
}

m := &ValidatorMonitorImpl{
fc: fc,
m := &validatorMonitorImpl{
ethClock: ethClock,
beaconCfg: beaconConfig,
syncedData: syncedData,
Expand All @@ -46,15 +43,15 @@ func NewValidatorMonitor(
return m
}

func (m *ValidatorMonitorImpl) ObserveValidator(vid uint64) {
func (m *validatorMonitorImpl) ObserveValidator(vid uint64) {
m.vaidatorStatuses.addValidator(vid)
}

func (m *ValidatorMonitorImpl) RemoveValidator(vid uint64) {
func (m *validatorMonitorImpl) RemoveValidator(vid uint64) {
m.vaidatorStatuses.removeValidator(vid)
}

func (m *ValidatorMonitorImpl) OnNewBlock(block *cltypes.BeaconBlock) error {
func (m *validatorMonitorImpl) OnNewBlock(state *state.CachingBeaconState, block *cltypes.BeaconBlock) error {
var (
atts = block.Body.Attestations
blockEpoch = m.ethClock.GetEpochAtSlot(block.Slot)
Expand All @@ -65,21 +62,6 @@ func (m *ValidatorMonitorImpl) OnNewBlock(block *cltypes.BeaconBlock) error {
return nil
}

blockRoot, err := block.HashSSZ()
if err != nil {
log.Warn("failed to hash block", "err", err, "slot", block.Slot)
return err
}

state, err := m.fc.GetStateAtBlockRoot(blockRoot, false)
if err != nil {
log.Warn("failed to get state at block root", "err", err, "slot", block.Slot, "blockRoot", blockRoot)
return err
} else if state == nil {
log.Info("state is nil. syncing", "slot", block.Slot, "blockRoot", blockRoot)
return nil
}

// todo: maybe launch a goroutine to update attester status
// update attester status
atts.Range(func(i int, att *solid.Attestation, length int) bool {
Expand Down Expand Up @@ -108,7 +90,7 @@ func (m *ValidatorMonitorImpl) OnNewBlock(block *cltypes.BeaconBlock) error {
return nil
}

func (m *ValidatorMonitorImpl) runReportAttesterStatus() {
func (m *validatorMonitorImpl) runReportAttesterStatus() {
// every epoch seconds
epochDuration := time.Duration(m.beaconCfg.SlotsPerEpoch) * time.Duration(m.beaconCfg.SecondsPerSlot) * time.Second
ticker := time.NewTicker(epochDuration)
Expand Down Expand Up @@ -136,7 +118,7 @@ func (m *ValidatorMonitorImpl) runReportAttesterStatus() {

}

func (m *ValidatorMonitorImpl) runReportProposerStatus() {
func (m *validatorMonitorImpl) runReportProposerStatus() {
// check proposer in previous slot every slot duration
ticker := time.NewTicker(time.Duration(m.beaconCfg.SecondsPerSlot) * time.Second)
defer ticker.Stop()
Expand Down
6 changes: 4 additions & 2 deletions cl/phase1/forkchoice/fork_choice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/erigontech/erigon/cl/beacon/beaconevents"
"github.com/erigontech/erigon/cl/beacon/synced_data"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/phase1/forkchoice"
"github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph"
Expand Down Expand Up @@ -78,7 +79,8 @@ func TestForkChoiceBasic(t *testing.T) {
require.NoError(t, utils.DecodeSSZSnappy(anchorState, anchorStateEncoded, int(clparams.AltairVersion)))
pool := pool.NewOperationsPool(&clparams.MainnetBeaconConfig)
emitters := beaconevents.NewEventEmitter()
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}), emitters, sd, nil)
validatorMonitor := monitor.NewValidatorMonitor(false, nil, nil, nil)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{}, emitters), emitters, sd, nil, validatorMonitor)
require.NoError(t, err)
// first steps
store.OnTick(0)
Expand Down Expand Up @@ -146,7 +148,7 @@ func TestForkChoiceChainBellatrix(t *testing.T) {
sd := synced_data.NewSyncedDataManager(true, &clparams.MainnetBeaconConfig)
store, err := forkchoice.NewForkChoiceStore(nil, anchorState, nil, pool, fork_graph.NewForkGraphDisk(anchorState, afero.NewMemMapFs(), beacon_router_configuration.RouterConfiguration{
Beacon: true,
}), emitters, sd, nil)
}, emitters), emitters, sd, nil, nil)
store.OnTick(2000)
require.NoError(t, err)
for _, block := range blocks {
Expand Down
Loading

0 comments on commit 647a72f

Please sign in to comment.