Skip to content

Commit

Permalink
ruler: Added support for strict rule groups that do not allow parti…
Browse files Browse the repository at this point in the history
…al_response

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Mar 27, 2019
1 parent d65ef8d commit 3107832
Show file tree
Hide file tree
Showing 16 changed files with 908 additions and 327 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ New tracing span:

:warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query.

- [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` for `Ruler`.

### Changed
- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated `partial_response_disabled` proto field. Added `partial_response_strategy` instead. Both in gRPC and Query API.

### Fixed
- [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far
- [#966](https://github.com/improbable-eng/thanos/pull/966) Bucket: verify no longer warns about overlapping blocks, that overlap `0s`
Expand Down
180 changes: 118 additions & 62 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"syscall"
"time"

thanosrule "github.com/improbable-eng/thanos/pkg/rule"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/alert"
Expand Down Expand Up @@ -227,7 +229,7 @@ func runRule(
Name: "thanos_rule_loaded_rules",
Help: "Loaded rules partitioned by file and group",
},
[]string{"file", "group"},
[]string{"part_resp_strategy", "file", "group"},
)
reg.MustRegister(configSuccess)
reg.MustRegister(configSuccessTime)
Expand Down Expand Up @@ -263,54 +265,13 @@ func runRule(
extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis", reg),
)

// Hit the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
queryFn := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
var addrs []string

// Add addresses from gossip.
peers := peer.PeerStates(cluster.PeerTypeQuery)
var ids []string
for id := range peers {
ids = append(ids, id)
}
sort.Slice(ids, func(i int, j int) bool {
return strings.Compare(ids[i], ids[j]) < 0
})
for _, id := range ids {
addrs = append(addrs, peers[id].QueryAPIAddr)
}

// Add DNS resolved addresses from static flags and file SD.
// TODO(bwplotka): Consider generating addresses in *url.URL
addrs = append(addrs, dnsProvider.Addresses()...)

removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i]))
if err != nil {
return nil, errors.Wrapf(err, "url parse %s", addrs[i])
}

span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]")
v, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, true)
span.Finish()
return v, err
}
return nil, errors.Errorf("no query peer reachable")
}

// Run rule evaluation and alert notifications.
var (
alertmgrs = newAlertmanagerSet(alertmgrURLs)
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
mgr *rules.Manager
ruleMgrs = thanosrule.Managers{}
)
{
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
Expand All @@ -331,26 +292,56 @@ func runRule(
}
alertQ.Push(res)
}

st := tsdb.Adapter(db, 0)
mgr = rules.NewManager(&rules.ManagerOptions{
Context: ctx,
QueryFunc: queryFn,

opts := rules.ManagerOptions{
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
Appendable: st,
Registerer: reg,
ExternalURL: nil,
TSDB: st,
})
g.Add(func() error {
mgr.Run()
<-ctx.Done()
mgr.Stop()
return nil
}, func(error) {
cancel()
})
}

{
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"type": "non-strict"}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, peer, dnsProvider, duplicatedQuery, false)

ruleMgrs[storepb.PartialResponseStrategy_WARN] = rules.NewManager(&opts)
g.Add(func() error {
ruleMgrs[storepb.PartialResponseStrategy_WARN].Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
ruleMgrs[storepb.PartialResponseStrategy_WARN].Stop()
})
}
{
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"type": "strict"}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, peer, dnsProvider, duplicatedQuery, true)

ruleMgrs[storepb.PartialResponseStrategy_ABORT] = rules.NewManager(&opts)
g.Add(func() error {
ruleMgrs[storepb.PartialResponseStrategy_ABORT].Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
ruleMgrs[storepb.PartialResponseStrategy_ABORT].Stop()
})
}
}
{
var storeLset []storepb.Label
Expand Down Expand Up @@ -469,11 +460,13 @@ func runRule(
level.Error(logger).Log("msg", "retrieving rule files failed. Ignoring file.", "pattern", pat, "err", err)
continue
}

files = append(files, fs...)
}

level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))
if err := mgr.Update(evalInterval, files); err != nil {

if err := ruleMgrs.Update(dataDir, evalInterval, files); err != nil {
configSuccess.Set(0)
level.Error(logger).Log("msg", "reloading rules failed", "err", err)
continue
Expand All @@ -483,9 +476,12 @@ func runRule(
configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9)

rulesLoaded.Reset()
for _, group := range mgr.RuleGroups() {
rulesLoaded.WithLabelValues(group.File(), group.Name()).Set(float64(len(group.Rules())))
for s, mgr := range ruleMgrs {
for _, group := range mgr.RuleGroups() {
rulesLoaded.WithLabelValues(s.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
}
}

}
}, func(error) {
close(cancel)
Expand Down Expand Up @@ -569,9 +565,9 @@ func runRule(
"web.prefix-header": webPrefixHeaderName,
}

ui.NewRuleUI(logger, mgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))
ui.NewRuleUI(logger, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))

api := v1.NewAPI(logger, mgr)
api := v1.NewAPI(logger, ruleMgrs)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger)

mux := http.NewServeMux()
Expand Down Expand Up @@ -767,3 +763,63 @@ func removeDuplicateQueryAddrs(logger log.Logger, duplicatedQueriers prometheus.
}
return deduplicated
}

// queryFunc returns a query function that hits the HTTP query API of query
// peers in randomized order until we get a result back or the context gets
// canceled.
//
// If partialResponseDisabled is true, queriers are asked for strict results
// (no partial responses) and a dedicated tracing span name is used so strict
// and non-strict rule queries can be told apart in traces.
func queryFunc(
	logger log.Logger,
	peer cluster.Peer,
	dnsProvider *dns.Provider,
	duplicatedQuery prometheus.Counter,
	partialResponseDisabled bool,
) rules.QueryFunc {
	const partialResponseParam = "partial_response"

	// The query API expects the inverse flag: partial_response=true means
	// partial responses are allowed.
	partialResponseValue := strconv.FormatBool(!partialResponseDisabled)
	spanID := "/rule_instant_query HTTP[client]"
	if partialResponseDisabled {
		spanID = "/rule_instant_query_strict HTTP[client]"
	}

	return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
		var addrs []string

		// Add addresses from gossip. Sort peer IDs so the address list is
		// deterministic before we randomize the query order below.
		peers := peer.PeerStates(cluster.PeerTypeQuery)
		var ids []string
		for id := range peers {
			ids = append(ids, id)
		}
		sort.Strings(ids)
		for _, id := range ids {
			addrs = append(addrs, peers[id].QueryAPIAddr)
		}

		// Add DNS resolved addresses from static flags and file SD.
		// TODO(bwplotka): Consider generating addresses in *url.URL.
		addrs = append(addrs, dnsProvider.Addresses()...)

		// BUG FIX: the deduplicated slice returned by removeDuplicateQueryAddrs
		// was previously discarded, so duplicate addresses were still queried.
		addrs = removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

		// NOTE(review): the loop body returns on the first iteration regardless
		// of outcome, so the randomized "try peers until one answers" described
		// above never actually retries — preserved as-is to avoid changing
		// error-propagation semantics; confirm intent upstream.
		for _, i := range rand.Perm(len(addrs)) {
			u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i]))
			if err != nil {
				return nil, errors.Wrapf(err, "url parse %s", addrs[i])
			}
			form, err := url.ParseQuery(u.RawQuery)
			if err != nil {
				return nil, errors.Wrapf(err, "url raw query parse %s", addrs[i])
			}
			form.Add(partialResponseParam, partialResponseValue)
			u.RawQuery = form.Encode()

			span, ctx := tracing.StartSpan(ctx, spanID)
			v, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, true)
			span.Finish()
			return v, err
		}
		return nil, errors.Errorf("no query peer reachable")
	}
}
Loading

0 comments on commit 3107832

Please sign in to comment.