Skip to content

Commit

Permalink
Handle failover between http and multi.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcdee committed Mar 4, 2024
1 parent 644acdf commit c4ef742
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 133 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
dev:
- update http and multi clients to be aware of delayed-start
- use standard errors for common function issues

0.19.10:
- add proposer_slashing and attester_slashing events
- add bls_to_execution_change event

Expand Down
4 changes: 0 additions & 4 deletions http/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (

// Domain provides a domain for a given domain type at a given epoch.
func (s *Service) Domain(ctx context.Context, domainType phase0.DomainType, epoch phase0.Epoch) (phase0.Domain, error) {
if err := s.assertIsActive(ctx); err != nil {
return phase0.Domain{}, err
}

// Obtain the fork for the epoch.
fork, err := s.forkAtEpoch(ctx, epoch)
if err != nil {
Expand Down
15 changes: 11 additions & 4 deletions http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,19 @@ func (s *Service) get(ctx context.Context, endpoint string, query string, opts *

resp, err := s.client.Do(req)
if err != nil {
// Special case; if we have called the syncing endpoint and it failed then we
// don't ping the server to see if it has failed, as the ping calls syncing itself
// and so we find ourselves in an endless loop.
if !strings.HasSuffix(url.String(), "/node/syncing") {
switch {
case errors.Is(err, context.Canceled):
// We don't consider context canceled to be a potential connection issue, as the user canceled the context.
case errors.Is(err, context.DeadlineExceeded):
// We don't consider context deadline exceeded to be a potential connection issue, as the user selected the deadline.
case strings.HasSuffix(url.String(), "/node/syncing"):
// Special case; if we have called the syncing endpoint and it failed then we don't check the connectino status, as
// that calls the syncing endpoint itself and so we find ourselves in an endless loop.
default:
// We consider other errors to be potential connection issues.
go s.CheckConnectionState(ctx)
}

span.RecordError(errors.New("request failed"))
s.monitorGetComplete(ctx, url.Path, "failed")

Expand Down
58 changes: 22 additions & 36 deletions http/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (

var (
requestsMetric *prometheus.CounterVec
activeMetric *prometheus.GaugeVec
syncedMetric *prometheus.GaugeVec
stateMetric *prometheus.GaugeVec
)

func registerMetrics(ctx context.Context, monitor metrics.Service) error {
Expand Down Expand Up @@ -55,24 +54,14 @@ func registerPrometheusMetrics(_ context.Context) error {
return errors.Join(errors.New("failed to register requests_total"), err)
}

activeMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{
stateMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "consensusclient",
Subsystem: "http",
Name: "active",
Help: "1 if the server is active, 0 if not",
}, []string{"server"})
if err := prometheus.Register(activeMetric); err != nil {
return errors.Join(errors.New("failed to register active"), err)
}

syncedMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "consensusclient",
Subsystem: "http",
Name: "synced",
Help: "1 if the server is synced, 0 if not",
}, []string{"server"})
if err := prometheus.Register(syncedMetric); err != nil {
return errors.Join(errors.New("failed to register synced"), err)
Name: "connection_state",
Help: "The state of the client connection (active/synced/inactive)",
}, []string{"server", "state"})
if err := prometheus.Register(stateMetric); err != nil {
return errors.Join(errors.New("failed to register state"), err)
}

return nil
Expand Down Expand Up @@ -140,26 +129,23 @@ func reduceEndpoint(in string) string {
return string(out)
}

func (s *Service) monitorActive(active bool) {
if activeMetric == nil {
return
}

if active {
activeMetric.WithLabelValues(s.address).Set(1)
} else {
activeMetric.WithLabelValues(s.address).Set(0)
}
}

func (s *Service) monitorSynced(synced bool) {
if syncedMetric == nil {
func (s *Service) monitorState(state string) {
if stateMetric == nil {
return
}

if synced {
syncedMetric.WithLabelValues(s.address).Set(1)
} else {
syncedMetric.WithLabelValues(s.address).Set(0)
switch state {
case "synced":
stateMetric.WithLabelValues(s.address, "synced").Set(1)
stateMetric.WithLabelValues(s.address, "active").Set(0)
stateMetric.WithLabelValues(s.address, "inactive").Set(0)
case "active":
stateMetric.WithLabelValues(s.address, "synced").Set(0)
stateMetric.WithLabelValues(s.address, "active").Set(1)
stateMetric.WithLabelValues(s.address, "inactive").Set(0)
case "inactive":
stateMetric.WithLabelValues(s.address, "synced").Set(0)
stateMetric.WithLabelValues(s.address, "active").Set(0)
stateMetric.WithLabelValues(s.address, "inactive").Set(1)
}
}
3 changes: 2 additions & 1 deletion http/nodesyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

// NodeSyncing provides the syncing information for the node.
func (s *Service) NodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) {
// We do not run checkIsActive here as it calls this function, so that would cause a loop.
// We do not run checkIsActive here as it calls this function, as checkIsActive can call this function
// and so it would cause a loop.
if opts == nil {
return nil, client.ErrNoOptions
}
Expand Down
10 changes: 8 additions & 2 deletions http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,14 @@ func (s *Service) CheckConnectionState(ctx context.Context) {
s.connectionSynced = synced
s.connectionMu.Unlock()

s.monitorActive(active)
s.monitorSynced(synced)
switch {
case synced:
s.monitorState("synced")
case active:
s.monitorState("active")
default:
s.monitorState("inactive")
}

// Call hooks if present.
if (!wasActive && active) && s.hooks.OnActive != nil {
Expand Down
10 changes: 10 additions & 0 deletions mock/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ func (s *Service) Address() string {
return s.name
}

// IsActive returns true if the client is active.
func (s *Service) IsActive() bool {
return true
}

// IsSynced returns true if the client is synced.
func (s *Service) IsSynced() bool {
return s.SyncDistance == 0
}

// close closes the service, freeing up resources.
func (s *Service) close() {
}
83 changes: 38 additions & 45 deletions multi/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021, 2022 Attestant Limited.
// Copyright © 2021 - 2024 Attestant Limited.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -20,11 +20,12 @@ import (

consensusclient "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/http"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)

// monitor monitors active and inactive connections, and moves them between
// monitor monitors active and inactive clients, and moves them between
// lists accordingly.
func (s *Service) monitor(ctx context.Context) {
log := s.log.With().Logger()
Expand All @@ -38,13 +39,13 @@ func (s *Service) monitor(ctx context.Context) {

return
case <-time.After(30 * time.Second):
s.recheck(ctx)
s.recheck(ctx, false)
}
}
}

// recheck checks clients to update their state.
func (s *Service) recheck(ctx context.Context) {
func (s *Service) recheck(ctx context.Context, forceUpdate bool) {
// Fetch all clients.
clients := make([]consensusclient.Service, 0, len(s.activeClients)+len(s.inactiveClients))
s.clientsMu.RLock()
Expand All @@ -54,15 +55,21 @@ func (s *Service) recheck(ctx context.Context) {

// Ping each client to update its state.
for _, client := range clients {
if ping(ctx, client) {
if forceUpdate {
if httpClient, isHTTPClient := client.(*http.Service); isHTTPClient {
httpClient.CheckConnectionState(ctx)
}
}
switch {
case client.IsActive():
s.activateClient(ctx, client)
} else {
default:
s.deactivateClient(ctx, client)
}
}
}

// deactivateClient deactivates a client, moving it to the inactive list if not currently on it.
// deactivateClient marks a client as deactivated, moving it to the inactive list if not currently on it.
func (s *Service) deactivateClient(ctx context.Context, client consensusclient.Service) {
log := zerolog.Ctx(ctx)

Expand All @@ -74,19 +81,21 @@ func (s *Service) deactivateClient(ctx context.Context, client consensusclient.S
for _, activeClient := range s.activeClients {
if activeClient == client {
inactiveClients = append(inactiveClients, activeClient)
setProviderActiveMetric(ctx, client.Address(), "inactive")
setProviderStateMetric(ctx, client.Address(), "inactive")
} else {
activeClients = append(activeClients, activeClient)
}
}
if len(inactiveClients) != len(s.inactiveClients) {
log.Trace().Str("client", client.Address()).Int("active", len(activeClients)).Int("inactive", len(inactiveClients)).Msg("Client deactivated")
log.Trace().Str("client", client.Address()).
Int("active", len(activeClients)).
Int("inactive", len(inactiveClients)).
Msg("Client deactivated")
}

s.activeClients = activeClients
setProvidersMetric(ctx, "active", len(s.activeClients))
s.inactiveClients = inactiveClients
setProvidersMetric(ctx, "inactive", len(s.inactiveClients))
setConnectionsMetric(ctx, len(s.activeClients), len(s.inactiveClients))
}

// activateClient activates a client, moving it to the active list if not currently on it.
Expand All @@ -101,41 +110,21 @@ func (s *Service) activateClient(ctx context.Context, client consensusclient.Ser
for _, inactiveClient := range s.inactiveClients {
if inactiveClient == client {
activeClients = append(activeClients, inactiveClient)
setProviderActiveMetric(ctx, client.Address(), "active")
setProviderStateMetric(ctx, client.Address(), "active")
} else {
inactiveClients = append(inactiveClients, inactiveClient)
}
}
if len(inactiveClients) != len(s.inactiveClients) {
log.Trace().Str("client", client.Address()).Int("active", len(activeClients)).Int("inactive", len(inactiveClients)).Msg("Client activated")
log.Trace().Str("client", client.Address()).
Int("active", len(activeClients)).
Int("inactive", len(inactiveClients)).
Msg("Client activated")
}

s.activeClients = activeClients
setProvidersMetric(ctx, "active", len(s.activeClients))
s.inactiveClients = inactiveClients
setProvidersMetric(ctx, "inactive", len(s.inactiveClients))
}

// ping pings a client, returning true if it is ready to serve requests and
// false otherwise.
func ping(ctx context.Context, client consensusclient.Service) bool {
log := zerolog.Ctx(ctx)

provider, isProvider := client.(consensusclient.NodeSyncingProvider)
if !isProvider {
log.Debug().Str("provider", client.Address()).Msg("Client does not provide sync state")

return false
}

response, err := provider.NodeSyncing(ctx, &api.NodeSyncingOpts{})
if err != nil {
log.Warn().Err(err).Msg("Failed to obtain sync state from node")

return false
}

return (!response.Data.IsSyncing) || (response.Data.HeadSlot == 0 && response.Data.SyncDistance == 0)
setConnectionsMetric(ctx, len(s.activeClients), len(s.inactiveClients))
}

// callFunc is the definition for a call function. It provides a generic return interface
Expand All @@ -159,42 +148,44 @@ func (s *Service) doCall(ctx context.Context, call callFunc, errHandler errHandl

if len(activeClients) == 0 {
// There are no active clients; attempt to re-enable the inactive clients.
s.recheck(ctx)
s.recheck(ctx, true)
s.clientsMu.RLock()
activeClients = s.activeClients
s.clientsMu.RUnlock()
}

if len(activeClients) == 0 {
return nil, errors.New("no active clients to which to make call")
return nil, errors.New("no clients to which to make call")
}

var err error
var res interface{}
for _, client := range activeClients {
res, err = call(ctx, client)
if err != nil {
log.Trace().Err(err).Msg("Potentially deactivating client due to error")
log.Trace().Str("client", client.Name()).Str("address", client.Address()).Err(err).Msg("Potentially deactivating client due to error")
var apiErr *api.Error
if errors.As(err, &apiErr) && apiErr.StatusCode/100 == 4 {
switch {
case errors.As(err, &apiErr) && apiErr.StatusCode/100 == 4:
log.Trace().Str("client", client.Name()).Str("address", client.Address()).Err(err).Msg("Not deactivating client on user error")

return res, err
}
if errors.Is(err, context.Canceled) {
case errors.Is(err, context.Canceled):
log.Trace().Str("client", client.Name()).Str("address", client.Address()).Msg("Not deactivating client on canceled context")

return res, err
case errors.Is(err, context.DeadlineExceeded):
log.Trace().Str("client", client.Name()).Str("address", client.Address()).Msg("Not deactivating client on context deadline exceeded")

return res, err
}

failover := true
if errHandler != nil {
failover, err = errHandler(ctx, client, err)
}

if failover {
log.Debug().Str("client", client.Name()).Str("address", client.Address()).Err(err).Msg("Deactivating client on error")
// Failed with this client; try the next.
s.deactivateClient(ctx, client)

continue
Expand Down Expand Up @@ -227,6 +218,8 @@ func (s *Service) providerInfo(ctx context.Context, provider consensusclient.Ser
switch {
case strings.Contains(strings.ToLower(response.Data), "lighthouse"):
providerName = "lighthouse"
case strings.Contains(strings.ToLower(response.Data), "lodestar"):
providerName = "lodestar"
case strings.Contains(strings.ToLower(response.Data), "prysm"):
providerName = "prysm"
case strings.Contains(strings.ToLower(response.Data), "teku"):
Expand Down
2 changes: 1 addition & 1 deletion multi/client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// TestDeactivateMulti ensures that multiple concurrent calls to deactivateClient
// do not result in a bad list of active and inactive clients.
// do not result in a bad list of synced and unsynced clients.
func TestDeactivateMulti(t *testing.T) {
ctx := context.Background()

Expand Down
Loading

0 comments on commit c4ef742

Please sign in to comment.