diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 24b4f09c44a..cbe894d63c5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -185,3 +185,4 @@ - Bump node.js version for heartbeat/synthetics to 16.15.0 - Support scheduled actions and cancellation of pending actions. {issue}393[393] {pull}419[419] - Add `@metadata.input_id` and `@metadata.stream_id` when applying the inject stream processor {pull}527[527] +- Add liveness endpoint, allow fleet-gateway component to report degraded state, add update time and messages to status output. {issue}390[390] {pull}569[569] diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index 4ff4c34ad42..f5c02d3356a 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -2,6 +2,8 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. +// Package fleet handles interactions between the elastic-agent and fleet-server. +// Specifically it will handle agent checkins, and action queueing/dispatch. package fleet import ( @@ -75,23 +77,24 @@ type actionQueue interface { } type fleetGateway struct { - bgContext context.Context - log *logger.Logger - dispatcher pipeline.Dispatcher - client client.Sender - scheduler scheduler.Scheduler - backoff backoff.Backoff - settings *fleetGatewaySettings - agentInfo agentInfo - reporter fleetReporter - done chan struct{} - wg sync.WaitGroup - acker store.FleetAcker - unauthCounter int - statusController status.Controller - statusReporter status.Reporter - stateStore stateStore - queue actionQueue + bgContext context.Context + log *logger.Logger + dispatcher pipeline.Dispatcher + client client.Sender + scheduler scheduler.Scheduler + backoff backoff.Backoff + settings *fleetGatewaySettings + agentInfo agentInfo + reporter fleetReporter + done chan struct{} + wg sync.WaitGroup + acker store.FleetAcker + unauthCounter int + checkinFailCounter int + statusController status.Controller + statusReporter status.Reporter + stateStore stateStore + queue actionQueue } // New creates a new fleet gateway @@ -286,6 +289,7 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) { f.log.Debugf("Checking started") resp, err := f.execute(f.bgContext) if err != nil { + f.checkinFailCounter++ f.log.Errorf("Could not communicate with fleet-server Checking API will retry, error: %s", err) if !f.backoff.Wait() { // Something bad has happened and we log it and we should update our current state. @@ -299,8 +303,16 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) { f.statusReporter.Update(state.Failed, err.Error(), nil) return nil, err } + if f.checkinFailCounter > 1 { + // Update status reporter for gateway to degraded when there are two consecutive failures. + // Note that this may not propagate to fleet-server as the agent is having issues checking in. + // It may also (falsely) report a degraded session for 30s if it is eventually successful. + // However this component will allow the agent to report fleet gateway degredation locally. + f.statusReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil) + } continue } + f.checkinFailCounter = 0 // Request was successful, return the collected actions. return resp, nil } diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go index a9b9380519f..6ce62448276 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go @@ -26,12 +26,14 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/storage" "github.com/elastic/elastic-agent/internal/pkg/agent/storage/store" + "github.com/elastic/elastic-agent/internal/pkg/core/state" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop" repo "github.com/elastic/elastic-agent/internal/pkg/reporter" fleetreporter "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet" fleetreporterConfig "github.com/elastic/elastic-agent/internal/pkg/reporter/fleet/config" "github.com/elastic/elastic-agent/internal/pkg/scheduler" + "github.com/elastic/elastic-agent/internal/pkg/testutils" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -705,59 +707,95 @@ func TestRetriesOnFailures(t *testing.T) { Backoff: backoffSettings{Init: 100 * time.Millisecond, Max: 5 * time.Second}, } - t.Run("When the gateway fails to communicate with the checkin API we will retry", - withGateway(agentInfo, settings, func( - t *testing.T, - gateway gateway.FleetGateway, - client *testingClient, - dispatcher *testingDispatcher, - scheduler *scheduler.Stepper, - rep repo.Backend, - ) { - fail := func(_ http.Header, _ io.Reader) (*http.Response, error) { - return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil - } - clientWaitFn := client.Answer(fail) - err := gateway.Start() - require.NoError(t, err) + t.Run("When the gateway fails to communicate with the checkin API we will retry", func(t *testing.T) { + scheduler := scheduler.NewStepper() + client := newTestingClient() + dispatcher := newTestingDispatcher() + log, _ := logger.New("fleet_gateway", false) + rep := getReporter(agentInfo, log, t) - _ = rep.Report(context.Background(), &testStateEvent{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // Initial tick is done out of bound so we can block on channels. - scheduler.Next() + diskStore := storage.NewDiskStore(paths.AgentStateStoreFile()) + stateStore, err := store.NewStateStore(log, diskStore) + require.NoError(t, err) - // Simulate a 500 errors for the next 3 calls. - <-clientWaitFn - <-clientWaitFn - <-clientWaitFn - - // API recover - waitFn := ackSeq( - client.Answer(func(_ http.Header, body io.Reader) (*http.Response, error) { - cr := &request{} - content, err := ioutil.ReadAll(body) - if err != nil { - t.Fatal(err) - } - err = json.Unmarshal(content, &cr) - if err != nil { - t.Fatal(err) - } + queue := &mockQueue{} + queue.On("DequeueActions").Return([]fleetapi.Action{}) + queue.On("Actions").Return([]fleetapi.Action{}) + + fleetReporter := &testutils.MockReporter{} + fleetReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2) + fleetReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe() + fleetReporter.On("Unregister").Maybe() + + statusController := &testutils.MockController{} + statusController.On("RegisterComponent", "gateway").Return(fleetReporter).Once() + statusController.On("StatusString").Return("string") + + gateway, err := newFleetGatewayWithScheduler( + ctx, + log, + settings, + agentInfo, + client, + dispatcher, + scheduler, + rep, + noopacker.NewAcker(), + statusController, + stateStore, + queue, + ) + require.NoError(t, err) + + fail := func(_ http.Header, _ io.Reader) (*http.Response, error) { + return wrapStrToResp(http.StatusInternalServerError, "something is bad"), nil + } + clientWaitFn := client.Answer(fail) + err = gateway.Start() + require.NoError(t, err) - require.Equal(t, 1, len(cr.Events)) + _ = rep.Report(context.Background(), &testStateEvent{}) - resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`) - return resp, nil - }), + // Initial tick is done out of bound so we can block on channels. + scheduler.Next() - dispatcher.Answer(func(actions ...fleetapi.Action) error { - require.Equal(t, 0, len(actions)) - return nil - }), - ) + // Simulate a 500 errors for the next 3 calls. + <-clientWaitFn + <-clientWaitFn + <-clientWaitFn - waitFn() - })) + // API recover + waitFn := ackSeq( + client.Answer(func(_ http.Header, body io.Reader) (*http.Response, error) { + cr := &request{} + content, err := ioutil.ReadAll(body) + if err != nil { + t.Fatal(err) + } + err = json.Unmarshal(content, &cr) + if err != nil { + t.Fatal(err) + } + + require.Equal(t, 1, len(cr.Events)) + + resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`) + return resp, nil + }), + + dispatcher.Answer(func(actions ...fleetapi.Action) error { + require.Equal(t, 0, len(actions)) + return nil + }), + ) + + waitFn() + statusController.AssertExpectations(t) + fleetReporter.AssertExpectations(t) + }) t.Run("The retry loop is interruptible", withGateway(agentInfo, &fleetGatewaySettings{ diff --git a/internal/pkg/agent/application/gateway/fleet/noop_status_controller.go b/internal/pkg/agent/application/gateway/fleet/noop_status_controller_test.go similarity index 77% rename from internal/pkg/agent/application/gateway/fleet/noop_status_controller.go rename to internal/pkg/agent/application/gateway/fleet/noop_status_controller_test.go index d5097655a63..bbae6958ab6 100644 --- a/internal/pkg/agent/application/gateway/fleet/noop_status_controller.go +++ b/internal/pkg/agent/application/gateway/fleet/noop_status_controller_test.go @@ -5,21 +5,25 @@ package fleet import ( + "net/http" + "github.com/elastic/elastic-agent/internal/pkg/core/state" "github.com/elastic/elastic-agent/internal/pkg/core/status" ) type noopController struct{} +func (*noopController) SetAgentID(_ string) {} func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} } func (*noopController) RegisterComponentWithPersistance(_ string, _ bool) status.Reporter { return &noopReporter{} } -func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} } -func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} } -func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy } -func (*noopController) UpdateStateID(_ string) {} -func (*noopController) StatusString() string { return "online" } +func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} } +func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} } +func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy } +func (*noopController) UpdateStateID(_ string) {} +func (*noopController) StatusString() string { return "online" } +func (*noopController) ServeHTTP(_ http.ResponseWriter, _ *http.Request) {} type noopReporter struct{} diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index b584baf2f09..2989d2d6c6e 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -152,6 +152,7 @@ func run(override cfgOverrider) error { rex := reexec.NewManager(rexLogger, execPath) statusCtrl := status.NewController(logger) + statusCtrl.SetAgentID(agentInfo.AgentID()) tracer, err := initTracer(agentName, release.Version(), cfg.Settings.MonitoringConfig) if err != nil { @@ -182,7 +183,7 @@ func run(override cfgOverrider) error { control.SetRouteFn(app.Routes) control.SetMonitoringCfg(cfg.Settings.MonitoringConfig) - serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app, tracer) + serverStopFn, err := setupMetrics(agentInfo, logger, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, app, tracer, statusCtrl) if err != nil { return err } @@ -337,6 +338,7 @@ func setupMetrics( cfg *monitoringCfg.MonitoringConfig, app application.Application, tracer *apm.Tracer, + statusCtrl status.Controller, ) (func() error, error) { if err := report.SetupMetrics(logger, agentName, version.GetDefaultVersion()); err != nil { return nil, err @@ -349,7 +351,7 @@ func setupMetrics( } bufferEnabled := cfg.HTTP.Buffer != nil && cfg.HTTP.Buffer.Enabled - s, err := monitoringServer.New(logger, endpointConfig, monitoring.GetNamespace, app.Routes, isProcessStatsEnabled(cfg.HTTP), bufferEnabled, tracer) + s, err := monitoringServer.New(logger, endpointConfig, monitoring.GetNamespace, app.Routes, isProcessStatsEnabled(cfg.HTTP), bufferEnabled, tracer, statusCtrl) if err != nil { return nil, errors.New(err, "could not start the HTTP server for the API") } diff --git a/internal/pkg/core/monitoring/server/server.go b/internal/pkg/core/monitoring/server/server.go index e5929909158..e0289ae72d0 100644 --- a/internal/pkg/core/monitoring/server/server.go +++ b/internal/pkg/core/monitoring/server/server.go @@ -33,6 +33,7 @@ func New( enableProcessStats bool, enableBuffer bool, tracer *apm.Tracer, + statusController http.Handler, ) (*api.Server, error) { if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil { // log but ignore @@ -44,7 +45,7 @@ func New( return nil, err } - return exposeMetricsEndpoint(log, cfg, ns, routesFetchFn, enableProcessStats, enableBuffer, tracer) + return exposeMetricsEndpoint(log, cfg, ns, routesFetchFn, enableProcessStats, enableBuffer, tracer, statusController) } func exposeMetricsEndpoint( @@ -55,6 +56,7 @@ func exposeMetricsEndpoint( enableProcessStats bool, enableBuffer bool, tracer *apm.Tracer, + statusController http.Handler, ) (*api.Server, error) { r := mux.NewRouter() if tracer != nil { @@ -63,6 +65,8 @@ func exposeMetricsEndpoint( statsHandler := statsHandler(ns("stats")) r.Handle("/stats", createHandler(statsHandler)) + r.Handle("/liveness", statusController) + if enableProcessStats { r.HandleFunc("/processes", processesHandler(routesFetchFn)) r.Handle("/processes/{processID}", createHandler(processHandler(statsHandler))) diff --git a/internal/pkg/core/status/handler.go b/internal/pkg/core/status/handler.go new file mode 100644 index 00000000000..2e7476901c5 --- /dev/null +++ b/internal/pkg/core/status/handler.go @@ -0,0 +1,39 @@ +package status + +import ( + "encoding/json" + "net/http" + "time" +) + +// LivenessResponse is the response body for the liveness endpoint. +type LivenessResponse struct { + ID string `json:"id"` + Status string `json:"status"` + Message string `json:"message"` + UpdateTime time.Time `json:"update_timestamp"` +} + +// ServeHTTP is an HTTP Handler for the status controller. +// Respose code is 200 for a healthy agent, and 503 otherwise. +// Response body is a JSON object that contains the agent ID, status, message, and the last status update time. +func (r *controller) ServeHTTP(wr http.ResponseWriter, req *http.Request) { + s := r.Status() + lr := LivenessResponse{ + ID: r.agentID, + Status: s.Status.String(), + Message: s.Message, + UpdateTime: s.UpdateTime, + } + status := http.StatusOK + if s.Status != Healthy { + status = http.StatusServiceUnavailable + } + + wr.Header().Set("Content-Type", "application/json") + wr.WriteHeader(status) + enc := json.NewEncoder(wr) + if err := enc.Encode(lr); err != nil { + r.log.Errorf("Unable to encode liveness response: %v", err) + } +} diff --git a/internal/pkg/core/status/reporter.go b/internal/pkg/core/status/reporter.go index 04c8251fa92..92632af2ed5 100644 --- a/internal/pkg/core/status/reporter.go +++ b/internal/pkg/core/status/reporter.go @@ -2,11 +2,15 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. +// Package status handles process status reporting. package status import ( + "fmt" + "net/http" "reflect" "sync" + "time" "github.com/google/uuid" @@ -47,10 +51,12 @@ type AgentStatus struct { Status AgentStatusCode Message string Applications []AgentApplicationStatus + UpdateTime time.Time } // Controller takes track of component statuses. type Controller interface { + SetAgentID(string) RegisterComponent(string) Reporter RegisterComponentWithPersistance(string, bool) Reporter RegisterApp(id string, name string) Reporter @@ -58,15 +64,19 @@ type Controller interface { StatusCode() AgentStatusCode StatusString() string UpdateStateID(string) + ServeHTTP(http.ResponseWriter, *http.Request) } type controller struct { mx sync.Mutex status AgentStatusCode + message string + updateTime time.Time reporters map[string]*reporter appReporters map[string]*reporter log *logger.Logger stateID string + agentID string } // NewController creates a new reporter. @@ -79,6 +89,14 @@ func NewController(log *logger.Logger) Controller { } } +// SetAgentID sets the agentID of the controller +// The AgentID may be used in the handler output. +func (r *controller) SetAgentID(agentID string) { + r.mx.Lock() + defer r.mx.Unlock() + r.agentID = agentID +} + // UpdateStateID cleans health when new configuration is received. // To prevent reporting failures from previous configuration. func (r *controller) UpdateStateID(stateID string) { @@ -175,8 +193,9 @@ func (r *controller) Status() AgentStatus { } return AgentStatus{ Status: r.status, - Message: "", + Message: r.message, Applications: apps, + UpdateTime: r.updateTime, } } @@ -189,12 +208,14 @@ func (r *controller) StatusCode() AgentStatusCode { func (r *controller) updateStatus() { status := Healthy + message := "" r.mx.Lock() for id, rep := range r.reporters { s := statusToAgentStatus(rep.status) if s > status { status = s + message = fmt.Sprintf("component %s: %s", id, rep.message) } r.log.Debugf("'%s' has status '%s'", id, s) @@ -207,6 +228,7 @@ func (r *controller) updateStatus() { s := statusToAgentStatus(rep.status) if s > status { status = s + message = fmt.Sprintf("app %s: %s", id, rep.message) } r.log.Debugf("'%s' has status '%s'", id, s) @@ -217,15 +239,17 @@ func (r *controller) updateStatus() { } if r.status != status { - r.logStatus(status) + r.logStatus(status, message) r.status = status + r.message = message + r.updateTime = time.Now().UTC() } r.mx.Unlock() } -func (r *controller) logStatus(status AgentStatusCode) { +func (r *controller) logStatus(status AgentStatusCode, message string) { logFn := r.log.Infof if status == Degraded { logFn = r.log.Warnf @@ -233,7 +257,7 @@ func (r *controller) logStatus(status AgentStatusCode) { logFn = r.log.Errorf } - logFn("Elastic Agent status changed to: '%s'", status) + logFn("Elastic Agent status changed to %q: %q", status, message) } // StatusString retrieves human readable string of current agent status. diff --git a/internal/pkg/core/status/reporter_test.go b/internal/pkg/core/status/reporter_test.go index 0d44e402798..09a66661fc5 100644 --- a/internal/pkg/core/status/reporter_test.go +++ b/internal/pkg/core/status/reporter_test.go @@ -6,6 +6,7 @@ package status import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -98,4 +99,56 @@ func TestReporter(t *testing.T) { assert.Equal(t, Degraded, r.StatusCode()) assert.Equal(t, "degraded", r.StatusString()) }) + + t.Run("Check agent status components healthy", func(t *testing.T) { + r := NewController(l) + r1 := r.RegisterComponent("r1") + r2 := r.RegisterComponent("r2") + r3 := r.RegisterComponent("r3") + + r1.Update(state.Healthy, "", nil) + r2.Update(state.Healthy, "", nil) + r3.Update(state.Healthy, "", nil) + + s := r.Status() + assert.Equal(t, Healthy, s.Status) + assert.Equal(t, "", s.Message) + assert.Equal(t, time.Time{}, s.UpdateTime) + }) + + //nolint:dupl // test case + t.Run("Check agent status one component degraded", func(t *testing.T) { + r := NewController(l) + r1 := r.RegisterComponent("r1") + r2 := r.RegisterComponent("r2") + r3 := r.RegisterComponent("r3") + + r1.Update(state.Healthy, "", nil) + r2.Update(state.Degraded, "degraded", nil) + r3.Update(state.Healthy, "", nil) + + s := r.Status() + assert.Equal(t, Degraded, s.Status) + assert.Contains(t, s.Message, "component r2") + assert.Contains(t, s.Message, "degraded") + assert.NotEqual(t, time.Time{}, s.UpdateTime) + }) + + //nolint:dupl // test case + t.Run("Check agent status one component failed", func(t *testing.T) { + r := NewController(l) + r1 := r.RegisterComponent("r1") + r2 := r.RegisterComponent("r2") + r3 := r.RegisterComponent("r3") + + r1.Update(state.Healthy, "", nil) + r2.Update(state.Failed, "failed", nil) + r3.Update(state.Degraded, "degraded", nil) + + s := r.Status() + assert.Equal(t, Failed, s.Status) + assert.Contains(t, s.Message, "component r2") + assert.Contains(t, s.Message, "failed") + assert.NotEqual(t, time.Time{}, s.UpdateTime) + }) } diff --git a/internal/pkg/testutils/status_reporter.go b/internal/pkg/testutils/status_reporter.go new file mode 100644 index 00000000000..a045e50304a --- /dev/null +++ b/internal/pkg/testutils/status_reporter.go @@ -0,0 +1,67 @@ +package testutils + +import ( + "net/http" + + "github.com/elastic/elastic-agent/internal/pkg/core/state" + "github.com/elastic/elastic-agent/internal/pkg/core/status" + "github.com/stretchr/testify/mock" +) + +type MockController struct { + mock.Mock +} + +func (m *MockController) SetAgentID(id string) { + m.Called(id) +} + +func (m *MockController) RegisterComponent(id string) status.Reporter { + args := m.Called(id) + return args.Get(0).(status.Reporter) +} + +func (m *MockController) RegisterComponentWithPersistance(id string, b bool) status.Reporter { + args := m.Called(id, b) + return args.Get(0).(status.Reporter) +} + +func (m *MockController) RegisterApp(id, name string) status.Reporter { + args := m.Called(id, name) + return args.Get(0).(status.Reporter) +} + +func (m *MockController) Status() status.AgentStatus { + args := m.Called() + return args.Get(0).(status.AgentStatus) +} + +func (m *MockController) StatusCode() status.AgentStatusCode { + args := m.Called() + return args.Get(0).(status.AgentStatusCode) +} + +func (m *MockController) StatusString() string { + args := m.Called() + return args.String(0) +} + +func (m *MockController) UpdateStateID(id string) { + m.Called(id) +} + +func (m *MockController) ServeHTTP(wr http.ResponseWriter, req *http.Request) { + m.Called(wr, req) +} + +type MockReporter struct { + mock.Mock +} + +func (m *MockReporter) Update(state state.Status, message string, meta map[string]interface{}) { + m.Called(state, message, meta) +} + +func (m *MockReporter) Unregister() { + m.Called() +}