From c96ed49cc8d515c3999557d5b2d2fed85968d212 Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Tue, 10 Sep 2024 15:09:44 -0500 Subject: [PATCH 1/2] Add timeout and failureThreshold to multicluster probe - This adds the `probeSpec.failureThreshold` and `probeSpec.timeout` fields to the Link CRD spec. - Likewise, the `gateway.probe.failureThreshold` and `gateway.probe.timeout` fields are added to the linkerd-multicluster chart, that are used to populate the new `mirror.linkerd.io/probe-failure-threshold` and `mirror.linkerd.io/probe-timeout` annotations in the gateway service (consumed by `linkerd mc link` to populate probe spec). - In the probe worker, we replace the hard-coded 50s timeout with the new timeout config (which now defaults to 30s). And the probe loop got refactored in order to not mark the gateway as unhealty until the consecutive failures threshold is reached. --- .../charts/linkerd-multicluster/README.md | 2 + .../templates/gateway.yaml | 2 + .../templates/link-crd.yaml | 9 +++ .../charts/linkerd-multicluster/values.yaml | 4 + .../cmd/testdata/install_default.golden | 11 +++ multicluster/cmd/testdata/install_ha.golden | 11 +++ multicluster/cmd/testdata/install_psp.golden | 11 +++ multicluster/service-mirror/probe_worker.go | 77 ++++++++++--------- multicluster/values/values.go | 10 ++- pkg/k8s/labels.go | 6 ++ pkg/multicluster/link.go | 74 +++++++++++++++--- 11 files changed, 164 insertions(+), 53 deletions(-) diff --git a/multicluster/charts/linkerd-multicluster/README.md b/multicluster/charts/linkerd-multicluster/README.md index 79d9e05d6b02b..c854f2ab53c67 100644 --- a/multicluster/charts/linkerd-multicluster/README.md +++ b/multicluster/charts/linkerd-multicluster/README.md @@ -80,9 +80,11 @@ Kubernetes: `>=1.22.0-0` | gateway.nodeSelector | object | `{}` | Node selectors for the gateway pod | | gateway.pauseImage | string | `"gcr.io/google_containers/pause:3.2"` | The pause container to use | | gateway.port | int | `4143` | The port on which all the gateway will accept incoming traffic | +| gateway.probe.failureThreshold | int | `3` | Minimum consecutive failures for the probe to be considered failed | | gateway.probe.path | string | `"/ready"` | The path that will be used by remote clusters for determining whether the gateway is alive | | gateway.probe.port | int | `4191` | The port used for liveliness probing | | gateway.probe.seconds | int | `3` | The interval (in seconds) between liveness probes | +| gateway.probe.timeout | string | `"30s"` | Probe request timeout (in go's time.Duration format) | | gateway.replicas | int | `1` | Number of replicas for the gateway pod | | gateway.serviceAnnotations | object | `{}` | Annotations to add to the gateway service | | gateway.serviceExternalTrafficPolicy | string | `""` | Set externalTrafficPolicy on gateway service | diff --git a/multicluster/charts/linkerd-multicluster/templates/gateway.yaml b/multicluster/charts/linkerd-multicluster/templates/gateway.yaml index 66b8b0a08afa7..963aac5743592 100644 --- a/multicluster/charts/linkerd-multicluster/templates/gateway.yaml +++ b/multicluster/charts/linkerd-multicluster/templates/gateway.yaml @@ -102,8 +102,10 @@ metadata: {{- with .Values.commonLabels }}{{ toYaml . 
| trim | nindent 4 }}{{- end }} annotations: mirror.linkerd.io/gateway-identity: {{.Values.gateway.name}}.{{.Release.Namespace}}.serviceaccount.identity.{{.Values.linkerdNamespace}}.{{.Values.identityTrustDomain}} + mirror.linkerd.io/probe-failure-threshold: "{{.Values.gateway.probe.failureThreshold}}" mirror.linkerd.io/probe-period: "{{.Values.gateway.probe.seconds}}" mirror.linkerd.io/probe-path: {{.Values.gateway.probe.path}} + mirror.linkerd.io/probe-timeout: "{{.Values.gateway.probe.timeout}}" mirror.linkerd.io/multicluster-gateway: "true" component: gateway {{ include "partials.annotations.created-by" . }} diff --git a/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml b/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml index 4ff9946daf204..945de8fa70ca6 100644 --- a/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml +++ b/multicluster/charts/linkerd-multicluster/templates/link-crd.yaml @@ -40,6 +40,10 @@ spec: description: Spec for gateway health probe type: object properties: + failureThreshold: + default: "3" + description: Minimum consecutive failures for the probe to be considered failed + type: string path: description: Path of remote gateway health endpoint type: string @@ -49,6 +53,11 @@ spec: port: description: Port of remote gateway health endpoint type: string + timeout: + default: 30s + description: Probe request timeout + format: duration + type: string selector: description: Kubernetes Label Selector type: object diff --git a/multicluster/charts/linkerd-multicluster/values.yaml b/multicluster/charts/linkerd-multicluster/values.yaml index 1433166692257..8c09f541bdd26 100644 --- a/multicluster/charts/linkerd-multicluster/values.yaml +++ b/multicluster/charts/linkerd-multicluster/values.yaml @@ -12,6 +12,8 @@ gateway: # nodePort -- Set the gateway nodePort (for LoadBalancer or NodePort) to a specific value # nodePort: probe: + # -- Minimum consecutive failures for the probe to be considered failed + failureThreshold: 3 # -- The path that will be used by remote clusters for determining whether the # gateway is alive path: /ready @@ -21,6 +23,8 @@ gateway: # nodePort: # -- The interval (in seconds) between liveness probes seconds: 3 + # -- Probe request timeout (in go's time.Duration format) + timeout: 30s # -- Annotations to add to the gateway service serviceAnnotations: {} # -- Set externalTrafficPolicy on gateway service diff --git a/multicluster/cmd/testdata/install_default.golden b/multicluster/cmd/testdata/install_default.golden index 5c9237d2c3359..28a77de3efb08 100644 --- a/multicluster/cmd/testdata/install_default.golden +++ b/multicluster/cmd/testdata/install_default.golden @@ -68,8 +68,10 @@ metadata: linkerd.io/extension: multicluster annotations: mirror.linkerd.io/gateway-identity: linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local + mirror.linkerd.io/probe-failure-threshold: "3" mirror.linkerd.io/probe-period: "3" mirror.linkerd.io/probe-path: /ready + mirror.linkerd.io/probe-timeout: "30s" mirror.linkerd.io/multicluster-gateway: "true" component: gateway linkerd.io/created-by: linkerd/helm linkerdVersionValue @@ -277,6 +279,10 @@ spec: description: Spec for gateway health probe type: object properties: + failureThreshold: + default: "3" + description: Minimum consecutive failures for the probe to be considered failed + type: string path: description: Path of remote gateway health endpoint type: string @@ -286,6 +292,11 @@ spec: port: description: Port of remote gateway health endpoint 
type: string + timeout: + default: 30s + description: Probe request timeout + format: duration + type: string selector: description: Kubernetes Label Selector type: object diff --git a/multicluster/cmd/testdata/install_ha.golden b/multicluster/cmd/testdata/install_ha.golden index 939ab7a5da2ed..a1025bbdddba1 100644 --- a/multicluster/cmd/testdata/install_ha.golden +++ b/multicluster/cmd/testdata/install_ha.golden @@ -106,8 +106,10 @@ metadata: linkerd.io/extension: multicluster annotations: mirror.linkerd.io/gateway-identity: linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local + mirror.linkerd.io/probe-failure-threshold: "3" mirror.linkerd.io/probe-period: "3" mirror.linkerd.io/probe-path: /ready + mirror.linkerd.io/probe-timeout: "30s" mirror.linkerd.io/multicluster-gateway: "true" component: gateway linkerd.io/created-by: linkerd/helm linkerdVersionValue @@ -349,6 +351,10 @@ spec: description: Spec for gateway health probe type: object properties: + failureThreshold: + default: "3" + description: Minimum consecutive failures for the probe to be considered failed + type: string path: description: Path of remote gateway health endpoint type: string @@ -358,6 +364,11 @@ spec: port: description: Port of remote gateway health endpoint type: string + timeout: + default: 30s + description: Probe request timeout + format: duration + type: string selector: description: Kubernetes Label Selector type: object diff --git a/multicluster/cmd/testdata/install_psp.golden b/multicluster/cmd/testdata/install_psp.golden index 74373d53d9c38..1db3da2ff08e4 100644 --- a/multicluster/cmd/testdata/install_psp.golden +++ b/multicluster/cmd/testdata/install_psp.golden @@ -68,8 +68,10 @@ metadata: linkerd.io/extension: multicluster annotations: mirror.linkerd.io/gateway-identity: linkerd-gateway.linkerd-multicluster.serviceaccount.identity.linkerd.cluster.local + mirror.linkerd.io/probe-failure-threshold: "3" mirror.linkerd.io/probe-period: "3" mirror.linkerd.io/probe-path: /ready + mirror.linkerd.io/probe-timeout: "30s" mirror.linkerd.io/multicluster-gateway: "true" component: gateway linkerd.io/created-by: linkerd/helm linkerdVersionValue @@ -311,6 +313,10 @@ spec: description: Spec for gateway health probe type: object properties: + failureThreshold: + default: "3" + description: Minimum consecutive failures for the probe to be considered failed + type: string path: description: Path of remote gateway health endpoint type: string @@ -320,6 +326,11 @@ spec: port: description: Port of remote gateway health endpoint type: string + timeout: + default: 30s + description: Probe request timeout + format: duration + type: string selector: description: Kubernetes Label Selector type: object diff --git a/multicluster/service-mirror/probe_worker.go b/multicluster/service-mirror/probe_worker.go index 9e7de9ad15c6f..33bc947f0be22 100644 --- a/multicluster/service-mirror/probe_worker.go +++ b/multicluster/service-mirror/probe_worker.go @@ -13,8 +13,6 @@ import ( logging "github.com/sirupsen/logrus" ) -const httpGatewayTimeoutMillis = 50000 - // ProbeWorker is responsible for monitoring gateways using a probe specification type ProbeWorker struct { localGatewayName string @@ -65,76 +63,81 @@ func (pw *ProbeWorker) Start() { } func (pw *ProbeWorker) run() { + successLabel := prometheus.Labels{probeSuccessfulLabel: "true"} + notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"} + probeTickerPeriod := pw.probeSpec.Period maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of 
period probeTicker := NewTicker(probeTickerPeriod, maxJitter) defer probeTicker.Stop() + var failures uint32 = 0 + probeLoop: for { select { case <-pw.stopCh: break probeLoop case <-probeTicker.C: - pw.doProbe() + start := time.Now() + if err := pw.doProbe(); err != nil { + pw.log.Warn(err) + failures++ + if failures < pw.probeSpec.FailureThreshold { + continue probeLoop + } + + pw.log.Warnf("Failure threshold (%d) reached - Marking as unhealthy", pw.probeSpec.FailureThreshold) + pw.metrics.alive.Set(0) + pw.metrics.probes.With(notSuccessLabel).Inc() + if pw.alive { + pw.alive = false + pw.Liveness <- false + } + } else { + end := time.Since(start) + failures = 0 + + pw.log.Debug("Gateway is healthy") + pw.metrics.alive.Set(1) + pw.metrics.latency.Set(float64(end.Milliseconds())) + pw.metrics.latencies.Observe(float64(end.Milliseconds())) + pw.metrics.probes.With(successLabel).Inc() + if !pw.alive { + pw.alive = true + pw.Liveness <- true + } + } } } } -func (pw *ProbeWorker) doProbe() { +func (pw *ProbeWorker) doProbe() error { pw.RLock() defer pw.RUnlock() - successLabel := prometheus.Labels{probeSuccessfulLabel: "true"} - notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"} - client := http.Client{ - Timeout: httpGatewayTimeoutMillis * time.Millisecond, + Timeout: pw.probeSpec.Timeout, } strPort := strconv.Itoa(int(pw.probeSpec.Port)) urlAddress := net.JoinHostPort(pw.localGatewayName, strPort) req, err := http.NewRequest("GET", fmt.Sprintf("http://%s%s", urlAddress, pw.probeSpec.Path), nil) if err != nil { - pw.log.Errorf("Could not create a GET request to gateway: %s", err) - return + return fmt.Errorf("could not create a GET request to gateway: %w", err) } - start := time.Now() resp, err := client.Do(req) - end := time.Since(start) if err != nil { - pw.log.Warnf("Problem connecting with gateway. Marking as unhealthy %s", err) - pw.metrics.alive.Set(0) - pw.metrics.probes.With(notSuccessLabel).Inc() - if pw.alive { - pw.alive = false - pw.Liveness <- false - } - return + return fmt.Errorf("problem connecting with gateway: %w", err) } if resp.StatusCode != 200 { - pw.log.Warnf("Gateway returned unexpected status %d. Marking as unhealthy", resp.StatusCode) - pw.metrics.alive.Set(0) - pw.metrics.probes.With(notSuccessLabel).Inc() - if pw.alive { - pw.alive = false - pw.Liveness <- false - } - } else { - pw.log.Debug("Gateway is healthy") - pw.metrics.alive.Set(1) - pw.metrics.latency.Set(float64(end.Milliseconds())) - pw.metrics.latencies.Observe(float64(end.Milliseconds())) - pw.metrics.probes.With(successLabel).Inc() - if !pw.alive { - pw.alive = true - pw.Liveness <- true - } + return fmt.Errorf("gateway returned unexpected status %d", resp.StatusCode) } if err := resp.Body.Close(); err != nil { pw.log.Warnf("Failed to close response body %s", err) } + return nil } diff --git a/multicluster/values/values.go b/multicluster/values/values.go index 81023c0150fdc..b7eb95504822d 100644 --- a/multicluster/values/values.go +++ b/multicluster/values/values.go @@ -62,10 +62,12 @@ type Gateway struct { // Probe contains all options for the Probe Service type Probe struct { - Path string `json:"path"` - Port uint32 `json:"port"` - NodePort uint32 `json:"nodePort"` - Seconds uint32 `json:"seconds"` + FailureThreshold uint32 `json:"failureThreshold"` + Path string `json:"path"` + Port uint32 `json:"port"` + NodePort uint32 `json:"nodePort"` + Seconds uint32 `json:"seconds"` + Timeout string `json:"timeout"` } // NewInstallValues returns a new instance of the Values type. 
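The refactored run() loop above only marks the gateway unhealthy once the consecutive-failure counter reaches probeSpec.FailureThreshold, and the counter resets on the first successful probe. A minimal, self-contained sketch of that pattern follows; checkHealth, the error values, and the returned transition slice are illustrative stand-ins, not the actual ProbeWorker, its metrics, or the Liveness channel:

package main

import (
	"errors"
	"fmt"
)

// checkHealth replays a sequence of probe results and records health
// transitions only after `threshold` consecutive failures, mirroring the
// counter logic in the refactored probe_worker.go loop.
func checkHealth(results []error, threshold uint32) []bool {
	var failures uint32
	alive := true
	var transitions []bool
	for _, err := range results {
		if err != nil {
			failures++
			if failures < threshold {
				continue // tolerate the failure, stay healthy for now
			}
			if alive {
				alive = false
				transitions = append(transitions, false)
			}
			continue
		}
		failures = 0
		if !alive {
			alive = true
			transitions = append(transitions, true)
		}
	}
	return transitions
}

func main() {
	probeErr := errors.New("gateway returned unexpected status 503")
	// With the default threshold of 3, two failures are tolerated, the third
	// marks the gateway unhealthy, and the next success marks it healthy again.
	fmt.Println(checkHealth([]error{probeErr, probeErr, probeErr, nil}, 3))
	// [false true]
}

With the default probe period of 3s, this gives a gateway roughly two probe intervals of transient errors before it is reported as down, rather than flipping on the first failed request as the previous code did.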
diff --git a/pkg/k8s/labels.go b/pkg/k8s/labels.go index de17415a095f8..d4c0465aebdff 100644 --- a/pkg/k8s/labels.go +++ b/pkg/k8s/labels.go @@ -467,12 +467,18 @@ const ( // GatewayIdentity can be found on the remote gateway service GatewayIdentity = SvcMirrorPrefix + "/gateway-identity" + // GatewayProbeFailureThreshold is the minimum consecutive failures for the probe to be considered failed + GatewayProbeFailureThreshold = SvcMirrorPrefix + "/probe-failure-threshold" + // GatewayProbePeriod the interval at which the health of the gateway should be probed GatewayProbePeriod = SvcMirrorPrefix + "/probe-period" // GatewayProbePath the path at which the health of the gateway should be probed GatewayProbePath = SvcMirrorPrefix + "/probe-path" + // GatewayProbeTimeout is the probe request timeout + GatewayProbeTimeout = SvcMirrorPrefix + "/probe-timeout" + // ConfigKeyName is the key in the secret that stores the kubeconfig needed to connect // to a remote cluster ConfigKeyName = "kubeconfig" diff --git a/pkg/multicluster/link.go b/pkg/multicluster/link.go index 49b36846de220..61a094342f0e8 100644 --- a/pkg/multicluster/link.go +++ b/pkg/multicluster/link.go @@ -17,14 +17,19 @@ import ( "k8s.io/client-go/dynamic" ) +const DefaultFailureThreshold = 3 +const DefaultProbeTimeout = "30s" + type ( // ProbeSpec defines how a gateway should be queried for health. Once per // period, the probe workers will send an HTTP request to the remote gateway // on the given port with the given path and expect a HTTP 200 response. ProbeSpec struct { - Path string - Port uint32 - Period time.Duration + FailureThreshold uint32 + Path string + Port uint32 + Period time.Duration + Timeout time.Duration } // Link is an internal representation of the link.multicluster.linkerd.io @@ -176,9 +181,11 @@ func (l Link) ToUnstructured() (unstructured.Unstructured, error) { "gatewayPort": fmt.Sprintf("%d", l.GatewayPort), "gatewayIdentity": l.GatewayIdentity, "probeSpec": map[string]interface{}{ - "path": l.ProbeSpec.Path, - "port": fmt.Sprintf("%d", l.ProbeSpec.Port), - "period": l.ProbeSpec.Period.String(), + "failureThreshold": fmt.Sprintf("%d", l.ProbeSpec.FailureThreshold), + "path": l.ProbeSpec.Path, + "port": fmt.Sprintf("%d", l.ProbeSpec.Port), + "period": l.ProbeSpec.Period.String(), + "timeout": l.ProbeSpec.Timeout.String(), }, } @@ -219,6 +226,17 @@ func (l Link) ToUnstructured() (unstructured.Unstructured, error) { // ExtractProbeSpec parses the ProbSpec from a gateway service's annotations. 
func ExtractProbeSpec(gateway *corev1.Service) (ProbeSpec, error) { + // older gateways might not have this field + failureThreshold := uint64(DefaultFailureThreshold) + failureThresholdStr := gateway.Annotations[k8s.GatewayProbeFailureThreshold] + if failureThresholdStr != "" { + var err error + failureThreshold, err = strconv.ParseUint(failureThresholdStr, 10, 32) + if err != nil { + return ProbeSpec{}, err + } + } + path := gateway.Annotations[k8s.GatewayProbePath] if path == "" { return ProbeSpec{}, errors.New("probe path is empty") @@ -234,10 +252,22 @@ func ExtractProbeSpec(gateway *corev1.Service) (ProbeSpec, error) { return ProbeSpec{}, err } + timeoutStr := gateway.Annotations[k8s.GatewayProbeTimeout] + if timeoutStr == "" { + timeoutStr = DefaultProbeTimeout + } + + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return ProbeSpec{}, err + } + return ProbeSpec{ - Path: path, - Port: port, - Period: time.Duration(period) * time.Second, + FailureThreshold: uint32(failureThreshold), + Path: path, + Port: port, + Period: time.Duration(period) * time.Second, + Timeout: timeout, }, nil } @@ -294,6 +324,24 @@ func newProbeSpec(obj map[string]interface{}) (ProbeSpec, error) { return ProbeSpec{}, err } + failureThresholdStr, err := stringField(obj, "failureThreshold") + if err != nil { + return ProbeSpec{}, err + } + failureThreshold, err := strconv.ParseUint(failureThresholdStr, 10, 32) + if err != nil { + return ProbeSpec{}, err + } + + timeoutStr, err := stringField(obj, "timeout") + if err != nil { + return ProbeSpec{}, err + } + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return ProbeSpec{}, err + } + path, err := stringField(obj, "path") if err != nil { return ProbeSpec{}, err @@ -309,9 +357,11 @@ func newProbeSpec(obj map[string]interface{}) (ProbeSpec, error) { } return ProbeSpec{ - Path: path, - Port: uint32(port), - Period: period, + FailureThreshold: uint32(failureThreshold), + Path: path, + Port: uint32(port), + Period: period, + Timeout: timeout, }, nil } From f88566bee1e2484676cb0a73a0f2730c16ea0403 Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Wed, 25 Sep 2024 05:06:40 -0500 Subject: [PATCH 2/2] Make probeTicker.C synchronous --- multicluster/service-mirror/jittered_ticker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/multicluster/service-mirror/jittered_ticker.go b/multicluster/service-mirror/jittered_ticker.go index 880158b305163..90f05a71355db 100644 --- a/multicluster/service-mirror/jittered_ticker.go +++ b/multicluster/service-mirror/jittered_ticker.go @@ -38,7 +38,7 @@ func NewTicker(minDuration time.Duration, maxJitter time.Duration) *Ticker { if maxJitter < 0 { log.WithField("jitter", minDuration).Panic("Negative jitter") } - c := make(chan time.Time, 1) + c := make(chan time.Time) ticker := &Ticker{ C: c, stop: make(chan bool),
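For gateway services created by older chart versions, the new annotations are absent, so ExtractProbeSpec above falls back to DefaultFailureThreshold (3) and DefaultProbeTimeout ("30s"). A rough standalone sketch of that defaulting, assuming a plain map in place of the gateway Service's annotations (keys copied from pkg/k8s/labels.go):

package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	// An older gateway: probe-failure-threshold and probe-timeout are not set.
	annotations := map[string]string{
		"mirror.linkerd.io/probe-period": "3",
		"mirror.linkerd.io/probe-path":   "/ready",
	}

	failureThreshold := uint64(3) // DefaultFailureThreshold
	if s := annotations["mirror.linkerd.io/probe-failure-threshold"]; s != "" {
		var err error
		if failureThreshold, err = strconv.ParseUint(s, 10, 32); err != nil {
			panic(err)
		}
	}

	timeoutStr := annotations["mirror.linkerd.io/probe-timeout"]
	if timeoutStr == "" {
		timeoutStr = "30s" // DefaultProbeTimeout
	}
	timeout, err := time.ParseDuration(timeoutStr)
	if err != nil {
		panic(err)
	}

	fmt.Printf("failureThreshold=%d timeout=%s\n", failureThreshold, timeout)
	// failureThreshold=3 timeout=30s
}

Defaulting on the read side keeps gateways deployed before this change linkable without re-annotating them, while the new CRD-level defaults fill the probeSpec fields in for Link resources that omit them.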