status identifies failing component, fleet gateway may report degraded, liveness endpoint added #569

Merged
7 commits merged on Jun 23, 2022
Changes from 6 commits
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -185,3 +185,4 @@
- Bump node.js version for heartbeat/synthetics to 16.15.0
- Support scheduled actions and cancellation of pending actions. {issue}393[393] {pull}419[419]
- Add `@metadata.input_id` and `@metadata.stream_id` when applying the inject stream processor {pull}527[527]
- Add liveness endpoint, allow fleet-gateway component to report degraded state, add update time and messages to status output. {issue}390[390] {pull}569[569]
46 changes: 29 additions & 17 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
@@ -2,6 +2,8 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// Package fleet handles interactions between the elastic-agent and fleet-server.
// Specifically it will handle agent checkins, and action queueing/dispatch.
package fleet

import (
@@ -75,23 +77,24 @@ type actionQueue interface {
}

type fleetGateway struct {
bgContext context.Context
log *logger.Logger
dispatcher pipeline.Dispatcher
client client.Sender
scheduler scheduler.Scheduler
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker store.FleetAcker
unauthCounter int
statusController status.Controller
statusReporter status.Reporter
stateStore stateStore
queue actionQueue
bgContext context.Context
log *logger.Logger
dispatcher pipeline.Dispatcher
client client.Sender
scheduler scheduler.Scheduler
backoff backoff.Backoff
settings *fleetGatewaySettings
agentInfo agentInfo
reporter fleetReporter
done chan struct{}
wg sync.WaitGroup
acker store.FleetAcker
unauthCounter int
checkinFailCounter int
statusController status.Controller
statusReporter status.Reporter
stateStore stateStore
queue actionQueue
}

// New creates a new fleet gateway
@@ -286,6 +289,7 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
f.log.Debugf("Checkin started")
resp, err := f.execute(f.bgContext)
if err != nil {
f.checkinFailCounter++
f.log.Errorf("Could not communicate with fleet-server checkin API, will retry, error: %s", err)
if !f.backoff.Wait() {
// Something bad has happened and we log it and we should update our current state.
@@ -299,8 +303,16 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
f.statusReporter.Update(state.Failed, err.Error(), nil)
return nil, err
}
if f.checkinFailCounter > 1 {
// Update status reporter for gateway to degraded when there are two consecutive failures.
Contributor:

Can we put that magic number into a constant?

IIRC, this means that if the Agent is not able to reach Fleet-Server for about 1 min (assuming a 30 sec backoff?), it will be marked as failed?

Contributor (author):

I think so; the wait function in the loop is an exponential backoff.

// Note that this may not propagate to fleet-server as the agent is having issues checking in.
// It may also (falsely) report a degraded session for 30s if it is eventually successful.
// However this component will allow the agent to report fleet gateway degradation locally.
f.statusReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
}
continue
}
f.checkinFailCounter = 0
// Request was successful, return the collected actions.
return resp, nil
}
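
The hunk above only flips the gateway's reporter to Degraded after a second consecutive checkin failure, and resets the counter as soon as a checkin succeeds. A minimal, self-contained sketch of that accounting (the names degradedThreshold, reporter, logReporter and checkinLoop are illustrative, not the agent's actual types):

```go
// Illustrative sketch only, not the agent's code: two consecutive checkin
// failures mark the gateway Degraded; a success resets the counter.
package main

import (
	"errors"
	"fmt"
)

const degradedThreshold = 2 // the "magic number" discussed in the review thread

type reporter interface {
	Update(status, msg string)
}

type logReporter struct{}

func (logReporter) Update(status, msg string) {
	fmt.Printf("status=%s msg=%q\n", status, msg)
}

// checkinLoop walks a series of checkin outcomes; a nil error means success.
func checkinLoop(outcomes []error, rep reporter) {
	failCount := 0
	for _, err := range outcomes {
		if err != nil {
			failCount++
			if failCount >= degradedThreshold {
				rep.Update("Degraded", fmt.Sprintf("checkin failed: %v", err))
			}
			continue
		}
		failCount = 0
		rep.Update("Healthy", "checkin succeeded")
	}
}

func main() {
	// Two consecutive failures then a recovery: Degraded is reported once,
	// then the counter resets and Healthy is reported.
	checkinLoop([]error{errors.New("503"), errors.New("503"), nil}, logReporter{})
}
```

Per the review thread, with the gateway's exponential backoff between attempts this roughly means the gateway reports itself Degraded after about a minute of failed checkins.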
130 changes: 84 additions & 46 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
@@ -26,12 +26,14 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
repo "github.com/elastic/elastic-agent/internal/pkg/reporter"
fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet"
fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
"github.com/elastic/elastic-agent/internal/pkg/testutils"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

@@ -705,59 +707,95 @@ func TestRetriesOnFailures(t *testing.T) {
Backoff: backoffSettings{Init: 100 * time.Millisecond, Max: 5 * time.Second},
}

t.Run("When the gateway fails to communicate with the checkin API we will retry",
withGateway(agentInfo, settings, func(
t *testing.T,
gateway gateway.FleetGateway,
client *testingClient,
dispatcher *testingDispatcher,
scheduler *scheduler.Stepper,
rep repo.Backend,
) {
fail := func(_ http.Header, _ io.Reader) (*http.Response, error) {
return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil
}
clientWaitFn := client.Answer(fail)
err := gateway.Start()
require.NoError(t, err)
t.Run("When the gateway fails to communicate with the checkin API we will retry", func(t *testing.T) {
scheduler := scheduler.NewStepper()
client := newTestingClient()
dispatcher := newTestingDispatcher()
log, _ := logger.New("fleet_gateway", false)
rep := getReporter(agentInfo, log, t)

_ = rep.Report(context.Background(), &testStateEvent{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Initial tick is done out of bound so we can block on channels.
scheduler.Next()
diskStore := storage.NewDiskStore(paths.AgentStateStoreFile())
stateStore, err := store.NewStateStore(log, diskStore)
require.NoError(t, err)

// Simulate 500 errors for the next 3 calls.
<-clientWaitFn
<-clientWaitFn
<-clientWaitFn

// API recover
waitFn := ackSeq(
client.Answer(func(_ http.Header, body io.Reader) (*http.Response, error) {
cr := &request{}
content, err := ioutil.ReadAll(body)
if err != nil {
t.Fatal(err)
}
err = json.Unmarshal(content, &cr)
if err != nil {
t.Fatal(err)
}
queue := &mockQueue{}
queue.On("DequeueActions").Return([]fleetapi.Action{})
queue.On("Actions").Return([]fleetapi.Action{})

fleetReporter := &testutils.MockReporter{}
fleetReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2)
fleetReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
fleetReporter.On("Unregister").Maybe()

statusController := &testutils.MockController{}
statusController.On("RegisterComponent", "gateway").Return(fleetReporter).Once()
statusController.On("StatusString").Return("string")

gateway, err := newFleetGatewayWithScheduler(
ctx,
log,
settings,
agentInfo,
client,
dispatcher,
scheduler,
rep,
noopacker.NewAcker(),
statusController,
stateStore,
queue,
)
require.NoError(t, err)

fail := func(_ http.Header, _ io.Reader) (*http.Response, error) {
return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil
}
clientWaitFn := client.Answer(fail)
err = gateway.Start()
require.NoError(t, err)

require.Equal(t, 1, len(cr.Events))
_ = rep.Report(context.Background(), &testStateEvent{})

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
// Initial tick is done out of bound so we can block on channels.
scheduler.Next()

dispatcher.Answer(func(actions ...fleetapi.Action) error {
require.Equal(t, 0, len(actions))
return nil
}),
)
// Simulate 500 errors for the next 3 calls.
<-clientWaitFn
<-clientWaitFn
<-clientWaitFn

waitFn()
}))
// API recover
waitFn := ackSeq(
client.Answer(func(_ http.Header, body io.Reader) (*http.Response, error) {
cr := &request{}
content, err := ioutil.ReadAll(body)
if err != nil {
t.Fatal(err)
}
err = json.Unmarshal(content, &cr)
if err != nil {
t.Fatal(err)
}

require.Equal(t, 1, len(cr.Events))

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),

dispatcher.Answer(func(actions ...fleetapi.Action) error {
require.Equal(t, 0, len(actions))
return nil
}),
)

waitFn()
statusController.AssertExpectations(t)
fleetReporter.AssertExpectations(t)
})

t.Run("The retry loop is interruptible",
withGateway(agentInfo, &fleetGatewaySettings{
@@ -5,21 +5,25 @@
package fleet

import (
"net/http"

"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/internal/pkg/core/status"
)

type noopController struct{}

func (*noopController) SetAgentID(_ string) {}
func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) RegisterComponentWithPersistance(_ string, _ bool) status.Reporter {
return &noopReporter{}
}
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }
func (*noopController) ServeHTTP(_ http.ResponseWriter, _ *http.Request) {}

michel-laterman marked this conversation as resolved.
type noopReporter struct{}

6 changes: 4 additions & 2 deletions internal/pkg/agent/cmd/run.go
@@ -152,6 +152,7 @@ func run(override cfgOverrider) error {
rex := reexec.NewManager(rexLogger, execPath)

statusCtrl := status.NewController(logger)
statusCtrl.SetAgentID(agentInfo.AgentID())

tracer, err := initTracer(agentName, release.Version(), cfg.Settings.MonitoringConfig)
if err != nil {
@@ -182,7 +183,7 @@ func run(override cfgOverrider) error {
control.SetRouteFn(app.Routes)
control.SetMonitoringCfg(cfg.Settings.MonitoringConfig)

serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app, tracer)
serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app, tracer, statusCtrl)
if err != nil {
return err
}
@@ -337,6 +338,7 @@ func setupMetrics(
cfg *monitoringCfg.MonitoringConfig,
app application.Application,
tracer *apm.Tracer,
statusCtrl status.Controller,
) (func() error, error) {
if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil {
return nil, err
@@ -349,7 +351,7 @@
}

bufferEnabled := cfg.HTTP.Buffer != nil && cfg.HTTP.Buffer.Enabled
s, err := monitoringServer.New(logger, endpointConfig, monitoring.GetNamespace, app.Routes, isProcessStatsEnabled(cfg.HTTP), bufferEnabled, tracer)
s, err := monitoringServer.New(logger, endpointConfig, monitoring.GetNamespace, app.Routes, isProcessStatsEnabled(cfg.HTTP), bufferEnabled, tracer, statusCtrl)
if err != nil {
return nil, errors.New(err, "could not start the HTTP server for the API")
}
6 changes: 5 additions & 1 deletion internal/pkg/core/monitoring/server/server.go
@@ -33,6 +33,7 @@ func New(
enableProcessStats bool,
enableBuffer bool,
tracer *apm.Tracer,
statusController http.Handler,
) (*api.Server, error) {
if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil {
// log but ignore
@@ -44,7 +45,7 @@
return nil, err
}

return exposeMetricsEndpoint(log, cfg, ns, routesFetchFn, enableProcessStats, enableBuffer, tracer)
return exposeMetricsEndpoint(log, cfg, ns, routesFetchFn, enableProcessStats, enableBuffer, tracer, statusController)
}

func exposeMetricsEndpoint(
@@ -55,6 +56,7 @@ func exposeMetricsEndpoint(
enableProcessStats bool,
enableBuffer bool,
tracer *apm.Tracer,
statusController http.Handler,
) (*api.Server, error) {
r := mux.NewRouter()
if tracer != nil {
@@ -63,6 +65,8 @@
statsHandler := statsHandler(ns("stats"))
r.Handle("/stats", createHandler(statsHandler))

r.Handle("/liveness", statusController)

if enableProcessStats {
r.HandleFunc("/processes", processesHandler(routesFetchFn))
r.Handle("/processes/{processID}", createHandler(processHandler(statsHandler)))
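
For context, a hedged sketch of the routing pattern this hunk relies on: any http.Handler — the status controller in the real code — can be mounted on the gorilla/mux router used by the monitoring server. The livenessStub type and the listen address below are made up for the example, not the agent's actual wiring:

```go
// Illustrative wiring sketch, not the agent's monitoring server.
package main

import (
	"net/http"

	"github.com/gorilla/mux"
)

// livenessStub stands in for the status controller, which implements http.Handler.
type livenessStub struct{}

func (livenessStub) ServeHTTP(wr http.ResponseWriter, _ *http.Request) {
	wr.Header().Set("Content-Type", "application/json")
	wr.WriteHeader(http.StatusOK)
	_, _ = wr.Write([]byte(`{"status":"HEALTHY"}`))
}

func main() {
	r := mux.NewRouter()
	r.Handle("/liveness", livenessStub{}) // the status controller in the real code
	// Other monitoring routes (/stats, /processes, ...) are registered the same way.
	_ = http.ListenAndServe("127.0.0.1:8080", r)
}
```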
39 changes: 39 additions & 0 deletions internal/pkg/core/status/handler.go
@@ -0,0 +1,39 @@
package status

import (
"encoding/json"
"net/http"
"time"
)

// LivenessResponse is the response body for the liveness endpoint.
type LivenessResponse struct {
ID string `json:"id"`
Status string `json:"status"`
Message string `json:"message"`
UpdateTime time.Time `json:"update_timestamp"`
}

// ServeHTTP is an HTTP Handler for the status controller.
// Response code is 200 for a healthy agent, and 503 otherwise.
// Response body is a JSON object that contains the agent ID, status, message, and the last status update time.
func (r *controller) ServeHTTP(wr http.ResponseWriter, req *http.Request) {
s := r.Status()
lr := LivenessResponse{
ID: r.agentID,
Status: s.Status.String(),
Message: s.Message,
UpdateTime: s.UpdateTime,
}
Contributor:

This will give the overall agent status; to get per-input status we would need v2, is that correct?

Contributor (author):

Yes, this is for agent liveness; endpoints for inputs/units can be added later when v2 status reporting has been implemented. Also, the status reporter itself may need to change for v2.

status := http.StatusOK
if s.Status != Healthy {
status = http.StatusServiceUnavailable
}

wr.Header().Set("Content-Type", "application/json")
wr.WriteHeader(status)
enc := json.NewEncoder(wr)
if err := enc.Encode(lr); err != nil {
r.log.Errorf("Unable to encode liveness response: %v", err)
}
}
Contributor:

@michel-laterman When we add the new information for the liveness check, should we refactor the handler to a dependency-injection style and have a StatusReporter that takes an interface to query for status?

Contributor (author):

Yeah, this is probably going to be the case for the v2 implementation.

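To illustrate what a client of the new endpoint sees, a hedged sketch that exercises a liveness-style handler with net/http/httptest. The livenessHandler type and its fields are stand-ins rather than the agent's controller; only the response shape mirrors LivenessResponse from handler.go:

```go
// Hedged usage sketch: a liveness-style handler returning 200 when healthy
// and 503 otherwise, exercised through httptest.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

type livenessResponse struct {
	ID         string    `json:"id"`
	Status     string    `json:"status"`
	Message    string    `json:"message"`
	UpdateTime time.Time `json:"update_timestamp"`
}

// livenessHandler is a stand-in for the status controller.
type livenessHandler struct {
	healthy bool
}

func (h livenessHandler) ServeHTTP(wr http.ResponseWriter, _ *http.Request) {
	code, status := http.StatusOK, "HEALTHY"
	if !h.healthy {
		code, status = http.StatusServiceUnavailable, "DEGRADED"
	}
	wr.Header().Set("Content-Type", "application/json")
	wr.WriteHeader(code)
	_ = json.NewEncoder(wr).Encode(livenessResponse{
		ID:         "example-agent-id", // illustrative agent ID
		Status:     status,
		UpdateTime: time.Now().UTC(),
	})
}

func main() {
	srv := httptest.NewServer(livenessHandler{healthy: false})
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/liveness")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.StatusCode) // 503 while the agent is not healthy
}
```

A real probe would GET the /liveness route on the agent's monitoring HTTP endpoint and treat a 503 as not-alive.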