Skip to content

Commit

Permalink
pkg/rules: consider group name and file for deduplication
Browse files Browse the repository at this point in the history
Currently only the group name is considered for group deduplication.
However prometheus uses also the group file according to [1].

This fixes it.

[1] https://github.com/prometheus/prometheus/blob/ce838ad6fcbd14b0cf9971a4d093ad672e1469fe/rules/manager.go#L1047-L1055

Signed-off-by: Sergiusz Urbaniak <[email protected]>
  • Loading branch information
s-urbaniak committed Aug 12, 2020
1 parent ae629b2 commit d473931
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 6 deletions.
22 changes: 21 additions & 1 deletion docs/proposals/202003_thanos_rules_federation.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Thanos Querier fans-out to all know Rules endpoints configured via `--rule` comm

Generally the deduplication logic is less complex than with time series, specifically:

* Deduplication happens first at the rule group level. The identifier is the group name.
* Deduplication happens first at the rule group level. The identifier is the group name and the group file.
* Then, per group name deduplication happens on the rule level, where:

1. the rule type (recording rule vs. alerting rule)
Expand Down Expand Up @@ -171,6 +171,26 @@ Given the following stream of incoming alerting rules will also result in two in
severity: critical
```

Scenario 4:

As specified, the group name and file fields are used for deduplication.

Given the following stream of incoming rule groups:
```text
group: a/file1
group: b/file1
group: a/file2
```

The output becomes:
```text
group: a/file1
group: a/file2
group: b/file1
```

Deduplication of included alerting/recording rules inside groups is described in the previous scenarios.

## Alternatives

* Cortex contains a sharded Ruler. Assigning rules to shards is done via Consul, though a gossip implementation is under development. Shards do not communicate with other shards. Rules come from a store (e.g. a Postgres database).
Expand Down
4 changes: 2 additions & 2 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ func dedupGroups(groups []*rulespb.RuleGroup) []*rulespb.RuleGroup {
}

// Sort groups such that they appear next to each other.
sort.Slice(groups, func(i, j int) bool { return groups[i].Name < groups[j].Name })
sort.Slice(groups, func(i, j int) bool { return groups[i].Compare(groups[j]) < 0 })

i := 0
for _, g := range groups[1:] {
if g.Name == groups[i].Name {
if g.Compare(groups[i]) == 0 {
groups[i].Rules = append(groups[i].Rules, g.Rules...)
} else {
i++
Expand Down
113 changes: 110 additions & 3 deletions pkg/rules/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package rules

import (
"context"
"path"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -53,7 +54,6 @@ func testRulesAgainstExamples(t *testing.T, dir string, server rulespb.RulesServ
File: filepath.Join(dir, "alerts.yaml"),
Rules: []*rulespb.Rule{
someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert,
someRecording, someRecording, someRecording, someRecording, someRecording,
},
Interval: 60,
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
Expand All @@ -63,7 +63,6 @@ func testRulesAgainstExamples(t *testing.T, dir string, server rulespb.RulesServ
File: filepath.Join(dir, "alerts.yaml"),
Rules: []*rulespb.Rule{
someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert,
someRecording, someRecording, someRecording, someRecording, someRecording, someRecording, someRecording,
},
Interval: 60,
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
Expand All @@ -87,6 +86,39 @@ func testRulesAgainstExamples(t *testing.T, dir string, server rulespb.RulesServ
File: filepath.Join(dir, "alerts.yaml"),
Rules: []*rulespb.Rule{
someAlert, someAlert, someAlert, someAlert,
},
Interval: 60,
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
},
{
Name: "thanos-bucket-replicate.rules",
File: filepath.Join(dir, "rules.yaml"),
Rules: nil,
Interval: 60,
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
},
{
Name: "thanos-query.rules",
File: filepath.Join(dir, "rules.yaml"),
Rules: []*rulespb.Rule{
someRecording, someRecording, someRecording, someRecording, someRecording,
},
Interval: 60,
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
},
{
Name: "thanos-receive.rules",
File: filepath.Join(dir, "rules.yaml"),
Rules: []*rulespb.Rule{
someRecording, someRecording, someRecording, someRecording, someRecording, someRecording, someRecording,
},
Interval: 60,
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
},
{
Name: "thanos-store.rules",
File: filepath.Join(dir, "rules.yaml"),
Rules: []*rulespb.Rule{
someRecording, someRecording, someRecording, someRecording,
},
Interval: 60,
Expand Down Expand Up @@ -165,7 +197,9 @@ func testRulesAgainstExamples(t *testing.T, dir string, server rulespb.RulesServ
got[i].EvaluationDurationSeconds = 0
got[i].LastEvaluation = time.Time{}

testutil.Equals(t, expectedForType[i], got[i])
t.Run(got[i].Name+" "+path.Base(got[i].File), func(t *testing.T) {
testutil.Equals(t, expectedForType[i], got[i])
})
}
testutil.Equals(t, expectedForType, got)
})
Expand Down Expand Up @@ -742,6 +776,79 @@ func TestDedupGroups(t *testing.T) {
},
},
},
{
name: "distinct file names",
groups: []*rulespb.RuleGroup{
{
Name: "a",
File: "foo.yaml",
Rules: []*rulespb.Rule{
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a1"}),
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a2"}),
},
},
{
Name: "a",
Rules: []*rulespb.Rule{
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a1"}),
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a2"}),
},
},
{
Name: "b",
Rules: []*rulespb.Rule{
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "b1"}),
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "b2"}),
},
},
{
Name: "c",
},
{
Name: "a",
File: "bar.yaml",
Rules: []*rulespb.Rule{
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a1"}),
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a2"}),
},
},
},
want: []*rulespb.RuleGroup{
{
Name: "a",
Rules: []*rulespb.Rule{
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a1"}),
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a2"}),
},
},
{
Name: "b",
Rules: []*rulespb.Rule{
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "b1"}),
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "b2"}),
},
},
{
Name: "c",
},
{
Name: "a",
File: "bar.yaml",
Rules: []*rulespb.Rule{
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a1"}),
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a2"}),
},
},
{
Name: "a",
File: "foo.yaml",
Rules: []*rulespb.Rule{
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a1"}),
rulespb.NewRecordingRule(&rulespb.RecordingRule{Name: "a2"}),
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions pkg/rules/rulespb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,27 @@ func (r *RuleGroups) MarshalJSON() ([]byte, error) {
return json.Marshal((*plain)(r))
}

// Compare compares rule group x and y and returns:
//
// < 0 if x < y if rule group r1 is not equal and lexically before rule group r2
// 0 if x == y if rule group r1 is logically equal to r2 (r1 and r2 are the "same" rule groups)
// > 0 if x > y if rule group r1 is not equal and lexically after rule group r2
//
// For sorting this makes sure that more "critical" alert states come first.
func (r1 *RuleGroup) Compare(r2 *RuleGroup) int {
return strings.Compare(r1.Key(), r2.Key())
}

// Key returns the group key similar resembling Prometheus logic.
// See https://github.com/prometheus/prometheus/blob/869f1bc587e667b79721852d5badd9f70a39fc3f/rules/manager.go#L1062-L1065
func (r *RuleGroup) Key() string {
if r == nil {
return ""
}

return r.File + ";" + r.Name
}

func (m *Rule) UnmarshalJSON(entry []byte) error {
decider := struct {
Type string `json:"type"`
Expand Down

0 comments on commit d473931

Please sign in to comment.