From 6bd0d570d97db624f20ba8722f272e1273a4f559 Mon Sep 17 00:00:00 2001 From: shana Date: Tue, 20 Dec 2022 09:25:19 -0800 Subject: [PATCH] Fix getting validators map for local relay (#20) * Fix getting validators map for local relay * pr comments * add timer for updating known validators * improvement to local validator map fetching * lock for map updating * properly lock updates * get current slot if the mapping is empty * remove onForkchoiceUpdate * graceful shutdown * Split initial proposer sync from the proposer fetch loop (#28) Co-authored-by: Mateusz Morusiewicz <11313015+Ruteri@users.noreply.github.com> --- builder/beacon_client.go | 127 ++++++++++++++++++++++--------- builder/beacon_client_test.go | 92 ---------------------- builder/builder.go | 7 +- builder/config.go | 4 + builder/local_relay.go | 9 +++ builder/relay.go | 6 ++ builder/relay_aggregator.go | 16 ++++ builder/relay_aggregator_test.go | 6 ++ builder/service.go | 2 +- cmd/geth/main.go | 2 + cmd/utils/flags.go | 14 ++++ 11 files changed, 155 insertions(+), 130 deletions(-) diff --git a/builder/beacon_client.go b/builder/beacon_client.go index 769a5674c4f3..bb97ebfa6c2c 100644 --- a/builder/beacon_client.go +++ b/builder/beacon_client.go @@ -8,6 +8,7 @@ import ( "net/http" "strconv" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -19,82 +20,138 @@ type testBeaconClient struct { slot uint64 } +func (b *testBeaconClient) Stop() { + return +} + func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool { return true } func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) { return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil } -func (b *testBeaconClient) onForkchoiceUpdate() (uint64, error) { - return b.slot, nil +func (b *testBeaconClient) Start() error { + return nil } type BeaconClient struct { - endpoint string + endpoint string + slotsInEpoch uint64 + secondsInSlot uint64 + + mu sync.Mutex + slotProposerMap map[uint64]PubkeyHex - mu sync.Mutex - currentEpoch uint64 - currentSlot uint64 - nextSlotProposer PubkeyHex - slotProposerMap map[uint64]PubkeyHex + closeCh chan struct{} } -func NewBeaconClient(endpoint string) *BeaconClient { +func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient { return &BeaconClient{ endpoint: endpoint, + slotsInEpoch: slotsInEpoch, + secondsInSlot: secondsInSlot, slotProposerMap: make(map[uint64]PubkeyHex), + closeCh: make(chan struct{}), } } +func (b *BeaconClient) Stop() { + close(b.closeCh) +} + func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool { return true } func (b *BeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) { - /* Only returns proposer if requestedSlot is currentSlot + 1, would be a race otherwise */ b.mu.Lock() defer b.mu.Unlock() - if b.currentSlot+1 != requestedSlot { - return PubkeyHex(""), errors.New("slot out of sync") + nextSlotProposer, found := b.slotProposerMap[requestedSlot] + if !found { + log.Error("inconsistent proposer mapping", "requestSlot", requestedSlot, "slotProposerMap", b.slotProposerMap) + return PubkeyHex(""), errors.New("inconsistent proposer mapping") } - return b.nextSlotProposer, nil + return nextSlotProposer, nil } -/* Returns next slot's proposer pubkey */ -// TODO: what happens if no block for previous slot - should still get next slot -func (b *BeaconClient) onForkchoiceUpdate() (uint64, error) { - b.mu.Lock() - defer b.mu.Unlock() +func (b *BeaconClient) Start() error { + go b.UpdateValidatorMapForever() + return nil +} +func (b *BeaconClient) UpdateValidatorMapForever() { + durationPerSlot := time.Duration(b.secondsInSlot) * time.Second + + prevFetchSlot := uint64(0) + + // fetch current epoch if beacon is online currentSlot, err := fetchCurrentSlot(b.endpoint) if err != nil { - return 0, err + log.Error("could not get current slot", "err", err) + } else { + currentEpoch := currentSlot / b.slotsInEpoch + slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch) + if err != nil { + log.Error("could not fetch validators map", "epoch", currentEpoch, "err", err) + } else { + b.mu.Lock() + b.slotProposerMap = slotProposerMap + b.mu.Unlock() + } } - nextSlot := currentSlot + 1 + retryDelay := time.Second - b.currentSlot = currentSlot - nextSlotEpoch := nextSlot / 32 + // Every half epoch request validators map, polling for the slot + // more frequently to avoid missing updates on errors + timer := time.NewTimer(retryDelay) + defer timer.Stop() + for true { + select { + case <-b.closeCh: + return + case <-timer.C: + } - if nextSlotEpoch != b.currentEpoch { - // TODO: this should be prepared in advance, possibly just fetch for next epoch in advance - slotProposerMap, err := fetchEpochProposersMap(b.endpoint, nextSlotEpoch) + currentSlot, err := fetchCurrentSlot(b.endpoint) if err != nil { - return 0, err + log.Error("could not get current slot", "err", err) + timer.Reset(retryDelay) + continue } - b.currentEpoch = nextSlotEpoch - b.slotProposerMap = slotProposerMap - } + // TODO: should poll after consistent slot within the epoch (slot % slotsInEpoch/2 == 0) + nextFetchSlot := prevFetchSlot + b.slotsInEpoch/2 + if currentSlot < nextFetchSlot { + timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot) + continue + } - nextSlotProposer, found := b.slotProposerMap[nextSlot] - if !found { - log.Error("inconsistent proposer mapping", "currentSlot", currentSlot, "slotProposerMap", b.slotProposerMap) - return 0, errors.New("inconsistent proposer mapping") + currentEpoch := currentSlot / b.slotsInEpoch + slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch+1) + if err != nil { + log.Error("could not fetch validators map", "epoch", currentEpoch+1, "err", err) + timer.Reset(retryDelay) + continue + } + + prevFetchSlot = currentSlot + b.mu.Lock() + // remove previous epoch slots + for k := range b.slotProposerMap { + if k < currentEpoch*b.slotsInEpoch { + delete(b.slotProposerMap, k) + } + } + // update the slot proposer map for next epoch + for k, v := range slotProposerMap { + b.slotProposerMap[k] = v + } + b.mu.Unlock() + + timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot) } - b.nextSlotProposer = nextSlotProposer - return nextSlot, nil } func fetchCurrentSlot(endpoint string) (uint64, error) { diff --git a/builder/beacon_client_test.go b/builder/beacon_client_test.go index 564275e5ad89..6073488c308f 100644 --- a/builder/beacon_client_test.go +++ b/builder/beacon_client_test.go @@ -174,95 +174,3 @@ func TestFetchEpochProposersMap(t *testing.T) { require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a"), proposersMap[1]) require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), proposersMap[2]) } - -func TestOnForkchoiceUpdate(t *testing.T) { - mbn := newMockBeaconNode() - defer mbn.srv.Close() - - mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`) - - mbn.proposerDuties[1] = []byte(`{ - "dependent_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", - "execution_optimistic": false, - "data": [ - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a", - "validator_index": "1", - "slot": "31" - }, - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b", - "validator_index": "2", - "slot": "32" - }, - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74c", - "validator_index": "3", - "slot": "33" - } - ] -}`) - - bc := NewBeaconClient(mbn.srv.URL) - slot, err := bc.onForkchoiceUpdate() - require.NoError(t, err) - require.Equal(t, slot, uint64(32)) - - pubkeyHex, err := bc.getProposerForNextSlot(32) - require.NoError(t, err) - require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), pubkeyHex) - - _, err = bc.getProposerForNextSlot(31) - require.EqualError(t, err, "slot out of sync") - - _, err = bc.getProposerForNextSlot(33) - require.EqualError(t, err, "slot out of sync") - - mbn.headersCode = 404 - mbn.headersResp = []byte(`{ "code": 404, "message": "State not found" }`) - - slot, err = NewBeaconClient(mbn.srv.URL).onForkchoiceUpdate() - require.EqualError(t, err, "State not found") - require.Equal(t, slot, uint64(0)) - - // Check that client does not fetch new proposers if epoch did not change - mbn.headersCode = 200 - mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`) - mbn.proposerDuties[1] = []byte(`{ - "data": [ - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d", - "validator_index": "4", - "slot": "32" - } - ] -}`) - - slot, err = bc.onForkchoiceUpdate() - require.NoError(t, err, "") - require.Equal(t, slot, uint64(32)) - - mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "63", "proposer_index": "1" } } } ] }`) - mbn.proposerDuties[2] = []byte(`{ - "data": [ - { - "pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d", - "validator_index": "4", - "slot": "64" - } - ] -}`) - - slot, err = bc.onForkchoiceUpdate() - require.NoError(t, err, "") - require.Equal(t, slot, uint64(64)) - - pubkeyHex, err = bc.getProposerForNextSlot(64) - require.NoError(t, err) - require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d"), pubkeyHex) - - // Check proposers map error is routed out - mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "65", "proposer_index": "1" } } } ] }`) - _, err = bc.onForkchoiceUpdate() - require.EqualError(t, err, "inconsistent proposer mapping") -} diff --git a/builder/builder.go b/builder/builder.go index 7a6b1e5aa61d..c732bf055365 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -32,12 +32,15 @@ type ValidatorData struct { type IBeaconClient interface { isValidator(pubkey PubkeyHex) bool getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) - onForkchoiceUpdate() (uint64, error) + Start() error + Stop() } type IRelay interface { SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, vd ValidatorData) error GetValidatorForSlot(nextSlot uint64) (ValidatorData, error) + Start() error + Stop() } type IBuilder interface { @@ -89,7 +92,7 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe } func (b *Builder) Start() error { - return nil + return b.relay.Start() } func (b *Builder) Stop() error { diff --git a/builder/config.go b/builder/config.go index e22d0c38177e..cc445d5efcc8 100644 --- a/builder/config.go +++ b/builder/config.go @@ -4,6 +4,8 @@ type Config struct { Enabled bool `toml:",omitempty"` EnableValidatorChecks bool `toml:",omitempty"` EnableLocalRelay bool `toml:",omitempty"` + SlotsInEpoch uint64 `toml:",omitempty"` + SecondsInSlot uint64 `toml:",omitempty"` DisableBundleFetcher bool `toml:",omitempty"` DryRun bool `toml:",omitempty"` BuilderSecretKey string `toml:",omitempty"` @@ -23,6 +25,8 @@ var DefaultConfig = Config{ Enabled: false, EnableValidatorChecks: false, EnableLocalRelay: false, + SlotsInEpoch: 32, + SecondsInSlot: 12, DisableBundleFetcher: false, DryRun: false, BuilderSecretKey: "0x2fc12ae741f29701f8e30f5de6350766c020cb80768a0ff01e6838ffd2431e11", diff --git a/builder/local_relay.go b/builder/local_relay.go index 513e3415596b..d07d463231ec 100644 --- a/builder/local_relay.go +++ b/builder/local_relay.go @@ -84,6 +84,15 @@ func NewLocalRelay(sk *bls.SecretKey, beaconClient IBeaconClient, builderSigning } } +func (r *LocalRelay) Start() error { + r.beaconClient.Start() + return nil +} + +func (r *LocalRelay) Stop() { + r.beaconClient.Stop() +} + func (r *LocalRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error { log.Info("submitting block to local relay", "block", msg.ExecutionPayload.BlockHash.String()) return r.submitBlock(msg) diff --git a/builder/relay.go b/builder/relay.go index 5b11c8d9f002..3c7e7d6138e5 100644 --- a/builder/relay.go +++ b/builder/relay.go @@ -125,6 +125,12 @@ func (r *RemoteRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error return ValidatorData{}, ErrValidatorNotFound } +func (r *RemoteRelay) Start() error { + return nil +} + +func (r *RemoteRelay) Stop() {} + func (r *RemoteRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error { log.Info("submitting block to remote relay", "endpoint", r.endpoint) code, err := server.SendHTTPRequest(context.TODO(), *http.DefaultClient, http.MethodPost, r.endpoint+"/relay/v1/builder/blocks", msg, nil) diff --git a/builder/relay_aggregator.go b/builder/relay_aggregator.go index d5969982f831..bb141822edcf 100644 --- a/builder/relay_aggregator.go +++ b/builder/relay_aggregator.go @@ -24,6 +24,22 @@ func NewRemoteRelayAggregator(primary IRelay, secondary []IRelay) *RemoteRelayAg } } +func (r *RemoteRelayAggregator) Start() error { + for _, relay := range r.relays { + err := relay.Start() + if err != nil { + return err + } + } + return nil +} + +func (r *RemoteRelayAggregator) Stop() { + for _, relay := range r.relays { + relay.Stop() + } +} + func (r *RemoteRelayAggregator) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, registration ValidatorData) error { r.registrationsCacheLock.RLock() defer r.registrationsCacheLock.RUnlock() diff --git a/builder/relay_aggregator_test.go b/builder/relay_aggregator_test.go index 84daed0b8ec8..d42aff65f085 100644 --- a/builder/relay_aggregator_test.go +++ b/builder/relay_aggregator_test.go @@ -58,6 +58,12 @@ func (r *testRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error) return r.gvsVd, r.gvsErr } +func (r *testRelay) Start() error { + return nil +} + +func (r *testRelay) Stop() {} + func TestRemoteRelayAggregator(t *testing.T) { t.Run("should return error if no relays return validator data", func(t *testing.T) { backend := newTestRelayAggBackend(3) diff --git a/builder/service.go b/builder/service.go index aea8349c1357..e343bcfae1d9 100644 --- a/builder/service.go +++ b/builder/service.go @@ -144,7 +144,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error { copy(bellatrixForkVersion[:], bellatrixForkVersionBytes[:4]) proposerSigningDomain := boostTypes.ComputeDomain(boostTypes.DomainTypeBeaconProposer, bellatrixForkVersion, genesisValidatorsRoot) - beaconClient := NewBeaconClient(cfg.BeaconEndpoint) + beaconClient := NewBeaconClient(cfg.BeaconEndpoint, cfg.SlotsInEpoch, cfg.SecondsInSlot) var localRelay *LocalRelay if cfg.EnableLocalRelay { diff --git a/cmd/geth/main.go b/cmd/geth/main.go index cc38f824a72f..e1879342a4d8 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -159,6 +159,8 @@ var ( utils.BuilderEnableValidatorChecks, utils.BuilderBlockValidationBlacklistSourceFilePath, utils.BuilderEnableLocalRelay, + utils.BuilderSecondsInSlot, + utils.BuilderSlotsInEpoch, utils.BuilderDisableBundleFetcher, utils.BuilderDryRun, utils.BuilderSecretKey, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 3d44525281cc..1b5a8dde3f9a 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -706,6 +706,18 @@ var ( Usage: "Enable the local relay", Category: flags.BuilderCategory, } + BuilderSlotsInEpoch = &cli.Uint64Flag{ + Name: "builder.slots_in_epoch", + Usage: "Set the number of slots in an epoch in the local relay", + Value: 32, + Category: flags.BuilderCategory, + } + BuilderSecondsInSlot = &cli.Uint64Flag{ + Name: "builder.seconds_in_slot", + Usage: "Set the number of seconds in a slot in the local relay", + Value: 12, + Category: flags.BuilderCategory, + } BuilderDisableBundleFetcher = &cli.BoolFlag{ Name: "builder.no_bundle_fetcher", Usage: "Disable the bundle fetcher", @@ -1574,6 +1586,8 @@ func SetBuilderConfig(ctx *cli.Context, cfg *builder.Config) { cfg.Enabled = ctx.IsSet(BuilderEnabled.Name) cfg.EnableValidatorChecks = ctx.IsSet(BuilderEnableValidatorChecks.Name) cfg.EnableLocalRelay = ctx.IsSet(BuilderEnableLocalRelay.Name) + cfg.SlotsInEpoch = ctx.Uint64(BuilderSlotsInEpoch.Name) + cfg.SecondsInSlot = ctx.Uint64(BuilderSecondsInSlot.Name) cfg.DisableBundleFetcher = ctx.IsSet(BuilderDisableBundleFetcher.Name) cfg.DryRun = ctx.IsSet(BuilderDryRun.Name) cfg.BuilderSecretKey = ctx.String(BuilderSecretKey.Name)