diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index 3d952426631..0a933429122 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -12,13 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +// This file defines the http endpoints for etcd health checks. +// The endpoints include /livez, /readyz and /health. + package etcdhttp import ( + "bytes" "context" "encoding/json" "fmt" "net/http" + "path" + "strings" "time" "go.uber.org/zap" @@ -33,14 +39,20 @@ import ( ) const ( - PathHealth = "/health" - PathProxyHealth = "/proxy/health" + PathHealth = "/health" + PathProxyHealth = "/proxy/health" + HealthStatusSuccess string = "success" + HealthStatusError string = "error" + checkTypeLivez = "livez" + checkTypeReadyz = "readyz" + checkTypeHealth = "health" ) type ServerHealth interface { serverHealthV2V3 Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error) Config() config.ServerConfig + AuthStore() auth.AuthStore } type serverHealthV2V3 interface { @@ -50,33 +62,36 @@ type serverHealthV2V3 interface { // HandleHealth registers metrics and health handlers for v2. func HandleHealthForV2(lg *zap.Logger, mux *http.ServeMux, srv etcdserver.ServerV2) { - mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health { + mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health { if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" { return h } if h := checkLeader(lg, srv, serializable); h.Health != "true" { return h } - return checkV2API(lg, srv) + return checkV2API(ctx, lg, srv) })) } // HandleHealth registers metrics and health handlers. it checks health by using v3 range request // and its corresponding timeout. func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) { - mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health { + mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health { if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" { return h } if h := checkLeader(lg, srv, serializable); h.Health != "true" { return h } - return checkAPI(lg, srv, serializable) + return checkAPI(ctx, lg, srv, serializable) })) + + installLivezEndpoints(lg, mux, srv) + installReadyzEndpoints(lg, mux, srv) } // NewHealthHandler handles '/health' requests. -func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, Serializable bool) Health) http.HandlerFunc { +func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms StringSet, Serializable bool) Health) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) @@ -84,13 +99,13 @@ func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, Serial lg.Warn("/health error", zap.Int("status-code", http.StatusMethodNotAllowed)) return } - excludedAlarms := getExcludedAlarms(r) + excludedAlarms := getQuerySet(r, "exclude") // Passing the query parameter "serializable=true" ensures that the // health of the local etcd is checked vs the health of the cluster. // This is useful for probes attempting to validate the liveness of // the etcd process vs readiness of the cluster to serve requests. serializableFlag := getSerializableFlag(r) - h := hfunc(excludedAlarms, serializableFlag) + h := hfunc(r.Context(), excludedAlarms, serializableFlag) defer func() { if h.Health == "true" { healthSuccess.Inc() @@ -123,11 +138,29 @@ var ( Name: "health_failures", Help: "The total number of failed health checks", }) + healthCheckGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "healthcheck", + Help: "The result of each kind of healthcheck.", + }, + []string{"type", "name"}, + ) + healthCheckCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "healthchecks_total", + Help: "The total number of each kind of healthcheck.", + }, + []string{"type", "name", "status"}, + ) ) func init() { prometheus.MustRegister(healthSuccess) prometheus.MustRegister(healthFailed) + prometheus.MustRegister(healthCheckGauge) + prometheus.MustRegister(healthCheckCounter) } // Health defines etcd server health status. @@ -137,20 +170,24 @@ type Health struct { Reason string `json:"reason"` } -type AlarmSet map[string]struct{} +// HealthStatus is used in new /readyz or /livez health checks instead of the Health struct. +type HealthStatus struct { + Reason string `json:"reason"` + Status string `json:"status"` +} -func getExcludedAlarms(r *http.Request) (alarms AlarmSet) { - alarms = make(map[string]struct{}, 2) - alms, found := r.URL.Query()["exclude"] +func getQuerySet(r *http.Request, query string) StringSet { + querySet := make(map[string]struct{}) + qs, found := r.URL.Query()[query] if found { - for _, alm := range alms { - if len(alm) == 0 { + for _, q := range qs { + if len(q) == 0 { continue } - alarms[alm] = struct{}{} + querySet[q] = struct{}{} } } - return alarms + return querySet } func getSerializableFlag(r *http.Request) bool { @@ -159,7 +196,7 @@ func getSerializableFlag(r *http.Request) bool { // TODO: etcdserver.ErrNoLeader in health API -func checkAlarms(lg *zap.Logger, srv serverHealthV2V3, excludedAlarms AlarmSet) Health { +func checkAlarms(lg *zap.Logger, srv serverHealthV2V3, excludedAlarms StringSet) Health { h := Health{Health: "true"} as := srv.Alarms() if len(as) > 0 { @@ -197,9 +234,9 @@ func checkLeader(lg *zap.Logger, srv serverHealthV2V3, serializable bool) Health return h } -func checkV2API(lg *zap.Logger, srv etcdserver.ServerV2) Health { +func checkV2API(ctx context.Context, lg *zap.Logger, srv etcdserver.ServerV2) Health { h := Health{Health: "true"} - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) _, err := srv.Do(ctx, pb.Request{Method: "QGET"}) cancel() if err != nil { @@ -212,13 +249,14 @@ func checkV2API(lg *zap.Logger, srv etcdserver.ServerV2) Health { return h } -func checkAPI(lg *zap.Logger, srv ServerHealth, serializable bool) Health { +func checkAPI(ctx context.Context, lg *zap.Logger, srv ServerHealth, serializable bool) Health { h := Health{Health: "true"} cfg := srv.Config() - ctx, cancel := context.WithTimeout(context.Background(), cfg.ReqTimeout()) - _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable}) + ctx = srv.AuthStore().WithRoot(ctx) + cctx, cancel := context.WithTimeout(ctx, cfg.ReqTimeout()) + _, err := srv.Range(cctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable}) cancel() - if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied { + if err != nil { h.Health = "false" h.Reason = fmt.Sprintf("RANGE ERROR:%s", err) lg.Warn("serving /health false; Range fails", zap.Error(err)) @@ -227,3 +265,196 @@ func checkAPI(lg *zap.Logger, srv ServerHealth, serializable bool) Health { lg.Debug("serving /health true") return h } + +type HealthCheck func(ctx context.Context) error + +type CheckRegistry struct { + checkType string + checks map[string]HealthCheck +} + +func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { + reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)} + reg.Register("serializable_read", readCheck(server, true /* serializable */)) + reg.InstallHttpEndpoints(lg, mux) +} + +func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { + reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)} + reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT)) + // serializable_read checks if local read is ok. + // linearizable_read checks if there is consensus in the cluster. + // Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure. + reg.Register("serializable_read", readCheck(server, true)) + // linearizable_read check would be replaced by read_index check in 3.6 + reg.Register("linearizable_read", readCheck(server, false)) + reg.InstallHttpEndpoints(lg, mux) +} + +func (reg *CheckRegistry) Register(name string, check HealthCheck) { + reg.checks[name] = check +} + +func (reg *CheckRegistry) RootPath() string { + return "/" + reg.checkType +} + +func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) { + checkNames := make([]string, 0, len(reg.checks)) + for k := range reg.checks { + checkNames = append(checkNames, k) + } + + // installs the http handler for the root path. + reg.installRootHttpEndpoint(lg, mux, checkNames...) + for _, checkName := range checkNames { + // installs the http handler for the individual check sub path. + subpath := path.Join(reg.RootPath(), checkName) + check := checkName + mux.Handle(subpath, newHealthHandler(subpath, lg, func(r *http.Request) HealthStatus { + return reg.runHealthChecks(r.Context(), check) + })) + } +} + +func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) HealthStatus { + h := HealthStatus{Status: HealthStatusSuccess} + var individualCheckOutput bytes.Buffer + for _, checkName := range checkNames { + check, found := reg.checks[checkName] + if !found { + panic(fmt.Errorf("Health check: %s not registered", checkName)) + } + if err := check(ctx); err != nil { + fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err) + h.Status = HealthStatusError + recordMetrics(reg.checkType, checkName, HealthStatusError) + } else { + fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName) + recordMetrics(reg.checkType, checkName, HealthStatusSuccess) + } + } + h.Reason = individualCheckOutput.String() + return h +} + +// installRootHttpEndpoint installs the http handler for the root path. +func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, checks ...string) { + hfunc := func(r *http.Request) HealthStatus { + // extracts the health check names to be excludeList from the query param + excluded := getQuerySet(r, "exclude") + + filteredCheckNames := filterCheckList(lg, listToStringSet(checks), excluded) + h := reg.runHealthChecks(r.Context(), filteredCheckNames...) + return h + } + mux.Handle(reg.RootPath(), newHealthHandler(reg.RootPath(), lg, hfunc)) +} + +// newHealthHandler generates a http HandlerFunc for a health check function hfunc. +func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) HealthStatus) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + lg.Warn("Health request error", zap.String("path", path), zap.Int("status-code", http.StatusMethodNotAllowed)) + return + } + h := hfunc(r) + // Always returns detailed reason for failed checks. + if h.Status == HealthStatusError { + http.Error(w, h.Reason, http.StatusServiceUnavailable) + lg.Error("Health check error", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusServiceUnavailable)) + return + } + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + // Only writes detailed reason for verbose requests. + if _, found := r.URL.Query()["verbose"]; found { + fmt.Fprint(w, h.Reason) + } + fmt.Fprint(w, "ok\n") + lg.Debug("Health check OK", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusOK)) + } +} + +func filterCheckList(lg *zap.Logger, checks StringSet, excluded StringSet) []string { + filteredList := []string{} + for chk := range checks { + if _, found := excluded[chk]; found { + delete(excluded, chk) + continue + } + filteredList = append(filteredList, chk) + } + if len(excluded) > 0 { + // For version compatibility, excluding non-exist checks would not fail the request. + lg.Warn("some health checks cannot be excluded", zap.String("missing-health-checks", formatQuoted(excluded.List()...))) + } + return filteredList +} + +// formatQuoted returns a formatted string of the health check names, +// preserving the order passed in. +func formatQuoted(names ...string) string { + quoted := make([]string, 0, len(names)) + for _, name := range names { + quoted = append(quoted, fmt.Sprintf("%q", name)) + } + return strings.Join(quoted, ",") +} + +type StringSet map[string]struct{} + +func (s StringSet) List() []string { + keys := make([]string, 0, len(s)) + for k := range s { + keys = append(keys, k) + } + return keys +} + +func listToStringSet(list []string) StringSet { + set := make(map[string]struct{}) + for _, s := range list { + set[s] = struct{}{} + } + return set +} + +func recordMetrics(checkType, name string, status string) { + val := 0.0 + if status == HealthStatusSuccess { + val = 1.0 + } + healthCheckGauge.With(prometheus.Labels{ + "type": checkType, + "name": name, + }).Set(val) + healthCheckCounter.With(prometheus.Labels{ + "type": checkType, + "name": name, + "status": status, + }).Inc() +} + +// activeAlarmCheck checks if a specific alarm type is active in the server. +func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) error { + return func(ctx context.Context) error { + as := srv.Alarms() + for _, v := range as { + if v.Alarm == at { + return fmt.Errorf("alarm activated: %s", at.String()) + } + } + return nil + } +} + +func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error { + return func(ctx context.Context) error { + ctx = srv.AuthStore().WithRoot(ctx) + _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable}) + return err + } +} diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index 27ad30be4e0..0019121755a 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -2,31 +2,38 @@ package etcdhttp import ( "context" - "encoding/json" "fmt" "io" "net/http" "net/http/httptest" + "strings" "testing" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap/zaptest" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/config" - "go.etcd.io/etcd/server/v3/etcdserver" - "go.uber.org/zap/zaptest" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" ) type fakeHealthServer struct { fakeServer - health string - apiError error + serializableReadError error + linearizableReadError error + missingLeader bool + authStore auth.AuthStore } -func (s *fakeHealthServer) Range(ctx context.Context, request *pb.RangeRequest) (*pb.RangeResponse, error) { - return nil, s.apiError +func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) { + if req.Serializable { + return nil, s.serializableReadError + } + return nil, s.linearizableReadError } func (s *fakeHealthServer) Config() config.ServerConfig { @@ -34,144 +41,369 @@ func (s *fakeHealthServer) Config() config.ServerConfig { } func (s *fakeHealthServer) Leader() types.ID { - if s.health == "true" { + if !s.missingLeader { return 1 } return types.ID(raft.None) } -func (s *fakeHealthServer) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) { - if s.health == "true" { - return etcdserver.Response{}, nil - } - return etcdserver.Response{}, fmt.Errorf("fail health check") -} + +func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore } + func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false } +type healthTestCase struct { + name string + healthCheckURL string + expectStatusCode int + inResult []string + notInResult []string + + alarms []*pb.AlarmMember + apiError error + missingLeader bool +} + func TestHealthHandler(t *testing.T) { // define the input and expected output // input: alarms, and healthCheckURL - tests := []struct { - name string - alarms []*pb.AlarmMember - healthCheckURL string - apiError error - - expectStatusCode int - expectHealth string - }{ + tests := []healthTestCase{ { name: "Healthy if no alarm", alarms: []*pb.AlarmMember{}, healthCheckURL: "/health", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Unhealthy if NOSPACE alarm is on", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, healthCheckURL: "/health", expectStatusCode: http.StatusServiceUnavailable, - expectHealth: "false", }, { name: "Healthy if NOSPACE alarm is on and excluded", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Healthy if NOSPACE alarm is excluded", alarms: []*pb.AlarmMember{}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Healthy if multiple NOSPACE alarms are on and excluded", alarms: []*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Unhealthy if NOSPACE alarms is excluded and CORRUPT is on", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusServiceUnavailable, - expectHealth: "false", }, { name: "Unhealthy if both NOSPACE and CORRUPT are on and excluded", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}}, healthCheckURL: "/health?exclude=NOSPACE&exclude=CORRUPT", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { - name: "Healthy even if authentication failed", + name: "Unhealthy if api is not available", healthCheckURL: "/health", - apiError: auth.ErrUserEmpty, - expectStatusCode: http.StatusOK, - expectHealth: "true", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, }, { - name: "Healthy even if authorization failed", + name: "Unhealthy if no leader", healthCheckURL: "/health", - apiError: auth.ErrPermissionDenied, - expectStatusCode: http.StatusOK, - expectHealth: "true", + expectStatusCode: http.StatusServiceUnavailable, + missingLeader: true, }, { - name: "Unhealthy if api is not available", - healthCheckURL: "/health", - apiError: fmt.Errorf("Unexpected error"), - expectStatusCode: http.StatusServiceUnavailable, - expectHealth: "false", + name: "Healthy if no leader and serializable=true", + healthCheckURL: "/health?serializable=true", + expectStatusCode: http.StatusOK, + missingLeader: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mux := http.NewServeMux() + lg := zaptest.NewLogger(t) + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{ - fakeServer: fakeServer{alarms: tt.alarms}, - health: tt.expectHealth, - apiError: tt.apiError, + fakeServer: fakeServer{alarms: tt.alarms}, + serializableReadError: tt.apiError, + linearizableReadError: tt.apiError, + missingLeader: tt.missingLeader, + authStore: auth.NewAuthStore(lg, be, nil, 0), }) ts := httptest.NewServer(mux) defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, nil, nil) + }) + } +} - res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+tt.healthCheckURL)}) - if err != nil { - t.Errorf("fail serve http request %s %v", tt.healthCheckURL, err) - } - if res == nil { - t.Errorf("got nil http response with http request %s", tt.healthCheckURL) - return +func TestHttpSubPath(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "/readyz/data_corruption ok", + healthCheckURL: "/readyz/data_corruption", + expectStatusCode: http.StatusOK, + }, + { + name: "/readyz/serializable_read not ok with error", + apiError: fmt.Errorf("Unexpected error"), + healthCheckURL: "/readyz/serializable_read", + expectStatusCode: http.StatusServiceUnavailable, + notInResult: []string{"data_corruption"}, + }, + { + name: "/readyz/non_exist 404", + healthCheckURL: "/readyz/non_exist", + expectStatusCode: http.StatusNotFound, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + serializableReadError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), } - if res.StatusCode != tt.expectStatusCode { - t.Errorf("want statusCode %d but got %d", tt.expectStatusCode, res.StatusCode) + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + checkMetrics(t, tt.healthCheckURL, "", tt.expectStatusCode) + }) + } +} + +func TestDataCorruptionCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Live if CORRUPT alarm is on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}}, + healthCheckURL: "/livez", + expectStatusCode: http.StatusOK, + notInResult: []string{"data_corruption"}, + }, + { + name: "Not ready if CORRUPT alarm is on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}}, + healthCheckURL: "/readyz", + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"}, + }, + { + name: "ready if CORRUPT alarm is not on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, + healthCheckURL: "/readyz", + expectStatusCode: http.StatusOK, + }, + { + name: "ready if CORRUPT alarm is excluded", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}, {MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, + healthCheckURL: "/readyz?exclude=data_corruption", + expectStatusCode: http.StatusOK, + }, + { + name: "Not ready if CORRUPT alarm is on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}}, + healthCheckURL: "/readyz?exclude=non_exist", + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + authStore: auth.NewAuthStore(logger, be, nil, 0), } - health, err := parseHealthOutput(res.Body) - if err != nil { - t.Errorf("fail parse health check output %v", err) + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + // OK before alarms are activated. + checkHttpResponse(t, ts, tt.healthCheckURL, http.StatusOK, nil, nil) + // Activate the alarms. + s.alarms = tt.alarms + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + }) + } +} + +func TestSerializableReadCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Alive normal", + healthCheckURL: "/livez?verbose", + expectStatusCode: http.StatusOK, + inResult: []string{"[+]serializable_read ok"}, + }, + { + name: "Not alive if range api is not available", + healthCheckURL: "/livez", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]serializable_read failed: Unexpected error"}, + }, + { + name: "Not ready if range api is not available", + healthCheckURL: "/readyz", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]serializable_read failed: Unexpected error"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + serializableReadError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), } - if health.Health != tt.expectHealth { - t.Errorf("want health %s but got %s", tt.expectHealth, health.Health) + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + checkMetrics(t, tt.healthCheckURL, "serializable_read", tt.expectStatusCode) + }) + } +} + +func TestLinearizableReadCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Alive normal", + healthCheckURL: "/livez?verbose", + expectStatusCode: http.StatusOK, + inResult: []string{"[+]serializable_read ok"}, + }, + { + name: "Alive if lineariable range api is not available", + healthCheckURL: "/livez", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusOK, + }, + { + name: "Not ready if range api is not available", + healthCheckURL: "/readyz", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + linearizableReadError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), } + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode) }) } } -func parseHealthOutput(body io.Reader) (Health, error) { - obj := Health{} - d, derr := io.ReadAll(body) - if derr != nil { - return obj, derr +func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) { + res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)}) + + if err != nil { + t.Fatalf("fail serve http request %s %v", url, err) + } + if res.StatusCode != expectStatusCode { + t.Errorf("want statusCode %d but got %d", expectStatusCode, res.StatusCode) + } + defer res.Body.Close() + b, err := io.ReadAll(res.Body) + if err != nil { + t.Fatalf("Failed to read response for %s", url) } - if err := json.Unmarshal(d, &obj); err != nil { - return obj, err + result := string(b) + for _, substr := range inResult { + if !strings.Contains(result, substr) { + t.Errorf("Could not find substring : %s, in response: %s", substr, result) + return + } + } + for _, substr := range notInResult { + if strings.Contains(result, substr) { + t.Errorf("Do not expect substring : %s, in response: %s", substr, result) + return + } + } +} + +func checkMetrics(t *testing.T, url, checkName string, expectStatusCode int) { + defer healthCheckGauge.Reset() + defer healthCheckCounter.Reset() + + typeName := strings.TrimPrefix(strings.Split(url, "?")[0], "/") + if len(checkName) == 0 { + checkName = strings.Split(typeName, "/")[1] + typeName = strings.Split(typeName, "/")[0] + } + + expectedSuccessCount := 1 + expectedErrorCount := 0 + if expectStatusCode != http.StatusOK { + expectedSuccessCount = 0 + expectedErrorCount = 1 + } + + gather, _ := prometheus.DefaultGatherer.Gather() + for _, mf := range gather { + name := *mf.Name + val := 0 + switch name { + case "etcd_server_healthcheck": + val = int(mf.GetMetric()[0].GetGauge().GetValue()) + case "etcd_server_healthcheck_total": + val = int(mf.GetMetric()[0].GetCounter().GetValue()) + default: + continue + } + labelMap := make(map[string]string) + for _, label := range mf.GetMetric()[0].Label { + labelMap[label.GetName()] = label.GetValue() + } + if typeName != labelMap["type"] { + continue + } + if labelMap["name"] != checkName { + continue + } + if statusLabel, found := labelMap["status"]; found && statusLabel == HealthStatusError { + if val != expectedErrorCount { + t.Fatalf("%s got errorCount %d, wanted %d\n", name, val, expectedErrorCount) + } + } else { + if val != expectedSuccessCount { + t.Fatalf("%s got expectedSuccessCount %d, wanted %d\n", name, val, expectedSuccessCount) + } + } } - return obj, nil } diff --git a/server/proxy/grpcproxy/health.go b/server/proxy/grpcproxy/health.go index 882af4b46a8..8747dc3c57d 100644 --- a/server/proxy/grpcproxy/health.go +++ b/server/proxy/grpcproxy/health.go @@ -31,7 +31,9 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) { if lg == nil { lg = zap.NewNop() } - mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { return checkHealth(c) })) + mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.StringSet, serializable bool) etcdhttp.Health { + return checkHealth(c) + })) } // HandleProxyHealth registers health handler on '/proxy/health'. @@ -39,7 +41,9 @@ func HandleProxyHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) { if lg == nil { lg = zap.NewNop() } - mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { return checkProxyHealth(c) })) + mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.StringSet, serializable bool) etcdhttp.Health { + return checkProxyHealth(c) + })) } func checkHealth(c *clientv3.Client) etcdhttp.Health {