Skip to content

Commit

Permalink
initial healthcheck analyzer implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
aomerk committed Aug 16, 2023
1 parent 022f2eb commit 2d2679e
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 125 deletions.
17 changes: 10 additions & 7 deletions clients/alert_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clients

import (
"context"

"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
Expand All @@ -13,13 +14,15 @@ import (

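// AgentRoundTrip contains the agent config and the request/response pair of each evaluation kind (block, tx, alert, health check).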
type AgentRoundTrip struct {
AgentConfig config.AgentConfig
EvalBlockRequest *protocol.EvaluateBlockRequest
EvalBlockResponse *protocol.EvaluateBlockResponse
EvalTxRequest *protocol.EvaluateTxRequest
EvalTxResponse *protocol.EvaluateTxResponse
EvalAlertRequest *protocol.EvaluateAlertRequest
EvalAlertResponse *protocol.EvaluateAlertResponse
AgentConfig config.AgentConfig
EvalBlockRequest *protocol.EvaluateBlockRequest
EvalBlockResponse *protocol.EvaluateBlockResponse
EvalTxRequest *protocol.EvaluateTxRequest
EvalTxResponse *protocol.EvaluateTxResponse
EvalAlertRequest *protocol.EvaluateAlertRequest
EvalAlertResponse *protocol.EvaluateAlertResponse
EvalHealthCheckRequest *protocol.HealthCheckRequest
EvalHealthCheckResponse *protocol.HealthCheckResponse
}

type AlertSender interface {
Expand Down
36 changes: 29 additions & 7 deletions cmd/scanner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,28 @@ func initTxAnalyzer(
as clients.AlertSender, stream *scanner.TxStreamService,
botProcessingComponents components.BotProcessing, msgClient clients.MessageClient,
) (*scanner.TxAnalyzerService, error) {
return scanner.NewTxAnalyzerService(ctx, scanner.TxAnalyzerServiceConfig{
TxChannel: stream.ReadOnlyTxStream(),
AlertSender: as,
MsgClient: msgClient,
BotProcessing: botProcessingComponents,
})
return scanner.NewTxAnalyzerService(
ctx, scanner.TxAnalyzerServiceConfig{
TxChannel: stream.ReadOnlyTxStream(),
AlertSender: as,
MsgClient: msgClient,
BotProcessing: botProcessingComponents,
},
)
}

// initHealthCheckAnalyzer constructs the service that initServices registers
// as the health check analyzer. It is built with scanner.NewTxAnalyzerService
// from a config that has no TxChannel set; cfg is accepted but not read here.
func initHealthCheckAnalyzer(
	ctx context.Context,
	cfg config.Config,
	as clients.AlertSender,
	botProcessingComponents components.BotProcessing,
	msgClient clients.MessageClient,
) (*scanner.TxAnalyzerService, error) {
	// Assemble the service configuration first so the constructor call stays short.
	analyzerCfg := scanner.TxAnalyzerServiceConfig{
		BotProcessing: botProcessingComponents,
		MsgClient:     msgClient,
		AlertSender:   as,
	}
	return scanner.NewTxAnalyzerService(ctx, analyzerCfg)
}

func initBlockAnalyzer(
Expand Down Expand Up @@ -328,6 +344,12 @@ func initServices(ctx context.Context, cfg config.Config) ([]services.Service, e
if err != nil {
return nil, fmt.Errorf("failed to initialize tx analyzer: %v", err)
}

healthCheckAnalyzer, err := initHealthCheckAnalyzer(ctx, cfg, alertSender, botProcessingComponents, msgClient)
if err != nil {
return nil, fmt.Errorf("failed to initialize health check analyzer: %v", err)
}

blockAnalyzer, err := initBlockAnalyzer(ctx, cfg, alertSender, txStream, botProcessingComponents, msgClient)
if err != nil {
return nil, fmt.Errorf("failed to initialize block analyzer: %v", err)
Expand All @@ -352,7 +374,7 @@ func initServices(ctx context.Context, cfg config.Config) ([]services.Service, e
health.NewService(ctx, "", healthutils.DefaultHealthServerErrHandler, health.CheckerFrom(
summarizeReports,
ethClient, traceClient, combinationFeed, blockFeed, txStream,
txAnalyzer, blockAnalyzer, combinationAnalyzer,
txAnalyzer, blockAnalyzer, combinationAnalyzer, healthCheckAnalyzer,
botProcessingComponents.RequestSender,
publisherSvc,
)),
Expand Down
53 changes: 39 additions & 14 deletions services/components/botio/bot_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type BotClient interface {
TxRequestCh() chan<- *botreq.TxRequest
BlockRequestCh() chan<- *botreq.BlockRequest
CombinationRequestCh() chan<- *botreq.CombinationRequest
HealthCheckRequestCh() chan<- *botreq.HealthCheckRequest

LogStatus()

Expand Down Expand Up @@ -73,6 +74,7 @@ type botClient struct {
txRequests chan *botreq.TxRequest // never closed - deallocated when bot is discarded
blockRequests chan *botreq.BlockRequest // never closed - deallocated when bot is discarded
combinationRequests chan *botreq.CombinationRequest // never closed - deallocated when bot is discarded
healthCheckRequests chan *botreq.HealthCheckRequest // never closed - deallocated when bot is discarded

resultChannels botreq.SendOnlyChannels

Expand Down Expand Up @@ -135,6 +137,7 @@ func NewBotClient(
txRequests: make(chan *botreq.TxRequest, DefaultBufferSize),
blockRequests: make(chan *botreq.BlockRequest, DefaultBufferSize),
combinationRequests: make(chan *botreq.CombinationRequest, DefaultBufferSize),
healthCheckRequests: make(chan *botreq.HealthCheckRequest, DefaultBufferSize),
resultChannels: resultChannels,
errCounter: nodeutils.NewErrorCounter(3, isCriticalErr),
msgClient: msgClient,
Expand Down Expand Up @@ -224,6 +227,11 @@ func (bot *botClient) TxRequestCh() chan<- *botreq.TxRequest {
return bot.txRequests
}

// HealthCheckRequestCh returns the health check request channel as a
// send-only channel, so callers can enqueue requests but not receive them.
func (bot *botClient) HealthCheckRequestCh() chan<- *botreq.HealthCheckRequest {
return bot.healthCheckRequests
}

// BlockRequestCh returns the block request channel as a send-only channel,
// so callers can enqueue requests but not receive them.
func (bot *botClient) BlockRequestCh() chan<- *botreq.BlockRequest {
return bot.blockRequests
}
Expand Down Expand Up @@ -455,17 +463,24 @@ func (bot *botClient) processHealthChecks() {

ticker := time.NewTicker(DefaultHealthCheckInterval)

bot.doHealthCheck(bot.ctx, lg)
for {
select {
case <-bot.ctx.Done():
return
case <-ticker.C:
exit := bot.doHealthCheck(bot.ctx, lg)
if exit {
go func() {
for {
select {
case <-bot.ctx.Done():
return
case <-ticker.C:
bot.healthCheckRequests <- &botreq.HealthCheckRequest{}
}
}
}()

for {
x := <-bot.healthCheckRequests
_ = x
exit := bot.doHealthCheck(bot.ctx, lg)
if exit {
return
}
}
}

Expand Down Expand Up @@ -709,13 +724,23 @@ func (bot *botClient) doHealthCheck(ctx context.Context, lg *log.Entry) bool {

lg.WithField("duration", time.Since(startTime)).Debugf("sending request")

bot.lifecycleMetrics.HealthCheckAttempt(botConfig)
req := &protocol.HealthCheckRequest{}
resp := new(protocol.HealthCheckResponse)

err := botClient.DoHealthCheck(ctx)
if err != nil {
bot.lifecycleMetrics.HealthCheckError(err, botConfig)
} else {
bot.lifecycleMetrics.HealthCheckSuccess(botConfig)
requestTime := time.Now().UTC()
invokeErr := botClient.Invoke(ctx, agentgrpc.MethodHealthCheck, req, resp)
responseTime := time.Now().UTC()

ts := domain.TrackingTimestampsFromMessage(nil)
ts.BotRequest = requestTime
ts.BotResponse = responseTime

bot.resultChannels.HealthCheck <- &botreq.HealthCheckResult{
AgentConfig: botConfig,
Request: nil,
Response: resp,
InvokeError: invokeErr,
Timestamps: ts,
}

return false
Expand Down
38 changes: 22 additions & 16 deletions services/components/botio/bot_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ func (s *BotClientSuite) TestStartProcessStop() {
s.lifecycleMetrics.EXPECT().StatusInitialized(s.botClient.configUnsafe)
s.lifecycleMetrics.EXPECT().ActionSubscribe(combinerSubscriptions)

// test health checks
s.botGrpc.EXPECT().DoHealthCheck(gomock.Any())
s.lifecycleMetrics.EXPECT().HealthCheckAttempt(s.botClient.configUnsafe)
s.lifecycleMetrics.EXPECT().HealthCheckSuccess(s.botClient.configUnsafe)

s.msgClient.EXPECT().Publish(messaging.SubjectAgentsAlertSubscribe, combinerSubscriptions)
s.botClient.StartProcessing()
s.botClient.Initialize()
Expand Down Expand Up @@ -131,6 +126,22 @@ func (s *BotClientSuite) TestStartProcessStop() {
}
combinerResp := &protocol.EvaluateAlertResponse{Metadata: map[string]string{"imageHash": ""}}

healthCheckReq := &protocol.HealthCheckRequest{}
// healthCheckResp := &protocol.HealthCheckResponse{}
// test health checks
s.botGrpc.EXPECT().Invoke(
gomock.Any(), agentgrpc.MethodHealthCheck,
gomock.AssignableToTypeOf(&protocol.HealthCheckRequest{}), gomock.AssignableToTypeOf(&protocol.HealthCheckResponse{}),
).Return(nil)
s.botClient.HealthCheckRequestCh() <- &botreq.HealthCheckRequest{
Original: healthCheckReq,
}

healthCheckResult := <-s.resultChannels.HealthCheck
_ = healthCheckResult
// txResp.Timestamp = healthCheckResult.Timestamps.ToMessage() // bypass - hard to match
// txResp.LatencyMs = healthCheckResult.Response.LatencyMs // bypass - hard to match

// test tx handling
s.botGrpc.EXPECT().Invoke(
gomock.Any(), agentgrpc.MethodEvaluateTx,
Expand Down Expand Up @@ -276,15 +287,15 @@ func (s *BotClientSuite) TestHealthCheck() {
ctx := context.Background()

// Mock HealthCheckAttempt() call
s.lifecycleMetrics.EXPECT().HealthCheckAttempt(gomock.Any())

s.botGrpc.EXPECT().DoHealthCheck(ctx).Return(nil)

// Mock HealthCheckSuccess() call
s.lifecycleMetrics.EXPECT().HealthCheckSuccess(gomock.Any())
// test health checks
s.botGrpc.EXPECT().Invoke(
gomock.Any(), agentgrpc.MethodHealthCheck,
gomock.AssignableToTypeOf(&protocol.HealthCheckRequest{}), gomock.AssignableToTypeOf(&protocol.HealthCheckResponse{}),
).Return(nil)

// Execute the method
result := s.botClient.doHealthCheck(ctx, s.lg)
<-s.resultChannels.HealthCheck

s.r.False(result, "Expected healthCheck to return false")
}
Expand All @@ -304,16 +315,11 @@ func (s *BotClientSuite) TestHealthCheck_WithError() {

ctx := context.Background()

// Mock HealthCheckAttempt() call
s.lifecycleMetrics.EXPECT().HealthCheckAttempt(gomock.Any())

err := fmt.Errorf("health check error")
// Use Do() to modify the request parameter
s.botGrpc.EXPECT().DoHealthCheck(ctx).Return(err)

// Mock HealthCheckError() call
s.lifecycleMetrics.EXPECT().HealthCheckError(gomock.Any(), gomock.Any())

// Execute the method
result := s.botClient.doHealthCheck(ctx, s.lg)

Expand Down
5 changes: 5 additions & 0 deletions services/components/botio/botreq/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ type BlockRequest struct {
type CombinationRequest struct {
Original *protocol.EvaluateAlertRequest
}

// HealthCheckRequest contains the request data for a single bot health check.
type HealthCheckRequest struct {
// Original is the protocol-level health check request being wrapped.
Original *protocol.HealthCheckRequest
}
15 changes: 15 additions & 0 deletions services/components/botio/botreq/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ type TxResult struct {
Timestamps *domain.TrackingTimestamps
}

// HealthCheckResult contains request and response data of one bot health
// check, together with the invocation outcome.
type HealthCheckResult struct {
// AgentConfig is the config of the bot the health check was run against.
AgentConfig config.AgentConfig
// Request is the health check request.
// NOTE(review): the producer in bot_client.go currently sets this to nil — confirm consumers do not dereference it.
Request *protocol.HealthCheckRequest
// Response is the response message populated by the health check invocation.
Response *protocol.HealthCheckResponse
// Timestamps carries the tracking timestamps, including when the bot
// request was sent and when the bot response was received.
Timestamps *domain.TrackingTimestamps
// InvokeError is the error returned by the health check invocation, if any.
InvokeError error
}

// BlockResult contains request and response data.
type BlockResult struct {
AgentConfig config.AgentConfig
Expand All @@ -35,6 +44,7 @@ type SendReceiveChannels struct {
Tx chan *TxResult
Block chan *BlockResult
CombinationAlert chan *CombinationAlertResult
HealthCheck chan *HealthCheckResult
}

// MakeResultChannels makes the result channels and returns.
Expand All @@ -43,6 +53,7 @@ func MakeResultChannels() SendReceiveChannels {
Tx: make(chan *TxResult),
Block: make(chan *BlockResult),
CombinationAlert: make(chan *CombinationAlertResult),
HealthCheck: make(chan *HealthCheckResult),
}
}

Expand All @@ -52,6 +63,7 @@ func (src SendReceiveChannels) ReceiveOnly() ReceiveOnlyChannels {
Tx: src.Tx,
Block: src.Block,
CombinationAlert: src.CombinationAlert,
HealthCheck: src.HealthCheck,
}
}

Expand All @@ -61,6 +73,7 @@ func (src SendReceiveChannels) SendOnly() SendOnlyChannels {
Tx: src.Tx,
Block: src.Block,
CombinationAlert: src.CombinationAlert,
HealthCheck: src.HealthCheck,
}
}

Expand All @@ -69,11 +82,13 @@ type ReceiveOnlyChannels struct {
Tx <-chan *TxResult
Block <-chan *BlockResult
CombinationAlert <-chan *CombinationAlertResult
HealthCheck <-chan *HealthCheckResult
}

// SendOnlyChannels has the bot result channels, restricted to the send
// direction so holders can publish results but not consume them.
type SendOnlyChannels struct {
// Tx carries transaction evaluation results.
Tx chan<- *TxResult
// Block carries block evaluation results.
Block chan<- *BlockResult
// CombinationAlert carries combination alert evaluation results.
CombinationAlert chan<- *CombinationAlertResult
// HealthCheck carries health check results.
HealthCheck chan<- *HealthCheckResult
}
14 changes: 14 additions & 0 deletions services/components/botio/mocks/mock_bot_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 0 additions & 20 deletions services/components/metrics/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ const (
MetricFailureInitializeResponse = "agent.failure.initialize.response"
MetricFailureInitializeValidate = "agent.failure.initialize.validate"
MetricFailureTooManyErrs = "agent.failure.too-many-errs"

MetricHealthCheckAttempt = "agent.health.attempt"
MetricHealthCheckSuccess = "agent.health.success"
MetricHealthCheckError = "agent.health.error"
)

// Lifecycle creates lifecycle metrics. It is useful in
Expand Down Expand Up @@ -73,10 +69,6 @@ type Lifecycle interface {
SystemError(metricName string, err error)

SystemStatus(metricName string, details string)

HealthCheckAttempt(botConfigs ...config.AgentConfig)
HealthCheckSuccess(botConfigs ...config.AgentConfig)
HealthCheckError(err error, botConfigs ...config.AgentConfig)
}

type lifecycle struct {
Expand Down Expand Up @@ -182,18 +174,6 @@ func (lc *lifecycle) SystemStatus(metricName string, details string) {
SendAgentMetrics(lc.msgClient, systemMetrics(fmt.Sprintf("system.status.%s", metricName), details))
}

func (lc *lifecycle) HealthCheckAttempt(botConfigs ...config.AgentConfig) {
SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricHealthCheckAttempt, "", botConfigs))
}

func (lc *lifecycle) HealthCheckSuccess(botConfigs ...config.AgentConfig) {
SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricHealthCheckSuccess, "", botConfigs))
}

func (lc *lifecycle) HealthCheckError(err error, botConfigs ...config.AgentConfig) {
SendAgentMetrics(lc.msgClient, fromBotConfigs(MetricHealthCheckError, err.Error(), botConfigs))
}

func fromBotSubscriptions(action string, subscriptions []domain.CombinerBotSubscription) (metrics []*protocol.AgentMetric) {
for _, botSub := range subscriptions {
metrics = append(metrics, CreateAgentMetric(config.AgentConfig{ID: botSub.Subscriber.BotID}, action, 1))
Expand Down
Loading

0 comments on commit 2d2679e

Please sign in to comment.