Merge pull request #1138 from cloudflare/fallback
Limit how long we can check other servers

prymitive authored Oct 3, 2024
2 parents 87bfd23 + 740164c · commit 9024afa
Showing 7 changed files with 142 additions and 10 deletions.
docs/changelog.md — 8 changes: 7 additions & 1 deletion
@@ -1,6 +1,12 @@
 # Changelog
 
-## v0.65.4
+## v0.66.0
+
+### Added
+
+- Added `fallbackTimeout` option to the [promql/series](checks/promql/series.md) check
+  that controls how much time pint can spend checking other Prometheus servers for missing
+  metrics.
 
 ### Fixed
 
docs/checks/promql/series.md — 10 changes: 10 additions & 0 deletions
@@ -141,8 +141,11 @@ Syntax:
 
 ```js
 check "promql/series" {
   lookbackRange = "7d"
   lookbackStep = "5m"
   ignoreMetrics = [ "(.*)", ... ]
   ignoreLabelsValue = { "...": [ "...", ... ] }
+  fallbackTimeout = "5m"
 }
 ```
 
@@ -166,6 +169,13 @@ check "promql/series" {
   comments, see below.
   The value of this option is a map where the key is a metric selector to match on and the value
   is the list of label names.
+- `fallbackTimeout` - if a query uses a metric that is missing from a Prometheus server, pint will
+  check if that metric is present on any other Prometheus server and report any findings.
+  This option controls how long these extra checks can take in total when there is a long list of
+  additional servers to check: pint stops checking further Prometheus servers once it reaches
+  this time limit. For example, with the default limit of 5 minutes, if there are 10 extra
+  Prometheus servers to check and checking the first 4 of them takes 5 minutes, pint will skip
+  the remaining 6 servers.
 
 Example:
 
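For illustration, a minimal pint config snippet setting the new option (the `1m` value here is an arbitrary example, not the default; per the code below the default is 5 minutes):

```js
check "promql/series" {
  # Spend at most 1 minute in total querying other Prometheus servers
  # for metrics that are missing from the current one.
  fallbackTimeout = "1m"
}
```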
internal/checks/base_test.go — 9 changes: 7 additions & 2 deletions
@@ -152,10 +152,11 @@ func runTests(t *testing.T, testCases []checkTest) {
             entries, err := parseContent(tc.content)
             require.NoError(t, err, "cannot parse rule content")
             for _, entry := range entries {
-                ctx := context.WithValue(context.Background(), promapi.AllPrometheusServers, proms)
+                ctx := context.Background()
                 if tc.ctx != nil {
                     ctx = tc.ctx(uri)
                 }
+                ctx = context.WithValue(ctx, promapi.AllPrometheusServers, proms)
                 problems := tc.checker(prom).Check(ctx, entry.Path, entry.Rule, tc.entries)
                 require.Equal(t, tc.problems(uri), problems)
             }
@@ -468,11 +469,15 @@ func (mr metadataResponse) respond(w http.ResponseWriter, _ *http.Request) {
 }
 
 type sleepResponse struct {
+    resp  responseWriter
     sleep time.Duration
 }
 
-func (sr sleepResponse) respond(_ http.ResponseWriter, _ *http.Request) {
+func (sr sleepResponse) respond(w http.ResponseWriter, r *http.Request) {
     time.Sleep(sr.sleep)
+    if sr.resp != nil {
+        sr.resp.respond(w, r)
+    }
 }
 
 var (
internal/checks/promql_series.go — 49 changes: 43 additions & 6 deletions
@@ -27,9 +27,11 @@ type PromqlSeriesSettings struct {
     LookbackRange   string   `hcl:"lookbackRange,optional" json:"lookbackRange,omitempty"`
     LookbackStep    string   `hcl:"lookbackStep,optional" json:"lookbackStep,omitempty"`
     IgnoreMetrics   []string `hcl:"ignoreMetrics,optional" json:"ignoreMetrics,omitempty"`
+    FallbackTimeout string   `hcl:"fallbackTimeout,optional" json:"fallbackTimeout,omitempty"`
     ignoreMetricsRe       []*regexp.Regexp
     lookbackRangeDuration time.Duration
     lookbackStepDuration  time.Duration
+    fallbackTimeout       time.Duration
 }
 
 func (c *PromqlSeriesSettings) Validate() error {
@@ -59,6 +61,15 @@ func (c *PromqlSeriesSettings) Validate() error {
         c.lookbackStepDuration = time.Duration(dur)
     }
 
+    c.fallbackTimeout = time.Minute * 5
+    if c.FallbackTimeout != "" {
+        dur, err := model.ParseDuration(c.FallbackTimeout)
+        if err != nil {
+            return err
+        }
+        c.fallbackTimeout = time.Duration(dur)
+    }
+
     for selector := range c.IgnoreLabelsValue {
         if _, err := promParser.ParseMetricSelector(selector); err != nil {
             return fmt.Errorf("%q is not a valid PromQL metric selector: %w", selector, err)
@@ -300,7 +311,7 @@ func (c SeriesCheck) Check(ctx context.Context, _ discovery.Path, rule parser.Ru…
                 Lines:    expr.Value.Lines,
                 Reporter: c.Reporter(),
                 Text:     text,
-                Details:  c.checkOtherServer(ctx, selector.String()),
+                Details:  c.checkOtherServer(ctx, selector.String(), settings.fallbackTimeout),
                 Severity: severity,
             })
             slog.Debug("No historical series for base metric", slog.String("check", c.Reporter()), slog.String("selector", (&bareSelector).String()))
@@ -564,10 +575,15 @@ func (c SeriesCheck) Check(ctx context.Context, _ discovery.Path, rule parser.Ru…
     return problems
 }
 
-func (c SeriesCheck) checkOtherServer(ctx context.Context, query string) string {
+func (c SeriesCheck) checkOtherServer(ctx context.Context, query string, timeout time.Duration) string {
     var servers []*promapi.FailoverGroup
     if val := ctx.Value(promapi.AllPrometheusServers); val != nil {
-        servers = val.([]*promapi.FailoverGroup)
+        for _, s := range val.([]*promapi.FailoverGroup) {
+            if s.Name() == c.prom.Name() {
+                continue
+            }
+            servers = append(servers, s)
+        }
     }
 
     if len(servers) == 0 {
@@ -579,10 +595,31 @@
     buf.WriteString(query)
     buf.WriteString("` was found on other prometheus servers:\n\n")
 
-    var matches, skipped int
+    start := time.Now()
+    var tested, matches, skipped int
     for _, prom := range servers {
-        slog.Debug("Checking if metric exists on any other Prometheus server", slog.String("check", c.Reporter()), slog.String("selector", query))
-
+        if time.Since(start) >= timeout {
+            slog.Debug("Time limit reached for checking if metric exists on any other Prometheus server",
+                slog.String("check", c.Reporter()),
+                slog.String("selector", query),
+            )
+            buf.WriteString("\npint tried to check ")
+            buf.WriteString(strconv.Itoa(len(servers)))
+            buf.WriteString(" server(s) but stopped after checking ")
+            buf.WriteString(strconv.Itoa(tested))
+            buf.WriteString(" server(s) due to reaching time limit (")
+            buf.WriteString(output.HumanizeDuration(timeout))
+            buf.WriteString(").\n")
+            break
+        }
+
+        slog.Debug("Checking if metric exists on any other Prometheus server",
+            slog.String("check", c.Reporter()),
+            slog.String("name", prom.Name()),
+            slog.String("selector", query),
+        )
+
+        tested++
         qr, err := prom.Query(ctx, fmt.Sprintf("count(%s)", query))
         if err != nil {
             continue
internal/checks/promql_series_test.go — 54 changes: 54 additions & 0 deletions
@@ -4070,6 +4070,60 @@ func TestSeriesCheck(t *testing.T) {
                 },
             },
         },
+        {
+            description: "series present on other servers / timeout",
+            content:     "- record: foo\n  expr: notfound\n",
+            checker:     newSeriesCheck,
+            prometheus:  newSimpleProm,
+            ctx: func(_ string) context.Context {
+                s := checks.PromqlSeriesSettings{
+                    FallbackTimeout: "50ms",
+                }
+                if err := s.Validate(); err != nil {
+                    t.Error(err)
+                    t.FailNow()
+                }
+                return context.WithValue(context.Background(), checks.SettingsKey(checks.SeriesCheckName), &s)
+            },
+            otherProms: func(uri string) []*promapi.FailoverGroup {
+                var proms []*promapi.FailoverGroup
+                for i := range 15 {
+                    proms = append(proms, simpleProm(fmt.Sprintf("prom%d", i), uri+"/other", time.Second, false))
+                }
+                return proms
+            },
+            problems: func(uri string) []checks.Problem {
+                return []checks.Problem{
+                    {
+                        Lines: parser.LineRange{
+                            First: 2,
+                            Last:  2,
+                        },
+                        Reporter: checks.SeriesCheckName,
+                        Text:     noMetricText("prom", uri, "notfound", "1w"),
+                        Details:  fmt.Sprintf("`notfound` was found on other prometheus servers:\n\n- [prom0](%s/other/graph?g0.expr=notfound)\n- [prom1](%s/other/graph?g0.expr=notfound)\n- [prom2](%s/other/graph?g0.expr=notfound)\n\npint tried to check 15 server(s) but stopped after checking 3 server(s) due to reaching time limit (50ms).\n\nYou might be trying to deploy this rule to the wrong Prometheus server instance.\n", uri, uri, uri),
+                        Severity: checks.Bug,
+                    },
+                }
+            },
+            mocks: []*prometheusMock{
+                {
+                    conds: []requestCondition{requestPathCond{path: "/other/api/v1/query"}},
+                    resp: sleepResponse{
+                        sleep: time.Millisecond * 20,
+                        resp:  respondWithSingleInstantVector(),
+                    },
+                },
+                {
+                    conds: []requestCondition{requireQueryPath},
+                    resp:  respondWithEmptyVector(),
+                },
+                {
+                    conds: []requestCondition{requireRangeQueryPath},
+                    resp:  respondWithEmptyMatrix(),
+                },
+            },
+        },
     }
     runTests(t, testCases)
 }
internal/config/config_test.go — 12 changes: 12 additions & 0 deletions
@@ -2277,6 +2277,18 @@ func TestConfigErrors(t *testing.T) {
            }`,
            err: `"foo{" is not a valid PromQL metric selector: 1:5: parse error: unexpected end of input inside braces`,
        },
+       {
+           config: `check "promql/series" { lookbackRange = "1x" }`,
+           err:    `unknown unit "x" in duration "1x"`,
+       },
+       {
+           config: `check "promql/series" { lookbackStep = "1x" }`,
+           err:    `unknown unit "x" in duration "1x"`,
+       },
+       {
+           config: `check "promql/series" { fallbackTimeout = "1x" }`,
+           err:    `unknown unit "x" in duration "1x"`,
+       },
        {
            config: `rule {
                link ".+++" {}
internal/promapi/failover.go — 10 changes: 9 additions & 1 deletion
Expand Up @@ -199,7 +199,15 @@ func (fg *FailoverGroup) Config(ctx context.Context, cacheTTL time.Duration) (cf

func (fg *FailoverGroup) Query(ctx context.Context, expr string) (qr *QueryResult, err error) {
var uri string
for _, prom := range fg.servers {
for try, prom := range fg.servers {
if try > 0 {
slog.Debug(
"Using failover URI",
slog.String("name", fg.name),
slog.Int("retry", try),
slog.String("uri", prom.safeURI),
)
}
uri = prom.safeURI
qr, err = prom.Query(ctx, expr)
if err == nil {
Expand Down
