Skip to content

Commit

Permalink
Minimize missed rule group evaluations
Browse files Browse the repository at this point in the history
Signed-off-by: Anand Rajagopal <[email protected]>
  • Loading branch information
rajagopalanand committed Aug 15, 2024
1 parent 8df8246 commit 4c7f4c5
Show file tree
Hide file tree
Showing 11 changed files with 1,073 additions and 170 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [CHANGE] Querier: Remove `-querier.at-modifier-enabled` flag. #6157
* [CHANGE] Tracing: Remove deprecated `oltp_endpoint` config entirely. #6158
* [CHANGE] Store Gateway: Enable store gateway zone stable shuffle sharding by default. #6161
* [FEATURE] Ruler: Minimize rule group missed evaluations via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Ingester/Distributor: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 #6010 #6020
* [FEATURE] Querier: Enable querying native histogram chunks. #5944 #6031
* [FEATURE] Query Frontend: Support native histogram in query frontend response. #5996 #6043
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4336,6 +4336,10 @@ ring:
# CLI flag: -ruler.ring.final-sleep
[final_sleep: <duration> | default = 0s]
# Keep instance in the ring on shut down.
# CLI flag: -ruler.ring.keep-instance-in-the-ring-on-shutdown
[keep_instance_in_the_ring_on_shutdown: <boolean> | default = false]
# Period with which to attempt to flush rule groups.
# CLI flag: -ruler.flush-period
[flush_period: <duration> | default = 1m]
Expand Down Expand Up @@ -4370,6 +4374,14 @@ ring:
# Disable the rule_group label on exported metrics
# CLI flag: -ruler.disable-rule-group-label
[disable_rule_group_label: <boolean> | default = false]
# Enable high availability for rule group evaluation
# CLI flag: -ruler.enable-ha-evaluation
[enable_ha_evaluation: <boolean> | default = false]
# Timeout for fanout calls to other rulers
# CLI flag: -ruler.list-rules-fanout-timeout
[list_rules_fanout_timeout: <duration> | default = 2m]
```

### `ruler_storage_config`
Expand Down
192 changes: 112 additions & 80 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,112 +978,144 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
})
}

func TestRulerKeepFiring(t *testing.T) {
func TestRulerHA(t *testing.T) {
const numRulesGroups = 20

random := rand.New(rand.NewSource(time.Now().UnixNano()))
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Generate multiple rule groups, with 1 rule each.
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
expectedNames := make([]string, numRulesGroups)
alertCount := 0
evalInterval, _ := model.ParseDuration("5s")
for i := 0; i < numRulesGroups; i++ {
num := random.Intn(10)
var ruleNode yaml.Node
var exprNode yaml.Node

ruleNode.SetString(fmt.Sprintf("rule_%d", i))
exprNode.SetString(strconv.Itoa(i))
ruleName := fmt.Sprintf("test_%d", i)

expectedNames[i] = ruleName

if num%2 == 0 {
alertCount++
ruleGroups[i] = rulefmt.RuleGroup{
Name: ruleName,
Interval: evalInterval,
Rules: []rulefmt.RuleNode{{
Alert: ruleNode,
Expr: exprNode,
}},
}
} else {
ruleGroups[i] = rulefmt.RuleGroup{
Name: ruleName,
Interval: evalInterval,
Rules: []rulefmt.RuleNode{{
Record: ruleNode,
Expr: exprNode,
}},
}
}
}

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
minio := e2edb.NewMinio(9000, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
flags := mergeFlags(
overrides := map[string]string{
// Since we're not going to run any rule, we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-ruler.ring.replication-factor": "2",
"-ruler.enable-ha-evaluation": "true",
"-ruler.poll-interval": "5s",
}

rulerFlags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
map[string]string{
// Since we're not going to run any rule (our only rule is invalid), we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
// Evaluate rules often, so that we don't need to wait for metrics to show up.
"-ruler.evaluation-interval": "2s",
"-ruler.poll-interval": "2s",
// No delay
"-ruler.evaluation-delay-duration": "0",

"-blocks-storage.tsdb.block-ranges-period": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "2h",
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
overrides,
)

// We run single ingester only, no replication.
"-distributor.replication-factor": "1",
// Start rulers.
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))

"-querier.max-fetched-chunks-per-query": "50",
},
)
// Upload rule groups to one of the rulers.
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
require.NoError(t, err)
namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
namespaceNameCount := make([]int, len(namespaceNames))
nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, ruleGroup := range ruleGroups {
index := nsRand.Intn(len(namespaceNames))
namespaceNameCount[index] = namespaceNameCount[index] + 1
require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
}

const namespace = "test"
const user = "user"
// Wait until rulers have loaded all rules.
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler))
ruler1SyncTotal, err := ruler1.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
require.NoError(t, err)
ruler3SyncTotal, err := ruler3.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
require.NoError(t, err)

// Wait until both the distributor and ruler have updated the ring. The querier will also watch
// the store-gateway ring if blocks sharding is enabled.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
err = consul.Kill() // kill consul so the rulers will operate with the tokens/instances they already have
require.NoError(t, err)

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
err = ruler2.Kill()
require.NoError(t, err)

expression := "vector(1) > 0" // Alert will fire
groupName := "rule_group_1"
ruleName := "rule_keep_firing"
// wait for another sync
require.NoError(t, ruler1.WaitSumMetrics(e2e.Greater(ruler1SyncTotal[0]), "cortex_ruler_sync_rules_total"))
require.NoError(t, ruler3.WaitSumMetrics(e2e.Greater(ruler3SyncTotal[0]), "cortex_ruler_sync_rules_total"))

require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))
rulers = e2ecortex.NewCompositeCortexService(ruler1, ruler3)
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))

m := ruleGroupMatcher(user, namespace, groupName)
t.Log(ruler1.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
t.Log(ruler3.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
c3, err := e2ecortex.NewClient("", "", "", ruler3.HTTPEndpoint(), "user-1")
require.NoError(t, err)

groups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
RuleNames: []string{ruleName},
})
ruler1Rules, err := c.GetRuleGroups()
require.NoError(t, err)
require.NotEmpty(t, groups)
require.Equal(t, 1, len(groups[0].Rules))
alert := parseAlertFromRule(t, groups[0].Rules[0])
require.Equal(t, float64(10), alert.KeepFiringFor)
require.Equal(t, 1, len(alert.Alerts))
require.Empty(t, alert.Alerts[0].KeepFiringSince) //Alert expression not resolved, keepFiringSince should be empty

expression = "vector(1) > 1" // Resolve, should keep firing for set duration
ts := time.Now()
require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(5), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

updatedGroups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
RuleNames: []string{ruleName},
})

ruler3Rules, err := c3.GetRuleGroups()
require.NoError(t, err)
require.NotEmpty(t, updatedGroups)
require.Equal(t, 1, len(updatedGroups[0].Rules))

alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
require.Equal(t, "firing", alert.State)
require.Equal(t, float64(10), alert.KeepFiringFor)
require.Equal(t, 1, len(alert.Alerts))
require.NotEmpty(t, alert.Alerts[0].KeepFiringSince)
require.Greater(t, alert.Alerts[0].KeepFiringSince.UnixNano(), ts.UnixNano(), "KeepFiringSince value should be after expression is resolved")

time.Sleep(10 * time.Second) // Sleep beyond keepFiringFor time
updatedGroups, err = c.GetPrometheusRules(e2ecortex.RuleFilter{
RuleNames: []string{ruleName},
})

ruleCount := 0
countFunc := func(ruleGroups map[string][]rulefmt.RuleGroup) {
for _, v := range ruleGroups {
ruleCount += len(v)
}
}

countFunc(ruler1Rules)
require.Equal(t, numRulesGroups, ruleCount)
ruleCount = 0
countFunc(ruler3Rules)
require.Equal(t, numRulesGroups, ruleCount)

results, err := c.GetPrometheusRules(e2ecortex.RuleFilter{})
require.NoError(t, err)
require.NotEmpty(t, updatedGroups)
require.Equal(t, 1, len(updatedGroups[0].Rules))
alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
require.Equal(t, 0, len(alert.Alerts)) // alert should be resolved once keepFiringFor time expires
require.Equal(t, numRulesGroups, len(results))
}

func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
Expand Down
7 changes: 7 additions & 0 deletions pkg/ruler/client_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/services"
)

func Test_newRulerClientFactory(t *testing.T) {
Expand Down Expand Up @@ -63,6 +64,12 @@ func Test_newRulerClientFactory(t *testing.T) {

// mockRulerServer is a stateless test double for the ruler server; its
// methods return fixed, canned responses.
type mockRulerServer struct{}

// LivenessCheck always answers with a response whose State is the numeric
// value of services.Running, and never returns an error. Both arguments are
// ignored.
func (m *mockRulerServer) LivenessCheck(ctx context.Context, request *LivenessCheckRequest) (*LivenessCheckResponse, error) {
	resp := &LivenessCheckResponse{}
	resp.State = int32(services.Running)
	return resp, nil
}

// Rules answers every request with an empty RulesResponse and a nil error.
func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) {
	empty := new(RulesResponse)
	return empty, nil
}
2 changes: 1 addition & 1 deletion pkg/ruler/merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestMergeGroupStateDesc(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
out := mergeGroupStateDesc(tc.input)
out := dedupStateDesc(tc.input)
slices.SortFunc(out, func(a, b *GroupStateDesc) int {
fileCompare := strings.Compare(a.Group.Namespace, b.Group.Namespace)
if fileCompare != 0 {
Expand Down
Loading

0 comments on commit 4c7f4c5

Please sign in to comment.