Minimize missed rule group evaluations #6129

Merged: 6 commits, Sep 4, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## master / unreleased

* [FEATURE] Ruler: Minimize the chance of missed rule group evaluations caused by OOM kills, bad underlying nodes, or an unhealthy ruler that still appears healthy in the ring. This feature is enabled via the `-ruler.enable-ha-evaluation` flag. #6129
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
* [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163
* [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4142,6 +4142,10 @@ ruler_client:
# CLI flag: -ruler.client.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]

# Timeout for downstream rulers.
Contributor:
I think we need a doc about Ruler HA: how to enable it and how the various configs/flags interact.
We added 3 new flags in this PR, but I am unsure whether I need to tune them or whether they work out of the box.

Contributor (Author):
I agree. I will submit another PR for the doc.

# CLI flag: -ruler.client.remote-timeout
[remote_timeout: <duration> | default = 2m]

# How frequently to evaluate rules
# CLI flag: -ruler.evaluation-interval
[evaluation_interval: <duration> | default = 1m]
@@ -4340,6 +4344,10 @@ ring:
# CLI flag: -ruler.ring.final-sleep
[final_sleep: <duration> | default = 0s]

# Keep instance in the ring on shut down.
# CLI flag: -ruler.ring.keep-instance-in-the-ring-on-shutdown
[keep_instance_in_the_ring_on_shutdown: <boolean> | default = false]

# Period with which to attempt to flush rule groups.
# CLI flag: -ruler.flush-period
[flush_period: <duration> | default = 1m]
@@ -4374,6 +4382,10 @@ ring:
# Disable the rule_group label on exported metrics
# CLI flag: -ruler.disable-rule-group-label
[disable_rule_group_label: <boolean> | default = false]

# Enable high availability
# CLI flag: -ruler.enable-ha-evaluation
[enable_ha_evaluation: <boolean> | default = false]
```
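
Regarding the review thread above about how the new flags fit together: below is a minimal sketch of a ruler HA configuration, assembled only from the flags touched in this PR and the settings used in the integration test below. The values and YAML placement are illustrative assumptions, not official guidance (the follow-up doc mentioned above should be authoritative).

```yaml
# Hypothetical ruler HA sketch; values are illustrative, not recommendations.
ruler:
  # New in this PR: let a replica evaluate a rule group when its owner
  # appears unhealthy, instead of missing the evaluation.
  enable_ha_evaluation: true

  ruler_client:
    # Timeout for ruler-to-ruler calls (including the new liveness check).
    # Default is 2m; the integration test uses 10ms to detect a killed
    # ruler quickly.
    remote_timeout: 2m

  ring:
    # Own each rule group on more than one ruler so a backup exists.
    replication_factor: 2

    # Optional: keep a shutting-down ruler in the ring to avoid ownership
    # churn during rollouts.
    keep_instance_in_the_ring_on_shutdown: false
```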

### `ruler_storage_config`
170 changes: 158 additions & 12 deletions integration/ruler_test.go
@@ -981,6 +981,152 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
})
}

func TestRulerHAEvaluation(t *testing.T) {
const numRulesGroups = 20

random := rand.New(rand.NewSource(time.Now().UnixNano()))
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Generate multiple rule groups, with 1 rule each.
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
expectedNames := make([]string, numRulesGroups)
evalInterval, _ := model.ParseDuration("2s")
for i := 0; i < numRulesGroups; i++ {
num := random.Intn(10)
var ruleNode yaml.Node
var exprNode yaml.Node

ruleNode.SetString(fmt.Sprintf("rule_%d", i))
exprNode.SetString(strconv.Itoa(i))
ruleName := fmt.Sprintf("test_%d", i)

expectedNames[i] = ruleName

if num%2 == 0 {
ruleGroups[i] = rulefmt.RuleGroup{
Name: ruleName,
Interval: evalInterval,
Rules: []rulefmt.RuleNode{{
Alert: ruleNode,
Expr: exprNode,
}},
}
} else {
ruleGroups[i] = rulefmt.RuleGroup{
Name: ruleName,
Interval: evalInterval,
Rules: []rulefmt.RuleNode{{
Record: ruleNode,
Expr: exprNode,
}},
}
}
}

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
overrides := map[string]string{
// Since we're not going to run any rule, we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-ruler.ring.replication-factor": "2",
"-ruler.enable-ha-evaluation": "true",
"-ruler.poll-interval": "5s",
"-ruler.client.remote-timeout": "10ms",
}

rulerFlags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
overrides,
)

// Start rulers.
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))

// Upload rule groups to one of the rulers.
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
require.NoError(t, err)
namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
namespaceNameCount := make([]int, len(namespaceNames))
nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, ruleGroup := range ruleGroups {
index := nsRand.Intn(len(namespaceNames))
namespaceNameCount[index] = namespaceNameCount[index] + 1
require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
}

// Wait until rulers have loaded all rules.
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))

ruler1SyncTotal, err := ruler1.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
require.NoError(t, err)
ruler3SyncTotal, err := ruler3.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
require.NoError(t, err)

err = consul.Kill() // kill consul so the rulers will operate with the tokens/instances they already have
require.NoError(t, err)

err = ruler2.Kill()
require.NoError(t, err)

// wait for another sync
require.NoError(t, ruler1.WaitSumMetrics(e2e.Greater(ruler1SyncTotal[0]), "cortex_ruler_sync_rules_total"))
require.NoError(t, ruler3.WaitSumMetrics(e2e.Greater(ruler3SyncTotal[0]), "cortex_ruler_sync_rules_total"))

rulers = e2ecortex.NewCompositeCortexService(ruler1, ruler3)
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))

t.Log(ruler1.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
t.Log(ruler3.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))

c3, err := e2ecortex.NewClient("", "", "", ruler3.HTTPEndpoint(), "user-1")
require.NoError(t, err)

ruler1Rules, err := c.GetRuleGroups()
require.NoError(t, err)

ruler3Rules, err := c3.GetRuleGroups()
require.NoError(t, err)

ruleCount := 0
countFunc := func(ruleGroups map[string][]rulefmt.RuleGroup) {
for _, v := range ruleGroups {
ruleCount += len(v)
}
}

countFunc(ruler1Rules)
require.Equal(t, numRulesGroups, ruleCount)
ruleCount = 0
countFunc(ruler3Rules)
require.Equal(t, numRulesGroups, ruleCount)

// Each rule group in this test evaluates at a 2-second interval. If a ruler is down and another ruler
// assumes ownership, the group may not be evaluated until its next scheduled evaluation. Sleep just over
// one interval to ensure the remaining rulers have evaluated the rule groups.
time.Sleep(2100 * time.Millisecond)
results, err := c.GetPrometheusRules(e2ecortex.RuleFilter{})
require.NoError(t, err)
require.Equal(t, numRulesGroups, len(results))
Contributor:
I'm a bit confused about how this test validates HA. Should we validate that the total number of evaluations across all rules matches what is expected?

Contributor (Author):
I updated the test to assert that the rules are evaluated by the remaining rulers.

for _, v := range results {
require.False(t, v.LastEvaluation.IsZero())
}
}

func TestRulerKeepFiring(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
@@ -1125,7 +1271,12 @@ type Alert struct {
Value string `json:"value"`
}

-func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup {
+func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
+	return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
+}
+
+func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
+	// Prepare rule group with invalid rule.
var recordNode = yaml.Node{}
var exprNode = yaml.Node{}

@@ -1136,19 +1287,13 @@
Name: groupName,
Interval: 10,
Rules: []rulefmt.RuleNode{{
-			Alert:         recordNode,
-			Expr:          exprNode,
-			KeepFiringFor: keepFiring,
+			Record: recordNode,
+			Expr:   exprNode,
}},
}
}

-func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
-	return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
-}
-
-func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
-	// Prepare rule group with invalid rule.
+func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup {
var recordNode = yaml.Node{}
var exprNode = yaml.Node{}

@@ -1159,8 +1304,9 @@ func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
Name: groupName,
Interval: 10,
Rules: []rulefmt.RuleNode{{
-			Record: recordNode,
-			Expr:   exprNode,
+			Alert:         recordNode,
+			Expr:          exprNode,
+			KeepFiringFor: keepFiring,
}},
}
}
7 changes: 7 additions & 0 deletions pkg/ruler/client_pool_test.go
@@ -14,6 +14,7 @@ import (

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/services"
)

func Test_newRulerClientFactory(t *testing.T) {
@@ -63,6 +64,12 @@

type mockRulerServer struct{}

func (m *mockRulerServer) LivenessCheck(ctx context.Context, request *LivenessCheckRequest) (*LivenessCheckResponse, error) {
return &LivenessCheckResponse{
State: int32(services.Running),
}, nil
}

func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) {
return &RulesResponse{}, nil
}
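
The `LivenessCheck` mock above implements the new RPC this PR adds so rulers can probe each other's health. Below is a rough, hypothetical sketch of how a backup ruler might consult it before taking over a rule group; only the request/response types and `services.Running` come from this diff, while the helper, its name, and the decision logic are assumptions for illustration:

```go
// Hypothetical helper (assumes package ruler; imports: context, time, and
// github.com/cortexproject/cortex/pkg/util/services). Not part of this PR.
func shouldTakeOver(ctx context.Context, owner RulerClient) bool {
	// Bound the probe so a dead owner is detected quickly; the integration
	// test above sets -ruler.client.remote-timeout to 10ms for this reason.
	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()

	resp, err := owner.LivenessCheck(ctx, &LivenessCheckRequest{})
	if err != nil {
		// The owner did not answer (OOM-killed, bad node, network partition):
		// evaluate the group here rather than miss the evaluation.
		return true
	}
	// The owner answered; defer to it only if it reports a Running state.
	return resp.State != int32(services.Running)
}
```

The short remote timeout in the integration test pairs with this idea: a killed ruler fails the probe fast, so a peer can pick up its groups before the next evaluation interval passes.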