Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout and failureThreshold to multicluster probe #13061

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions multicluster/charts/linkerd-multicluster/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ Kubernetes: `>=1.22.0-0`
| gateway.nodeSelector | object | `{}` | Node selectors for the gateway pod |
| gateway.pauseImage | string | `"gcr.io/google_containers/pause:3.2"` | The pause container to use |
| gateway.port | int | `4143` | The port on which all the gateway will accept incoming traffic |
| gateway.probe.failureThreshold | int | `3` | Minimum consecutive failures for the probe to be considered failed |
| gateway.probe.path | string | `"/ready"` | The path that will be used by remote clusters for determining whether the gateway is alive |
| gateway.probe.port | int | `4191` | The port used for liveness probing |
| gateway.probe.seconds | int | `3` | The interval (in seconds) between liveness probes |
| gateway.probe.timeout | string | `"30s"` | Probe request timeout (in Go's time.Duration format) |
| gateway.replicas | int | `1` | Number of replicas for the gateway pod |
| gateway.serviceAnnotations | object | `{}` | Annotations to add to the gateway service |
| gateway.serviceExternalTrafficPolicy | string | `""` | Set externalTrafficPolicy on gateway service |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ metadata:
{{- with .Values.commonLabels }}{{ toYaml . | trim | nindent 4 }}{{- end }}
annotations:
mirror.linkerd.io/gateway-identity: {{.Values.gateway.name}}.{{.Release.Namespace}}.serviceaccount.identity.{{.Values.linkerdNamespace}}.{{.Values.identityTrustDomain}}
mirror.linkerd.io/probe-failure-threshold: "{{.Values.gateway.probe.failureThreshold}}"
mirror.linkerd.io/probe-period: "{{.Values.gateway.probe.seconds}}"
mirror.linkerd.io/probe-path: {{.Values.gateway.probe.path}}
mirror.linkerd.io/probe-timeout: "{{.Values.gateway.probe.timeout}}"
mirror.linkerd.io/multicluster-gateway: "true"
component: gateway
{{ include "partials.annotations.created-by" . }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ spec:
description: Spec for gateway health probe
type: object
properties:
failureThreshold:
default: "3"
description: Minimum consecutive failures for the probe to be considered failed
type: string
path:
description: Path of remote gateway health endpoint
type: string
Expand All @@ -49,6 +53,11 @@ spec:
port:
description: Port of remote gateway health endpoint
type: string
timeout:
default: 30s
description: Probe request timeout
format: duration
type: string
selector:
description: Kubernetes Label Selector
type: object
Expand Down
4 changes: 4 additions & 0 deletions multicluster/charts/linkerd-multicluster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ gateway:
# nodePort -- Set the gateway nodePort (for LoadBalancer or NodePort) to a specific value
# nodePort:
probe:
# -- Minimum consecutive failures for the probe to be considered failed
failureThreshold: 3
# -- The path that will be used by remote clusters for determining whether the
# gateway is alive
path: /ready
Expand All @@ -21,6 +23,8 @@ gateway:
# nodePort:
# -- The interval (in seconds) between liveness probes
seconds: 3
  # -- Probe request timeout (in Go's time.Duration format)
timeout: 30s
# -- Annotations to add to the gateway service
serviceAnnotations: {}
# -- Set externalTrafficPolicy on gateway service
Expand Down
11 changes: 11 additions & 0 deletions multicluster/cmd/testdata/install_default.golden

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions multicluster/cmd/testdata/install_ha.golden

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions multicluster/cmd/testdata/install_psp.golden

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion multicluster/service-mirror/jittered_ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewTicker(minDuration time.Duration, maxJitter time.Duration) *Ticker {
if maxJitter < 0 {
log.WithField("jitter", minDuration).Panic("Negative jitter")
}
c := make(chan time.Time, 1)
c := make(chan time.Time)
ticker := &Ticker{
C: c,
stop: make(chan bool),
Expand Down
77 changes: 40 additions & 37 deletions multicluster/service-mirror/probe_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
logging "github.com/sirupsen/logrus"
)

const httpGatewayTimeoutMillis = 50000

// ProbeWorker is responsible for monitoring gateways using a probe specification
type ProbeWorker struct {
localGatewayName string
Expand Down Expand Up @@ -65,76 +63,81 @@ func (pw *ProbeWorker) Start() {
}

func (pw *ProbeWorker) run() {
successLabel := prometheus.Labels{probeSuccessfulLabel: "true"}
notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"}

probeTickerPeriod := pw.probeSpec.Period
maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of period
probeTicker := NewTicker(probeTickerPeriod, maxJitter)
defer probeTicker.Stop()

var failures uint32 = 0

probeLoop:
for {
select {
case <-pw.stopCh:
break probeLoop
case <-probeTicker.C:
pw.doProbe()
start := time.Now()
if err := pw.doProbe(); err != nil {
pw.log.Warn(err)
failures++
if failures < pw.probeSpec.FailureThreshold {
continue probeLoop
}

pw.log.Warnf("Failure threshold (%d) reached - Marking as unhealthy", pw.probeSpec.FailureThreshold)
pw.metrics.alive.Set(0)
pw.metrics.probes.With(notSuccessLabel).Inc()
if pw.alive {
pw.alive = false
pw.Liveness <- false
}
} else {
end := time.Since(start)
failures = 0

pw.log.Debug("Gateway is healthy")
pw.metrics.alive.Set(1)
pw.metrics.latency.Set(float64(end.Milliseconds()))
pw.metrics.latencies.Observe(float64(end.Milliseconds()))
pw.metrics.probes.With(successLabel).Inc()
if !pw.alive {
pw.alive = true
pw.Liveness <- true
}
}
}
}
}

func (pw *ProbeWorker) doProbe() {
func (pw *ProbeWorker) doProbe() error {
pw.RLock()
defer pw.RUnlock()

successLabel := prometheus.Labels{probeSuccessfulLabel: "true"}
notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"}

client := http.Client{
Timeout: httpGatewayTimeoutMillis * time.Millisecond,
Timeout: pw.probeSpec.Timeout,
}

strPort := strconv.Itoa(int(pw.probeSpec.Port))
urlAddress := net.JoinHostPort(pw.localGatewayName, strPort)
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s%s", urlAddress, pw.probeSpec.Path), nil)
if err != nil {
pw.log.Errorf("Could not create a GET request to gateway: %s", err)
return
return fmt.Errorf("could not create a GET request to gateway: %w", err)
}

start := time.Now()
resp, err := client.Do(req)
end := time.Since(start)
if err != nil {
pw.log.Warnf("Problem connecting with gateway. Marking as unhealthy %s", err)
pw.metrics.alive.Set(0)
pw.metrics.probes.With(notSuccessLabel).Inc()
if pw.alive {
pw.alive = false
pw.Liveness <- false
}
return
return fmt.Errorf("problem connecting with gateway: %w", err)
}
if resp.StatusCode != 200 {
pw.log.Warnf("Gateway returned unexpected status %d. Marking as unhealthy", resp.StatusCode)
pw.metrics.alive.Set(0)
pw.metrics.probes.With(notSuccessLabel).Inc()
if pw.alive {
pw.alive = false
pw.Liveness <- false
}
} else {
pw.log.Debug("Gateway is healthy")
pw.metrics.alive.Set(1)
pw.metrics.latency.Set(float64(end.Milliseconds()))
pw.metrics.latencies.Observe(float64(end.Milliseconds()))
pw.metrics.probes.With(successLabel).Inc()
if !pw.alive {
pw.alive = true
pw.Liveness <- true
}
return fmt.Errorf("gateway returned unexpected status %d", resp.StatusCode)
}

if err := resp.Body.Close(); err != nil {
pw.log.Warnf("Failed to close response body %s", err)
}

return nil
}
10 changes: 6 additions & 4 deletions multicluster/values/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ type Gateway struct {

// Probe contains all options for the Probe Service
type Probe struct {
Path string `json:"path"`
Port uint32 `json:"port"`
NodePort uint32 `json:"nodePort"`
Seconds uint32 `json:"seconds"`
FailureThreshold uint32 `json:"failureThreshold"`
Path string `json:"path"`
Port uint32 `json:"port"`
NodePort uint32 `json:"nodePort"`
Seconds uint32 `json:"seconds"`
Timeout string `json:"timeout"`
}

// NewInstallValues returns a new instance of the Values type.
Expand Down
6 changes: 6 additions & 0 deletions pkg/k8s/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,18 @@ const (
// GatewayIdentity can be found on the remote gateway service
GatewayIdentity = SvcMirrorPrefix + "/gateway-identity"

// GatewayProbeFailureThreshold is the minimum consecutive failures for the probe to be considered failed
GatewayProbeFailureThreshold = SvcMirrorPrefix + "/probe-failure-threshold"

// GatewayProbePeriod the interval at which the health of the gateway should be probed
GatewayProbePeriod = SvcMirrorPrefix + "/probe-period"

// GatewayProbePath the path at which the health of the gateway should be probed
GatewayProbePath = SvcMirrorPrefix + "/probe-path"

// GatewayProbeTimeout is the probe request timeout
GatewayProbeTimeout = SvcMirrorPrefix + "/probe-timeout"

// ConfigKeyName is the key in the secret that stores the kubeconfig needed to connect
// to a remote cluster
ConfigKeyName = "kubeconfig"
Expand Down
Loading
Loading