diff --git a/CHANGELOG.md b/CHANGELOG.md index fa6032cdb7..03627e3283 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,12 @@ New tracing span: :warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query. +- [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` for `Ruler`. + +### Changed +- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated partial_response_disabled proto field. Added partial_response_strategy instead. Both in gRPC and Query API. +- [#970](https://github.com/improbable-eng/thanos/pull/970) No `PartialResponseStrategy` field for `RuleGroups` by default means `abort` strategy (old PartialResponse disabled) as this is recommended option for Rules and alerts. + ### Fixed - [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far - [#966](https://github.com/improbable-eng/thanos/pull/966) Bucket: verify no longer warns about overlapping blocks, that overlap `0s` diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 792ff7362c..fd360e60af 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -282,7 +282,7 @@ func runQuery( fileSDCache := cache.New() dnsProvider := dns.NewProvider( logger, - extprom.WrapRegistererWithPrefix("thanos_querier_store_apis", reg), + extprom.WrapRegistererWithPrefix("thanos_querier_store_apis_", reg), ) var ( diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index e9b4a87a0e..e71d57976c 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -30,6 +30,7 @@ import ( "github.com/improbable-eng/thanos/pkg/extprom" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/promclient" + thanosrule 
"github.com/improbable-eng/thanos/pkg/rule" v1 "github.com/improbable-eng/thanos/pkg/rule/api" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/shipper" @@ -227,13 +228,23 @@ func runRule( Name: "thanos_rule_loaded_rules", Help: "Loaded rules partitioned by file and group", }, - []string{"file", "group"}, + []string{"part_resp_strategy", "file", "group"}, ) + ruleEvalWarnings := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "thanos_rule_evaluation_with_warnings_total", + Help: "The total number of rule evaluation that were successful but had warnings which can indicate partial error.", + }, []string{"strategy"}, + ) + ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_ABORT.String())) + ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_WARN.String())) + reg.MustRegister(configSuccess) reg.MustRegister(configSuccessTime) reg.MustRegister(duplicatedQuery) reg.MustRegister(alertMngrAddrResolutionErrors) reg.MustRegister(rulesLoaded) + reg.MustRegister(ruleEvalWarnings) for _, addr := range queryAddrs { if addr == "" { @@ -260,57 +271,16 @@ func runRule( dnsProvider := dns.NewProvider( logger, - extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis", reg), + extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis_", reg), ) - // Hit the HTTP query API of query peers in randomized order until we get a result - // back or the context get canceled. - queryFn := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { - var addrs []string - - // Add addresses from gossip. - peers := peer.PeerStates(cluster.PeerTypeQuery) - var ids []string - for id := range peers { - ids = append(ids, id) - } - sort.Slice(ids, func(i int, j int) bool { - return strings.Compare(ids[i], ids[j]) < 0 - }) - for _, id := range ids { - addrs = append(addrs, peers[id].QueryAPIAddr) - } - - // Add DNS resolved addresses from static flags and file SD. 
- // TODO(bwplotka): Consider generating addresses in *url.URL - addrs = append(addrs, dnsProvider.Addresses()...) - - removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs) - - for _, i := range rand.Perm(len(addrs)) { - u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i])) - if err != nil { - return nil, errors.Wrapf(err, "url parse %s", addrs[i]) - } - - span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]") - v, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, true) - span.Finish() - return v, err - } - return nil, errors.Errorf("no query peer reachable") - } - // Run rule evaluation and alert notifications. var ( alertmgrs = newAlertmanagerSet(alertmgrURLs) alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) - mgr *rules.Manager + ruleMgrs = thanosrule.Managers{} ) { - ctx, cancel := context.WithCancel(context.Background()) - ctx = tracing.ContextWithTracer(ctx, tracer) - notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { res := make([]*alert.Alert, 0, len(alerts)) for _, alrt := range alerts { @@ -331,26 +301,38 @@ func runRule( } alertQ.Push(res) } - st := tsdb.Adapter(db, 0) - mgr = rules.NewManager(&rules.ManagerOptions{ - Context: ctx, - QueryFunc: queryFn, + + opts := rules.ManagerOptions{ NotifyFunc: notify, Logger: log.With(logger, "component", "rules"), Appendable: st, - Registerer: reg, ExternalURL: nil, TSDB: st, - }) - g.Add(func() error { - mgr.Run() - <-ctx.Done() - mgr.Stop() - return nil - }, func(error) { - cancel() - }) + } + + for _, strategy := range storepb.PartialResponseStrategy_value { + s := storepb.PartialResponseStrategy(strategy) + + ctx, cancel := context.WithCancel(context.Background()) + ctx = tracing.ContextWithTracer(ctx, tracer) + + opts := opts + opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg) + opts.Context = ctx + opts.QueryFunc = queryFunc(logger, peer, dnsProvider, 
duplicatedQuery, ruleEvalWarnings, s) + + ruleMgrs[s] = rules.NewManager(&opts) + g.Add(func() error { + ruleMgrs[s].Run() + <-ctx.Done() + + return nil + }, func(error) { + cancel() + ruleMgrs[s].Stop() + }) + } } { var storeLset []storepb.Label @@ -469,11 +451,13 @@ func runRule( level.Error(logger).Log("msg", "retrieving rule files failed. Ignoring file.", "pattern", pat, "err", err) continue } + files = append(files, fs...) } level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files)) - if err := mgr.Update(evalInterval, files); err != nil { + + if err := ruleMgrs.Update(dataDir, evalInterval, files); err != nil { configSuccess.Set(0) level.Error(logger).Log("msg", "reloading rules failed", "err", err) continue @@ -483,9 +467,12 @@ func runRule( configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9) rulesLoaded.Reset() - for _, group := range mgr.RuleGroups() { - rulesLoaded.WithLabelValues(group.File(), group.Name()).Set(float64(len(group.Rules()))) + for s, mgr := range ruleMgrs { + for _, group := range mgr.RuleGroups() { + rulesLoaded.WithLabelValues(s.String(), group.File(), group.Name()).Set(float64(len(group.Rules()))) + } } + } }, func(error) { close(cancel) @@ -569,9 +556,9 @@ func runRule( "web.prefix-header": webPrefixHeaderName, } - ui.NewRuleUI(logger, mgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix)) + ui.NewRuleUI(logger, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix)) - api := v1.NewAPI(logger, mgr) + api := v1.NewAPI(logger, ruleMgrs) api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger) mux := http.NewServeMux() @@ -767,3 +754,75 @@ func removeDuplicateQueryAddrs(logger log.Logger, duplicatedQueriers prometheus. } return deduplicated } + +// queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result +// back or the context get canceled. 
+func queryFunc( + logger log.Logger, + peer cluster.Peer, + dnsProvider *dns.Provider, + duplicatedQuery prometheus.Counter, + ruleEvalWarnings *prometheus.CounterVec, + partialResponseStrategy storepb.PartialResponseStrategy, +) rules.QueryFunc { + var spanID string + + switch partialResponseStrategy { + case storepb.PartialResponseStrategy_WARN: + spanID = "/rule_instant_query HTTP[client]" + case storepb.PartialResponseStrategy_ABORT: + spanID = "/rule_instant_query_part_resp_abort HTTP[client]" + default: + // Programming error will be caught by tests. + panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error()) + } + + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + var addrs []string + + // Add addresses from gossip. + peers := peer.PeerStates(cluster.PeerTypeQuery) + var ids []string + for id := range peers { + ids = append(ids, id) + } + sort.Slice(ids, func(i int, j int) bool { + return strings.Compare(ids[i], ids[j]) < 0 + }) + for _, id := range ids { + addrs = append(addrs, peers[id].QueryAPIAddr) + } + + // Add DNS resolved addresses from static flags and file SD. + // TODO(bwplotka): Consider generating addresses in *url.URL + addrs = append(addrs, dnsProvider.Addresses()...) 
+ + removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs) + + for _, i := range rand.Perm(len(addrs)) { + u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i])) + if err != nil { + return nil, errors.Wrapf(err, "url parse %s", addrs[i]) + } + + span, ctx := tracing.StartSpan(ctx, spanID) + v, warns, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, promclient.QueryOptions{ + Deduplicate: true, + PartialResponseStrategy: partialResponseStrategy, + }) + span.Finish() + + if err != nil { + level.Error(logger).Log("err", err, "query", q) + } + + if err == nil && len(warns) > 0 { + ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() + // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): + level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) + } + return v, err + } + return nil, errors.Errorf("no query peer reachable") + } +} diff --git a/docs/components/query.md b/docs/components/query.md index 972796eeca..cd38da1053 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -53,8 +53,33 @@ This logic can also be controlled via parameter on QueryAPI. More details below. Overall QueryAPI exposed by Thanos is guaranteed to be compatible with Prometheus 2.x. -However, for additional Thanos features, Thanos, on top of Prometheus adds several -additional parameters listed below as well as custom response fields. +However, for additional Thanos features, Thanos, on top of Prometheus adds +* partial response behaviour +* several additional parameters listed below +* custom response fields. + +### Partial Response + +QueryAPI and StoreAPI has additional behaviour controlled via query parameter called [PartialResponseStrategy](../../pkg/store/storepb/rpc.pb.go). + +This parameter controls tradeoff between accuracy and availability. + +Partial response is a potentially missed result within query against QueryAPI or StoreAPI. 
This can happen if one +of StoreAPIs is returning error or timeout whereas couple of others returns success. It does not mean you are missing data, +you might lucky enough that you actually get the correct data as the broken StoreAPI did not have anything for your query. + +If partial response happen QueryAPI returns human readable warnings explained [here](query.md#CustomResponseFields) + +NOTE that having warning does not necessary means partial response (e.g no store matched query warning) + +See [this](query.md#PartialResponseStrategy) on how to control this behaviour. + +Querier also allows to configure different timeouts: +* `--query.timeout` +* `--store.response-timeout` + +If you prefer availability over accuracy you can set tighter timeout to underlying StoreAPI than overall query timeout. If partial response +strategy is NOT `abort`, this will "ignore" slower StoreAPIs producing just warning with 200 status code response. ### Deduplication Enabled @@ -77,7 +102,9 @@ Max source resolution is max resolution in seconds we want to use for data we qu * 5m -> we will use max 5m downsampling. * 1h -> we will use max 1h downsampling. -### Partial Response / Error Enabled +### Partial Response Strategy + +// TODO(bwplotka): Update. This will change to "strategy" soon as [PartialResponseStrategy enum here](../../pkg/store/storepb/rpc.proto) | HTTP URL/FORM parameter | Type | Default | Example | |----|----|----|----| @@ -92,6 +119,7 @@ return warning. Any additional field does not break compatibility, however there is no guarantee that Grafana or any other client will understand those. 
Currently Thanos UI exposed by Thanos understands + ```go type queryData struct { ResultType promql.ValueType `json:"resultType"` diff --git a/docs/components/rule.md b/docs/components/rule.md index 60b856223b..fb7304d757 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -1,36 +1,139 @@ -# Rule +# Rule (aka Ruler) -_**NOTE:** The rule component is experimental since it has conceptual tradeoffs that might not be favorable for most use cases. It is recommended to keep deploying rules to the relevant Prometheus servers._ +_**NOTE:** It is recommended to keep deploying rules inside the relevant Prometheus servers locally. Use ruler only in specific cases. Read details [below](rule.md#Risk) why._ _The rule component should in particular not be used to circumvent solving rule deployment properly at the configuration management level._ -The rule component evaluates Prometheus recording and alerting rules against random query nodes in its cluster. Rule results are written back to disk in the Prometheus 2.0 storage format. Rule nodes at the same time participate in the cluster themselves as source store nodes and upload their generated TSDB blocks to an object store. +The rule component evaluates Prometheus recording and alerting rules against chosen query API via repeated `--query` (or FileSD via `--query.sd`). If more than one query is passed, round robin balancing is performed. + +Rule results are written back to disk in the Prometheus 2.0 storage format. Rule nodes at the same time participate in the system as source store nodes, which means that they expose StoreAPI and upload their generated TSDB blocks to an object store. -The data of each rule node can be labeled to satisfy the clusters labeling scheme. High-availability pairs can be run in parallel and should be distinguished by the designated replica label, just like regular Prometheus servers. 
+You can think of Rule as a simplified Prometheus that does not require a sidecar and does not scrape and do PromQL evaluation (no QueryAPI). + +The data of each Rule node can be labeled to satisfy the clusters labeling scheme. High-availability pairs can be run in parallel and should be distinguished by the designated replica label, just like regular Prometheus servers. +Read more about Ruler in HA in [here](rule.md#Ruler_HA) ``` $ thanos rule \ - --data-dir "/path/to/data" \ - --eval-interval "30s" \ - --rule-file "/path/to/rules/*.rules.yaml" \ - --alert.query-url "http://0.0.0.0:9090" \ - --alertmanagers.url "alert.thanos.io" \ - --cluster.peers "thanos-cluster.example.org" \ - --objstore.config-file "bucket.yml" + --data-dir "/path/to/data" \ + --eval-interval "30s" \ + --rule-file "/path/to/rules/*.rules.yaml" \ + --alert.query-url "http://0.0.0.0:9090" \ # This tells what query URL to link to in UI. + --alertmanagers.url "alert.thanos.io" \ + --query "query.example.org" \ + --query "query2.example.org" \ + --objstore.config-file "bucket.yml" \ + --label 'monitor_cluster="cluster1"' + --label 'replica="A" ``` -The content of `bucket.yml`: +## Risk + +Ruler has conceptual tradeoffs that might not be favorable for most use cases. The main tradeoff is its dependence on +query reliability. For Prometheus it is unlikely to have alert/recording rule evaluation failure as evaluation is local. + +For Ruler the read path is distributed, since most likely ruler is querying Thanos Querier which gets data from remote Store APIs. + +This means that **query failure** are more likely to happen, that's why clear strategy on what will happen to alert and during query +unavailability is the key. + +## Partial Response + +See [this](query.md#PartialResponse) on initial info. 
+ +Rule allows to specify rule groups with additional field that controls PartialResponseStrategy e.g: ```yaml -type: GCS -config: - bucket: example-bucket +groups: +- name: "warn strategy" + partial_response_strategy: "warn" + rules: + - alert: "some" + expr: "up" +- name: "abort strategy" + partial_response_strategy: "abort" + rules: + - alert: "some" + expr: "up" +- name: "by default strategy is abort" + rules: + - alert: "some" + expr: "up" ``` +It is recommended to keep partial response to `abort` for alerts and that is the default as well. + +Essentially for alerting having partial response can result in symptom being missed by Rule's alert. + +## Must have: essential Ruler alerts! + +To be sure that alerting works it is essential to monitor Ruler and alert from another **Scraper (Prometheus + sidecar)** that sits in same cluster. + +The most important metrics to alert on are: + +* `thanos_alert_sender_alerts_dropped_total`. If greater than 0 it means that rule triggered alerts are not being sent to alertmanager which might +indicate connection, incompatibility or misconfiguration problems. + +* `prometheus_rule_evaluation_failures_total`. If greater than 0 it means that rule failed to be evaluated which results in +either gap in rule or potentially ignored alert. Alert heavily on this if this happens for longer than your alert thresholds. +`strategy` label will tell you if failures comes from rules that tolerates [partial response](rule.md#PartialResponse) or not. + +* `prometheus_rule_group_last_duration_seconds < prometheus_rule_group_interval_seconds` If the difference is heavy it means +that rule evaluation took more time than scheduled interval. It can indicate your query backend (e.g Querier) takes too much time +to evaluate the query, that is not fast enough to fill the rule. This might indicate other problems like slow StoreAPis or +too complex query expression in rule. + +* `thanos_rule_evaluation_with_warnings_total`. 
If you choose to use Rules and Alerts with [partial response strategy](rule.md#PartialResponse) +equals "warn", this metric will tell you how many evaluations ended up with some kind of warning. To see the actual warnings +see WARN log level. This might suggest that those evaluations return partial responses and might or might not be accurate. + +Those metrics are important for vanilla Prometheus as well, but even more important when we rely on (sometimes WAN) network. + +// TODO(bwplotka): Rereview them after recent changes in metrics. +See [alerts](/examples/alerts/alerts.md#Ruler) for more example alerts for ruler. + +NOTE: It is also recommended to set a mocked Alert on Ruler that checks if query is up. This might be something simple like `vector(1)` query, just +to check if Querier is live. + +## Performance + As rule nodes outsource query processing to query nodes, they should generally experience little load. If necessary, functional sharding can be applied by splitting up the sets of rules between HA pairs. Rules are processed with deduplicated data according to the replica label configured on query nodes. -## Deployment +## External labels + +It is *mandatory* to add certain external labels to indicate the ruler origin (e.g `label='replica="A"'` or for `cluster`). +Otherwise running multiple ruler replicas will not be possible, resulting in a clash during compaction. + +NOTE: It is advised to put different external labels than labels given by other sources we are recording or alerting against. + +For example: + +* Ruler is in cluster `mon1` and we have Prometheus in cluster `eu1` +* By default we could try having consistent labels so we have `cluster=eu1` for Prometheus and `cluster=mon1` for Ruler. +* We configure `ScraperIsDown` alert that monitors service from `work1` cluster. +* When triggered this alert results in `ScraperIsDown{cluster=mon1}` since external labels always *replace* source labels. 
+ +This effectively drops the important metadata and makes it impossible to tell in exactly which `cluster` the `ScraperIsDown` alert found a problem +without falling back to manual query. + +## Ruler UI + +On HTTP address ruler exposes its UI that shows mainly Alerts and Rules page (similar to Prometheus Alerts page). +Each alert is linked to query that alert is performing that you can click to navigate to configured `alert.query-url`. + +## Ruler HA + +Ruler aims to use similar approach as Prometheus does. You can configure external labels, as well as simple relabelling. + +In case of Ruler in HA you need to make sure you have following labelling setup: + +* Labels that identify the HA group ruler and replica label with different value for each ruler instance, e.g: +`cluster="eu1", replica="A"` and `cluster=eu1, replica="B"` by using `--label` flag. +* Labels that need to be dropped just before sending to alertmanager in order for alertmanager to deduplicate alerts e.g +`--alertmanager.label-drop="replica"`. 
+ +Full relabelling is planned to be done in future and is tracked here: https://github.com/improbable-eng/thanos/issues/660 ## Flags diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index e70fbb245b..1622a16340 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -3,9 +3,13 @@ package promclient import ( + "bufio" + "bytes" + "compress/gzip" "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/url" @@ -18,14 +22,17 @@ import ( "time" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/tracing" "github.com/pkg/errors" "github.com/prometheus/common/model" promlabels "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/promql" "github.com/prometheus/tsdb/labels" - "gopkg.in/yaml.v2" + yaml "gopkg.in/yaml.v2" ) // IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell @@ -242,24 +249,55 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo return path.Join("snapshots", d.Data.Name), nil } +type QueryOptions struct { + Deduplicate bool + PartialResponseStrategy storepb.PartialResponseStrategy +} + +func (p *QueryOptions) AddTo(values url.Values) error { + values.Add("dedup", fmt.Sprintf("%v", p.Deduplicate)) + + var partialResponseValue string + switch p.PartialResponseStrategy { + case storepb.PartialResponseStrategy_WARN: + partialResponseValue = strconv.FormatBool(true) + case storepb.PartialResponseStrategy_ABORT: + partialResponseValue = strconv.FormatBool(false) + default: + return errors.Errorf("unknown partial response strategy %v", p.PartialResponseStrategy) + } + + // TODO(bwplotka): Apply change from bool to strategy in Query API as well. 
+ values.Add("partial_response", partialResponseValue) + + return nil +} + // QueryInstant performs instant query and returns results in model.Vector type. -func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query string, t time.Time, dedup bool) (model.Vector, error) { +func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query string, t time.Time, opts QueryOptions) (model.Vector, []string, error) { if logger == nil { logger = log.NewNopLogger() } - u := *base - u.Path = path.Join(u.Path, "/api/v1/query") - - params := url.Values{} + params, err := url.ParseQuery(base.RawQuery) + if err != nil { + return nil, nil, errors.Wrapf(err, "parse raw query %s", base.RawQuery) + } params.Add("query", query) params.Add("time", t.Format(time.RFC3339Nano)) - params.Add("dedup", fmt.Sprintf("%v", dedup)) + if err := opts.AddTo(params); err != nil { + return nil, nil, errors.Wrap(err, "add thanos opts query params") + } + + u := *base + u.Path = path.Join(u.Path, "/api/v1/query") u.RawQuery = params.Encode() + level.Debug(logger).Log("msg", "querying instant", "url", u.String()) + req, err := http.NewRequest("GET", u.String(), nil) if err != nil { - return nil, err + return nil, nil, errors.Wrap(err, "create GET request") } req = req.WithContext(ctx) @@ -269,7 +307,7 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s } resp, err := client.Do(req) if err != nil { - return nil, err + return nil, nil, errors.Wrapf(err, "perform GET request against %s", u.String()) } defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") @@ -280,10 +318,13 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s ResultType string `json:"resultType"` Result json.RawMessage `json:"result"` } `json:"data"` + + // Extra field supported by Thanos Querier. 
+ Warnings []string `json:"warnings"` } if err = json.NewDecoder(resp.Body).Decode(&m); err != nil { - return nil, err + return nil, nil, errors.Wrap(err, "decode query instant response") } var vectorResult model.Vector @@ -293,24 +334,24 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s switch m.Data.ResultType { case promql.ValueTypeVector: if err = json.Unmarshal(m.Data.Result, &vectorResult); err != nil { - return nil, err + return nil, nil, errors.Wrap(err, "decode result into ValueTypeVector") } case promql.ValueTypeScalar: vectorResult, err = convertScalarJSONToVector(m.Data.Result) if err != nil { - return nil, err + return nil, nil, errors.Wrap(err, "decode result into ValueTypeScalar") } default: - return nil, errors.Errorf("unknown response type: '%q'", m.Data.ResultType) + return nil, nil, errors.Errorf("unknown response type: '%q'", m.Data.ResultType) } - return vectorResult, nil + return vectorResult, m.Warnings, nil } // PromqlQueryInstant performs instant query and returns results in promql.Vector type that is compatible with promql package. 
-func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query string, t time.Time, dedup bool) (promql.Vector, error) { - vectorResult, err := QueryInstant(ctx, logger, base, query, t, dedup) +func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query string, t time.Time, opts QueryOptions) (promql.Vector, []string, error) { + vectorResult, warnings, err := QueryInstant(ctx, logger, base, query, t, opts) if err != nil { - return nil, err + return nil, nil, err } vec := make(promql.Vector, 0, len(vectorResult)) @@ -332,7 +373,7 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q }) } - return vec, nil + return vec, warnings, nil } // Scalar response consists of array with mixed types so it needs to be @@ -362,3 +403,78 @@ func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, Value: resultValue, Timestamp: resultTime}}, nil } + +// MetricValues returns current value of instant query and returns results in model.Vector type. 
+func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetricFn func(metric promlabels.Labels, val float64) error) error { + if logger == nil { + logger = log.NewNopLogger() + } + + u := *base + u.Path = path.Join(u.Path, "/metrics") + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return errors.Wrap(err, "create GET request") + } + + req.Header.Add("Accept", `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`) + req.Header.Add("Accept-Encoding", "gzip") + + req = req.WithContext(ctx) + + client := &http.Client{ + Transport: tracing.HTTPTripperware(logger, http.DefaultTransport), + } + resp, err := client.Do(req) + if err != nil { + return errors.Wrapf(err, "perform GET request against %s", u.String()) + } + defer runutil.CloseWithLogOnErr(logger, resp.Body, "metrics body") + + if resp.StatusCode != http.StatusOK { + return errors.Errorf("server returned HTTP status %s", resp.Status) + } + + b := &bytes.Buffer{} + if resp.Header.Get("Content-Encoding") != "gzip" { + _, err = io.Copy(b, resp.Body) + if err != nil { + return err + } + } else { + buf := bufio.NewReader(resp.Body) + gzipr, err := gzip.NewReader(buf) + if err != nil { + return err + } + _, err = io.Copy(b, gzipr) + _ = gzipr.Close() + if err != nil { + return err + } + } + + p := textparse.New(b.Bytes(), resp.Header.Get("Content-Type")) + for { + var et textparse.Entry + if et, err = p.Next(); err != nil { + if err == io.EOF { + return nil + } + return err + } + + if et != textparse.EntrySeries { + continue + } + + var lset promlabels.Labels + _ = p.Metric(&lset) + _, _, v := p.Series() + + if err := perMetricFn(lset, v); err != nil { + return err + } + } +} diff --git a/pkg/rule/api/v1.go b/pkg/rule/api/v1.go index be2dc5abf4..df15e1750f 100644 --- a/pkg/rule/api/v1.go +++ b/pkg/rule/api/v1.go @@ -6,31 +6,32 @@ import ( "time" "github.com/NYTimes/gziphandler" + "github.com/go-kit/kit/log" qapi 
"github.com/improbable-eng/thanos/pkg/query/api" + thanosrule "github.com/improbable-eng/thanos/pkg/rule" + "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/tracing" - "github.com/prometheus/client_golang/prometheus" - - "github.com/go-kit/kit/log" "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/rules" ) type API struct { - logger log.Logger - now func() time.Time - rulesRetriever rulesRetriever + logger log.Logger + now func() time.Time + ruleRetriever RulesRetriever } func NewAPI( logger log.Logger, - rr rulesRetriever, + ruleRetriever RulesRetriever, ) *API { return &API{ - logger: logger, - now: time.Now, - rulesRetriever: rr, + logger: logger, + now: time.Now, + ruleRetriever: ruleRetriever, } } @@ -54,20 +55,20 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. 
} -type rulesRetriever interface { - RuleGroups() []*rules.Group - AlertingRules() []*rules.AlertingRule +type RulesRetriever interface { + RuleGroups() []thanosrule.Group + AlertingRules() []thanosrule.AlertingRule } func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) { - ruleGroups := api.rulesRetriever.RuleGroups() - res := &RuleDiscovery{RuleGroups: make([]*RuleGroup, len(ruleGroups))} - for i, grp := range ruleGroups { + res := &RuleDiscovery{} + for _, grp := range api.ruleRetriever.RuleGroups() { apiRuleGroup := &RuleGroup{ - Name: grp.Name(), - File: grp.File(), - Interval: grp.Interval().Seconds(), - Rules: []rule{}, + Name: grp.Name(), + File: grp.File(), + Interval: grp.Interval().Seconds(), + Rules: []rule{}, + PartialResponseStrategy: grp.PartialResponseStrategy.String(), } for _, r := range grp.Rules() { @@ -79,17 +80,18 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) { } switch rule := r.(type) { - case *rules.AlertingRule: + case thanosrule.AlertingRule: enrichedRule = alertingRule{ - Name: rule.Name(), - Query: rule.Query().String(), - Duration: rule.Duration().Seconds(), - Labels: rule.Labels(), - Annotations: rule.Annotations(), - Alerts: rulesAlertsToAPIAlerts(rule.ActiveAlerts()), - Health: rule.Health(), - LastError: lastError, - Type: "alerting", + Name: rule.Name(), + Query: rule.Query().String(), + Duration: rule.Duration().Seconds(), + Labels: rule.Labels(), + Annotations: rule.Annotations(), + Alerts: rulesAlertsToAPIAlerts(grp.PartialResponseStrategy, rule.ActiveAlerts()), + Health: rule.Health(), + LastError: lastError, + Type: "alerting", + PartialResponseStrategy: rule.PartialResponseStrategy.String(), } case *rules.RecordingRule: enrichedRule = recordingRule{ @@ -107,22 +109,20 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) { apiRuleGroup.Rules = append(apiRuleGroup.Rules, enrichedRule) } - res.RuleGroups[i] = apiRuleGroup + res.RuleGroups = 
append(res.RuleGroups, apiRuleGroup) } + return res, nil, nil } func (api *API) alerts(r *http.Request) (interface{}, []error, *qapi.ApiError) { - alertingRules := api.rulesRetriever.AlertingRules() - alerts := []*Alert{} - - for _, alertingRule := range alertingRules { + var alerts []*Alert + for _, alertingRule := range api.ruleRetriever.AlertingRules() { alerts = append( alerts, - rulesAlertsToAPIAlerts(alertingRule.ActiveAlerts())..., + rulesAlertsToAPIAlerts(alertingRule.PartialResponseStrategy, alertingRule.ActiveAlerts())..., ) } - res := &AlertDiscovery{Alerts: alerts} return res, nil, nil @@ -133,22 +133,24 @@ type AlertDiscovery struct { } type Alert struct { - Labels labels.Labels `json:"labels"` - Annotations labels.Labels `json:"annotations"` - State string `json:"state"` - ActiveAt *time.Time `json:"activeAt,omitempty"` - Value float64 `json:"value"` + Labels labels.Labels `json:"labels"` + Annotations labels.Labels `json:"annotations"` + State string `json:"state"` + ActiveAt *time.Time `json:"activeAt,omitempty"` + Value float64 `json:"value"` + PartialResponseStrategy string `json:"partial_response_strategy"` } -func rulesAlertsToAPIAlerts(rulesAlerts []*rules.Alert) []*Alert { +func rulesAlertsToAPIAlerts(s storepb.PartialResponseStrategy, rulesAlerts []*rules.Alert) []*Alert { apiAlerts := make([]*Alert, len(rulesAlerts)) for i, ruleAlert := range rulesAlerts { apiAlerts[i] = &Alert{ - Labels: ruleAlert.Labels, - Annotations: ruleAlert.Annotations, - State: ruleAlert.State.String(), - ActiveAt: &ruleAlert.ActiveAt, - Value: ruleAlert.Value, + PartialResponseStrategy: s.String(), + Labels: ruleAlert.Labels, + Annotations: ruleAlert.Annotations, + State: ruleAlert.State.String(), + ActiveAt: &ruleAlert.ActiveAt, + Value: ruleAlert.Value, } } @@ -165,22 +167,24 @@ type RuleGroup struct { // In order to preserve rule ordering, while exposing type (alerting or recording) // specific properties, both alerting and recording rules are exposed in the // 
same array. - Rules []rule `json:"rules"` - Interval float64 `json:"interval"` + Rules []rule `json:"rules"` + Interval float64 `json:"interval"` + PartialResponseStrategy string `json:"partial_response_strategy"` } type rule interface{} type alertingRule struct { - Name string `json:"name"` - Query string `json:"query"` - Duration float64 `json:"duration"` - Labels labels.Labels `json:"labels"` - Annotations labels.Labels `json:"annotations"` - Alerts []*Alert `json:"alerts"` - Health rules.RuleHealth `json:"health"` - LastError string `json:"lastError,omitempty"` - Type string `json:"type"` + Name string `json:"name"` + Query string `json:"query"` + Duration float64 `json:"duration"` + Labels labels.Labels `json:"labels"` + Annotations labels.Labels `json:"annotations"` + Alerts []*Alert `json:"alerts"` + Health rules.RuleHealth `json:"health"` + LastError string `json:"lastError,omitempty"` + Type string `json:"type"` + PartialResponseStrategy string `json:"partial_response_strategy"` } type recordingRule struct { diff --git a/pkg/rule/api/v1_test.go b/pkg/rule/api/v1_test.go index af4034bd8d..f3e2c8d585 100644 --- a/pkg/rule/api/v1_test.go +++ b/pkg/rule/api/v1_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/kit/log" qapi "github.com/improbable-eng/thanos/pkg/query/api" + thanosrule "github.com/improbable-eng/thanos/pkg/rule" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" @@ -23,7 +24,7 @@ type rulesRetrieverMock struct { testing *testing.T } -func (m rulesRetrieverMock) RuleGroups() []*rules.Group { +func (m rulesRetrieverMock) RuleGroups() []thanosrule.Group { var ar rulesRetrieverMock arules := ar.AlertingRules() storage := testutil.NewStorage(m.testing) @@ -59,10 +60,10 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group { r = append(r, recordingRule) group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts) - return []*rules.Group{group} + return 
[]thanosrule.Group{{Group: group}} } -func (m rulesRetrieverMock) AlertingRules() []*rules.AlertingRule { +func (m rulesRetrieverMock) AlertingRules() []thanosrule.AlertingRule { expr1, err := promql.ParseExpr(`absent(test_metric3) != 1`) if err != nil { m.testing.Fatalf("unable to parse alert expression: %s", err) @@ -90,9 +91,9 @@ func (m rulesRetrieverMock) AlertingRules() []*rules.AlertingRule { true, log.NewNopLogger(), ) - var r []*rules.AlertingRule - r = append(r, rule1) - r = append(r, rule2) + var r []thanosrule.AlertingRule + r = append(r, thanosrule.AlertingRule{AlertingRule: rule1}) + r = append(r, thanosrule.AlertingRule{AlertingRule: rule2}) return r } @@ -122,7 +123,10 @@ func TestEndpoints(t *testing.T) { algr.testing = t algr.AlertingRules() algr.RuleGroups() - api := NewAPI(nil, algr) + api := NewAPI( + nil, + algr, + ) testEndpoints(t, api) }) } @@ -142,29 +146,32 @@ func testEndpoints(t *testing.T, api *API) { response: &RuleDiscovery{ RuleGroups: []*RuleGroup{ { - Name: "grp", - File: "/path/to/file", - Interval: 1, + Name: "grp", + File: "/path/to/file", + Interval: 1, + PartialResponseStrategy: "WARN", Rules: []rule{ alertingRule{ - Name: "test_metric3", - Query: "absent(test_metric3) != 1", - Duration: 1, - Labels: labels.Labels{}, - Annotations: labels.Labels{}, - Alerts: []*Alert{}, - Health: "unknown", - Type: "alerting", + Name: "test_metric3", + Query: "absent(test_metric3) != 1", + Duration: 1, + Labels: labels.Labels{}, + Annotations: labels.Labels{}, + Alerts: []*Alert{}, + Health: "unknown", + Type: "alerting", + PartialResponseStrategy: "WARN", }, alertingRule{ - Name: "test_metric4", - Query: "up == 1", - Duration: 1, - Labels: labels.Labels{}, - Annotations: labels.Labels{}, - Alerts: []*Alert{}, - Health: "unknown", - Type: "alerting", + Name: "test_metric4", + Query: "up == 1", + Duration: 1, + Labels: labels.Labels{}, + Annotations: labels.Labels{}, + Alerts: []*Alert{}, + Health: "unknown", + Type: "alerting", + 
PartialResponseStrategy: "WARN", }, recordingRule{ Name: "recording-rule-1", diff --git a/pkg/rule/rule.go b/pkg/rule/rule.go new file mode 100644 index 0000000000..11d988a897 --- /dev/null +++ b/pkg/rule/rule.go @@ -0,0 +1,169 @@ +package thanosrule + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "path/filepath" + "strings" + "time" + + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/rulefmt" + "github.com/prometheus/prometheus/rules" + "github.com/prometheus/tsdb" + yaml "gopkg.in/yaml.v2" +) + +const tmpRuleDir = ".tmp-rules" + +type Group struct { + *rules.Group + + PartialResponseStrategy storepb.PartialResponseStrategy +} + +type AlertingRule struct { + *rules.AlertingRule + + PartialResponseStrategy storepb.PartialResponseStrategy +} + +type RuleGroups struct { + Groups []RuleGroup `yaml:"groups"` +} + +type RuleGroup struct { + rulefmt.RuleGroup + + PartialResponseStrategy storepb.PartialResponseStrategy `yaml:"partial_response_strategy"` +} + +type Managers map[storepb.PartialResponseStrategy]*rules.Manager + +func (m Managers) RuleGroups() []Group { + var res []Group + for s, r := range m { + for _, group := range r.RuleGroups() { + res = append(res, Group{Group: group, PartialResponseStrategy: s}) + } + } + return res +} + +func (m Managers) AlertingRules() []AlertingRule { + var res []AlertingRule + for s, r := range m { + for _, r := range r.AlertingRules() { + res = append(res, AlertingRule{AlertingRule: r, PartialResponseStrategy: s}) + } + } + return res +} + +func (r *RuleGroup) UnmarshalYAML(unmarshal func(interface{}) error) error { + rs := struct { + String string `yaml:"partial_response_strategy"` + }{} + + errMsg := fmt.Sprintf("failed to unmarshal 'partial_response_strategy'. 
Possible values are %s", strings.Join(storepb.PartialResponseStrategyValues, ",")) + if err := unmarshal(&rs); err != nil { + return errors.Wrapf(err, errMsg) + } + + rg := rulefmt.RuleGroup{} + if err := unmarshal(&rg); err != nil { + return errors.Wrapf(err, errMsg) + } + + p, ok := storepb.PartialResponseStrategy_value[strings.ToUpper(rs.String)] + if !ok { + if rs.String != "" { + return errors.Errorf("%s. Got: %s", errMsg, rs.String) + } + + // NOTE: For Rule default is abort as this is recommended for alerting. + p = storepb.PartialResponseStrategy_value[storepb.PartialResponseStrategy_ABORT.String()] + } + + r.RuleGroup = rg + r.PartialResponseStrategy = storepb.PartialResponseStrategy(p) + return nil +} + +// Update updates rules from given files to all managers we hold. We decide which groups should go where, based on +// special field in RuleGroup file. +func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []string) error { + var ( + errs tsdb.MultiError + filesMap = map[storepb.PartialResponseStrategy][]string{} + ) + + if err := os.RemoveAll(path.Join(dataDir, tmpRuleDir)); err != nil { + return errors.Wrapf(err, "rm %s", path.Join(dataDir, tmpRuleDir)) + } + if err := os.MkdirAll(path.Join(dataDir, tmpRuleDir), os.ModePerm); err != nil { + return errors.Wrapf(err, "mkdir %s", path.Join(dataDir, tmpRuleDir)) + } + + for _, fn := range files { + b, err := ioutil.ReadFile(fn) + if err != nil { + errs = append(errs, err) + continue + } + + var rg RuleGroups + if err := yaml.Unmarshal(b, &rg); err != nil { + errs = append(errs, err) + continue + } + + // NOTE: This is very ugly, but we need to reparse it into tmp dir without the field to have to reuse + // rules.Manager. The problem is that it uses yaml.UnmarshalStrict for some reasons. 
+ mapped := map[storepb.PartialResponseStrategy]*rulefmt.RuleGroups{} + for _, rg := range rg.Groups { + if _, ok := mapped[rg.PartialResponseStrategy]; !ok { + mapped[rg.PartialResponseStrategy] = &rulefmt.RuleGroups{} + } + + mapped[rg.PartialResponseStrategy].Groups = append( + mapped[rg.PartialResponseStrategy].Groups, + rg.RuleGroup, + ) + } + + for s, rg := range mapped { + b, err := yaml.Marshal(rg) + if err != nil { + errs = append(errs, err) + continue + } + + newFn := path.Join(dataDir, tmpRuleDir, filepath.Base(fn)+"."+s.String()) + if err := ioutil.WriteFile(newFn, b, os.ModePerm); err != nil { + errs = append(errs, err) + continue + } + + filesMap[s] = append(filesMap[s], newFn) + } + + } + + for s, fs := range filesMap { + updater, ok := (*m)[s] + if !ok { + errs = append(errs, errors.Errorf("no updater found for %v", s)) + continue + } + if err := updater.Update(evalInterval, fs); err != nil { + errs = append(errs, err) + continue + } + } + + return errs.Err() +} diff --git a/pkg/rule/rule_test.go b/pkg/rule/rule_test.go new file mode 100644 index 0000000000..932a102d15 --- /dev/null +++ b/pkg/rule/rule_test.go @@ -0,0 +1,124 @@ +package thanosrule + +import ( + "io/ioutil" + "os" + "path" + "sort" + "strings" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/prometheus/prometheus/rules" +) + +func TestUpdate(t *testing.T) { + dir, err := ioutil.TempDir("", "test_rule_rule_groups") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "no_strategy.yaml"), []byte(` +groups: +- name: "something1" + rules: + - alert: "some" + expr: "up" +`), os.ModePerm)) + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "abort.yaml"), []byte(` +groups: +- name: "something2" + partial_response_strategy: "abort" + rules: + - alert: "some" + expr: "up" +`), os.ModePerm)) + 
testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "warn.yaml"), []byte(`
+groups:
+- name: "something3"
+  partial_response_strategy: "warn"
+  rules:
+  - alert: "some"
+    expr: "up"
+`), os.ModePerm))
+	testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "wrong.yaml"), []byte(`
+groups:
+- name: "something4"
+  partial_response_strategy: "afafsdgsdgs" # Err 1
+  rules:
+  - alert: "some"
+    expr: "up"
+`), os.ModePerm))
+	testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "combined.yaml"), []byte(`
+groups:
+- name: "something5"
+  partial_response_strategy: "warn"
+  rules:
+  - alert: "some"
+    expr: "up"
+- name: "something6"
+  partial_response_strategy: "abort"
+  rules:
+  - alert: "some"
+    expr: "up"
+- name: "something7"
+  rules:
+  - alert: "some"
+    expr: "up"
+`), os.ModePerm))
+	testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "combined-wrong.yaml"), []byte(`
+groups:
+- name: "something8"
+  partial_response_strategy: "warn"
+  rules:
+  - alert: "some"
+    expr: "up"
+- name: "something9"
+  partial_response_strategy: "adad" # Err 2
+  rules:
+  - alert: "some"
+    expr: "up"
+`), os.ModePerm))
+
+	opts := rules.ManagerOptions{
+		Logger: log.NewLogfmtLogger(os.Stderr),
+	}
+	m := Managers{
+		storepb.PartialResponseStrategy_ABORT: rules.NewManager(&opts),
+		storepb.PartialResponseStrategy_WARN:  rules.NewManager(&opts),
+	}
+
+	err = m.Update(dir, 10*time.Second, []string{
+		path.Join(dir, "no_strategy.yaml"),
+		path.Join(dir, "abort.yaml"),
+		path.Join(dir, "warn.yaml"),
+		path.Join(dir, "wrong.yaml"),
+		path.Join(dir, "combined.yaml"),
+		path.Join(dir, "combined-wrong.yaml"),
+	})
+
+	testutil.NotOk(t, err)
+	testutil.Assert(t, strings.HasPrefix(err.Error(), "2 errors: failed to unmarshal 'partial_response_strategy'"), err.Error())
+
+	g := m[storepb.PartialResponseStrategy_WARN].RuleGroups()
+	testutil.Equals(t, 2, len(g))
+
+	sort.Slice(g, func(i, j int) bool {
+		return g[i].Name() < g[j].Name()
+	})
+	testutil.Equals(t, "something3", g[0].Name())
+	testutil.Equals(t, "something5",
g[1].Name()) + + g = m[storepb.PartialResponseStrategy_ABORT].RuleGroups() + testutil.Equals(t, 4, len(g)) + + sort.Slice(g, func(i, j int) bool { + return g[i].Name() < g[j].Name() + }) + testutil.Equals(t, "something1", g[0].Name()) + testutil.Equals(t, "something2", g[1].Name()) + testutil.Equals(t, "something6", g[2].Name()) + testutil.Equals(t, "something7", g[3].Name()) +} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index fb295d167a..d2070229f1 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -249,7 +249,7 @@ func NewBucketStore( blockSyncConcurrency: blockSyncConcurrency, queryGate: NewGate( maxConcurrent, - extprom.WrapRegistererWithPrefix("thanos_bucket_store_series", reg), + extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), ), samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, diff --git a/pkg/store/prompb/remote.pb.go b/pkg/store/prompb/remote.pb.go index 06a0952c98..8da7ac6b6d 100644 --- a/pkg/store/prompb/remote.pb.go +++ b/pkg/store/prompb/remote.pb.go @@ -53,7 +53,7 @@ func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { } type WriteRequest struct { - Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries"` + Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -93,7 +93,7 @@ func (m *WriteRequest) XXX_DiscardUnknown() { var xxx_messageInfo_WriteRequest proto.InternalMessageInfo type ReadRequest struct { - Queries []Query `protobuf:"bytes,1,rep,name=queries" json:"queries"` + Queries []Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -134,7 +134,7 @@ var xxx_messageInfo_ReadRequest proto.InternalMessageInfo type ReadResponse struct 
{ // In same order as the request's queries. - Results []QueryResult `protobuf:"bytes,1,rep,name=results" json:"results"` + Results []QueryResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -176,8 +176,8 @@ var xxx_messageInfo_ReadResponse proto.InternalMessageInfo type Query struct { StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"` EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs,proto3" json:"end_timestamp_ms,omitempty"` - Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers"` - Hints *ReadHints `protobuf:"bytes,4,opt,name=hints" json:"hints,omitempty"` + Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers,proto3" json:"matchers"` + Hints *ReadHints `protobuf:"bytes,4,opt,name=hints,proto3" json:"hints,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -217,7 +217,7 @@ func (m *Query) XXX_DiscardUnknown() { var xxx_messageInfo_Query proto.InternalMessageInfo type QueryResult struct { - Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries"` + Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -298,8 +298,8 @@ func (m *Sample) XXX_DiscardUnknown() { var xxx_messageInfo_Sample proto.InternalMessageInfo type TimeSeries struct { - Labels []Label `protobuf:"bytes,1,rep,name=labels" json:"labels"` - Samples []Sample `protobuf:"bytes,2,rep,name=samples" json:"samples"` + Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` XXX_NoUnkeyedLiteral 
struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -863,6 +863,9 @@ func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *WriteRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Timeseries) > 0 { @@ -878,6 +881,9 @@ func (m *WriteRequest) Size() (n int) { } func (m *ReadRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Queries) > 0 { @@ -893,6 +899,9 @@ func (m *ReadRequest) Size() (n int) { } func (m *ReadResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Results) > 0 { @@ -908,6 +917,9 @@ func (m *ReadResponse) Size() (n int) { } func (m *Query) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.StartTimestampMs != 0 { @@ -933,6 +945,9 @@ func (m *Query) Size() (n int) { } func (m *QueryResult) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Timeseries) > 0 { @@ -948,6 +963,9 @@ func (m *QueryResult) Size() (n int) { } func (m *Sample) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Value != 0 { @@ -963,6 +981,9 @@ func (m *Sample) Size() (n int) { } func (m *TimeSeries) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Labels) > 0 { @@ -984,6 +1005,9 @@ func (m *TimeSeries) Size() (n int) { } func (m *Label) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l l = len(m.Name) @@ -1001,6 +1025,9 @@ func (m *Label) Size() (n int) { } func (m *LabelMatcher) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Type != 0 { @@ -1021,6 +1048,9 @@ func (m *LabelMatcher) Size() (n int) { } func (m *ReadHints) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.StepMs != 0 { diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 18e2035e69..2097282408 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -6,6 +6,14 @@ import ( 
"github.com/prometheus/prometheus/pkg/labels" ) +var PartialResponseStrategyValues = func() []string { + var s []string + for k := range PartialResponseStrategy_value { + s = append(s, k) + } + return s +}() + func NewWarnSeriesResponse(err error) *SeriesResponse { return &SeriesResponse{ Result: &SeriesResponse_Warning{ diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index e9ddda8711..a176361ddb 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -8,8 +8,10 @@ import fmt "fmt" import math "math" import _ "github.com/gogo/protobuf/gogoproto" -import context "golang.org/x/net/context" -import grpc "google.golang.org/grpc" +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) import io "io" @@ -56,7 +58,30 @@ func (x StoreType) String() string { return proto.EnumName(StoreType_name, int32(x)) } func (StoreType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{0} + return fileDescriptor_rpc_f4f04914f1106c76, []int{0} +} + +type PartialResponseStrategy int32 + +const ( + PartialResponseStrategy_WARN PartialResponseStrategy = 0 + PartialResponseStrategy_ABORT PartialResponseStrategy = 1 +) + +var PartialResponseStrategy_name = map[int32]string{ + 0: "WARN", + 1: "ABORT", +} +var PartialResponseStrategy_value = map[string]int32{ + "WARN": 0, + "ABORT": 1, +} + +func (x PartialResponseStrategy) String() string { + return proto.EnumName(PartialResponseStrategy_name, int32(x)) +} +func (PartialResponseStrategy) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_rpc_f4f04914f1106c76, []int{1} } type Aggr int32 @@ -91,7 +116,7 @@ func (x Aggr) String() string { return proto.EnumName(Aggr_name, int32(x)) } func (Aggr) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{1} + return fileDescriptor_rpc_f4f04914f1106c76, []int{2} } type InfoRequest struct { @@ -104,7 +129,7 @@ func (m *InfoRequest) Reset() { *m = 
InfoRequest{} } func (m *InfoRequest) String() string { return proto.CompactTextString(m) } func (*InfoRequest) ProtoMessage() {} func (*InfoRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{0} + return fileDescriptor_rpc_f4f04914f1106c76, []int{0} } func (m *InfoRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -134,7 +159,7 @@ func (m *InfoRequest) XXX_DiscardUnknown() { var xxx_messageInfo_InfoRequest proto.InternalMessageInfo type InfoResponse struct { - Labels []Label `protobuf:"bytes,1,rep,name=labels" json:"labels"` + Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` MinTime int64 `protobuf:"varint,2,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` MaxTime int64 `protobuf:"varint,3,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` StoreType StoreType `protobuf:"varint,4,opt,name=storeType,proto3,enum=thanos.StoreType" json:"storeType,omitempty"` @@ -147,7 +172,7 @@ func (m *InfoResponse) Reset() { *m = InfoResponse{} } func (m *InfoResponse) String() string { return proto.CompactTextString(m) } func (*InfoResponse) ProtoMessage() {} func (*InfoResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{1} + return fileDescriptor_rpc_f4f04914f1106c76, []int{1} } func (m *InfoResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -177,22 +202,24 @@ func (m *InfoResponse) XXX_DiscardUnknown() { var xxx_messageInfo_InfoResponse proto.InternalMessageInfo type SeriesRequest struct { - MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` - MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` - Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers"` - MaxResolutionWindow int64 `protobuf:"varint,4,opt,name=max_resolution_window,json=maxResolutionWindow,proto3" 
json:"max_resolution_window,omitempty"` - Aggregates []Aggr `protobuf:"varint,5,rep,packed,name=aggregates,enum=thanos.Aggr" json:"aggregates,omitempty"` - PartialResponseDisabled bool `protobuf:"varint,6,opt,name=partial_response_disabled,json=partialResponseDisabled,proto3" json:"partial_response_disabled,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` + MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` + Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers,proto3" json:"matchers"` + MaxResolutionWindow int64 `protobuf:"varint,4,opt,name=max_resolution_window,json=maxResolutionWindow,proto3" json:"max_resolution_window,omitempty"` + Aggregates []Aggr `protobuf:"varint,5,rep,packed,name=aggregates,proto3,enum=thanos.Aggr" json:"aggregates,omitempty"` + // Deprecated. Use partial_response_strategy instead. 
+ PartialResponseDisabled bool `protobuf:"varint,6,opt,name=partial_response_disabled,json=partialResponseDisabled,proto3" json:"partial_response_disabled,omitempty"` + PartialResponseStrategy PartialResponseStrategy `protobuf:"varint,7,opt,name=partial_response_strategy,json=partialResponseStrategy,proto3,enum=thanos.PartialResponseStrategy" json:"partial_response_strategy,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } func (m *SeriesRequest) String() string { return proto.CompactTextString(m) } func (*SeriesRequest) ProtoMessage() {} func (*SeriesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{2} + return fileDescriptor_rpc_f4f04914f1106c76, []int{2} } func (m *SeriesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -235,7 +262,7 @@ func (m *SeriesResponse) Reset() { *m = SeriesResponse{} } func (m *SeriesResponse) String() string { return proto.CompactTextString(m) } func (*SeriesResponse) ProtoMessage() {} func (*SeriesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{3} + return fileDescriptor_rpc_f4f04914f1106c76, []int{3} } func (m *SeriesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -271,7 +298,7 @@ type isSeriesResponse_Result interface { } type SeriesResponse_Series struct { - Series *Series `protobuf:"bytes,1,opt,name=series,oneof"` + Series *Series `protobuf:"bytes,1,opt,name=series,proto3,oneof"` } type SeriesResponse_Warning struct { Warning string `protobuf:"bytes,2,opt,name=warning,proto3,oneof"` @@ -382,7 +409,7 @@ func (m *LabelNamesRequest) Reset() { *m = LabelNamesRequest{} } func (m *LabelNamesRequest) String() string { return proto.CompactTextString(m) } func (*LabelNamesRequest) ProtoMessage() {} func (*LabelNamesRequest) Descriptor() ([]byte, []int) { - return 
fileDescriptor_rpc_b2f04ff11750c7dd, []int{4} + return fileDescriptor_rpc_f4f04914f1106c76, []int{4} } func (m *LabelNamesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -412,8 +439,8 @@ func (m *LabelNamesRequest) XXX_DiscardUnknown() { var xxx_messageInfo_LabelNamesRequest proto.InternalMessageInfo type LabelNamesResponse struct { - Names []string `protobuf:"bytes,1,rep,name=names" json:"names,omitempty"` - Warnings []string `protobuf:"bytes,2,rep,name=warnings" json:"warnings,omitempty"` + Names []string `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"` + Warnings []string `protobuf:"bytes,2,rep,name=warnings,proto3" json:"warnings,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -423,7 +450,7 @@ func (m *LabelNamesResponse) Reset() { *m = LabelNamesResponse{} } func (m *LabelNamesResponse) String() string { return proto.CompactTextString(m) } func (*LabelNamesResponse) ProtoMessage() {} func (*LabelNamesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{5} + return fileDescriptor_rpc_f4f04914f1106c76, []int{5} } func (m *LabelNamesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -464,7 +491,7 @@ func (m *LabelValuesRequest) Reset() { *m = LabelValuesRequest{} } func (m *LabelValuesRequest) String() string { return proto.CompactTextString(m) } func (*LabelValuesRequest) ProtoMessage() {} func (*LabelValuesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{6} + return fileDescriptor_rpc_f4f04914f1106c76, []int{6} } func (m *LabelValuesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -494,8 +521,8 @@ func (m *LabelValuesRequest) XXX_DiscardUnknown() { var xxx_messageInfo_LabelValuesRequest proto.InternalMessageInfo type LabelValuesResponse struct { - Values []string `protobuf:"bytes,1,rep,name=values" json:"values,omitempty"` - Warnings 
[]string `protobuf:"bytes,2,rep,name=warnings" json:"warnings,omitempty"` + Values []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` + Warnings []string `protobuf:"bytes,2,rep,name=warnings,proto3" json:"warnings,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -505,7 +532,7 @@ func (m *LabelValuesResponse) Reset() { *m = LabelValuesResponse{} } func (m *LabelValuesResponse) String() string { return proto.CompactTextString(m) } func (*LabelValuesResponse) ProtoMessage() {} func (*LabelValuesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{7} + return fileDescriptor_rpc_f4f04914f1106c76, []int{7} } func (m *LabelValuesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -544,6 +571,7 @@ func init() { proto.RegisterType((*LabelValuesRequest)(nil), "thanos.LabelValuesRequest") proto.RegisterType((*LabelValuesResponse)(nil), "thanos.LabelValuesResponse") proto.RegisterEnum("thanos.StoreType", StoreType_name, StoreType_value) + proto.RegisterEnum("thanos.PartialResponseStrategy", PartialResponseStrategy_name, PartialResponseStrategy_value) proto.RegisterEnum("thanos.Aggr", Aggr_name, Aggr_value) } @@ -555,8 +583,9 @@ var _ grpc.ClientConn // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 -// Client API for Store service - +// StoreClient is the client API for Store service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type StoreClient interface { // / Info returns meta information about a store e.g labels that makes that store unique as well as time range that is // / available. 
@@ -637,8 +666,7 @@ func (c *storeClient) LabelValues(ctx context.Context, in *LabelValuesRequest, o return out, nil } -// Server API for Store service - +// StoreServer is the server API for Store service. type StoreServer interface { // / Info returns meta information about a store e.g labels that makes that store unique as well as time range that is // / available. @@ -896,6 +924,11 @@ func (m *SeriesRequest) MarshalTo(dAtA []byte) (int, error) { } i++ } + if m.PartialResponseStrategy != 0 { + dAtA[i] = 0x38 + i++ + i = encodeVarintRpc(dAtA, i, uint64(m.PartialResponseStrategy)) + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -1132,6 +1165,9 @@ func encodeVarintRpc(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *InfoRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.XXX_unrecognized != nil { @@ -1141,6 +1177,9 @@ func (m *InfoRequest) Size() (n int) { } func (m *InfoResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Labels) > 0 { @@ -1165,6 +1204,9 @@ func (m *InfoResponse) Size() (n int) { } func (m *SeriesRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.MinTime != 0 { @@ -1192,6 +1234,9 @@ func (m *SeriesRequest) Size() (n int) { if m.PartialResponseDisabled { n += 2 } + if m.PartialResponseStrategy != 0 { + n += 1 + sovRpc(uint64(m.PartialResponseStrategy)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1199,6 +1244,9 @@ func (m *SeriesRequest) Size() (n int) { } func (m *SeriesResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Result != nil { @@ -1211,6 +1259,9 @@ func (m *SeriesResponse) Size() (n int) { } func (m *SeriesResponse_Series) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Series != nil { @@ -1220,6 +1271,9 @@ func (m *SeriesResponse_Series) Size() (n int) { return n } func (m *SeriesResponse_Warning) Size() (n int) { + if m == nil { + 
return 0 + } var l int _ = l l = len(m.Warning) @@ -1227,6 +1281,9 @@ func (m *SeriesResponse_Warning) Size() (n int) { return n } func (m *LabelNamesRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.PartialResponseDisabled { @@ -1239,6 +1296,9 @@ func (m *LabelNamesRequest) Size() (n int) { } func (m *LabelNamesResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Names) > 0 { @@ -1260,6 +1320,9 @@ func (m *LabelNamesResponse) Size() (n int) { } func (m *LabelValuesRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l l = len(m.Label) @@ -1276,6 +1339,9 @@ func (m *LabelValuesRequest) Size() (n int) { } func (m *LabelValuesResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Values) > 0 { @@ -1657,6 +1723,10 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } + var elementCount int + if elementCount != 0 && len(m.Aggregates) == 0 { + m.Aggregates = make([]Aggr, 0, elementCount) + } for iNdEx < postIndex { var v Aggr for shift := uint(0); ; shift += 7 { @@ -1698,6 +1768,25 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { } } m.PartialResponseDisabled = bool(v != 0) + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PartialResponseStrategy", wireType) + } + m.PartialResponseStrategy = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PartialResponseStrategy |= (PartialResponseStrategy(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -2326,51 +2415,54 @@ var ( ErrIntOverflowRpc = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("rpc.proto", fileDescriptor_rpc_b2f04ff11750c7dd) } - -var fileDescriptor_rpc_b2f04ff11750c7dd = []byte{ - // 683 bytes of 
a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0xc1, 0x6e, 0xda, 0x4c, - 0x10, 0xc6, 0x36, 0x36, 0x78, 0x48, 0x90, 0xb3, 0x21, 0xf9, 0x8d, 0x7f, 0x89, 0x22, 0x4e, 0x28, - 0xad, 0x92, 0x96, 0x4a, 0x95, 0xda, 0x1b, 0x10, 0x47, 0x41, 0x4d, 0x40, 0x5d, 0x20, 0x69, 0x7b, - 0x49, 0x4d, 0xb2, 0x71, 0x2c, 0x19, 0xdb, 0xf5, 0x9a, 0x26, 0xb9, 0xf6, 0x35, 0x7a, 0xeb, 0xd3, - 0xe4, 0xd8, 0x27, 0xa8, 0x5a, 0x9e, 0xa4, 0xf2, 0x7a, 0x0d, 0xb8, 0x4a, 0xb9, 0xed, 0x7c, 0xdf, - 0x78, 0xe6, 0xdb, 0x99, 0xcf, 0x0b, 0x6a, 0x18, 0x5c, 0xee, 0x07, 0xa1, 0x1f, 0xf9, 0x48, 0x89, - 0x6e, 0x2c, 0xcf, 0xa7, 0x46, 0x29, 0xba, 0x0f, 0x08, 0x4d, 0x40, 0xa3, 0x62, 0xfb, 0xb6, 0xcf, - 0x8e, 0x07, 0xf1, 0x29, 0x41, 0x1b, 0x9b, 0x50, 0xea, 0x79, 0xd7, 0x3e, 0x26, 0x9f, 0x67, 0x84, - 0x46, 0x8d, 0xef, 0x02, 0x6c, 0x24, 0x31, 0x0d, 0x7c, 0x8f, 0x12, 0xf4, 0x14, 0x14, 0xd7, 0x9a, - 0x10, 0x97, 0xea, 0x42, 0x5d, 0x6a, 0x96, 0x5a, 0x9b, 0xfb, 0x49, 0xed, 0xfd, 0x93, 0x18, 0xed, - 0xe4, 0x1f, 0x7e, 0x3e, 0xc9, 0x61, 0x9e, 0x82, 0xaa, 0x50, 0x9c, 0x3a, 0xde, 0x45, 0xe4, 0x4c, - 0x89, 0x2e, 0xd6, 0x85, 0xa6, 0x84, 0x0b, 0x53, 0xc7, 0x1b, 0x39, 0x53, 0xc2, 0x28, 0xeb, 0x2e, - 0xa1, 0x24, 0x4e, 0x59, 0x77, 0x8c, 0x3a, 0x00, 0x95, 0x46, 0x7e, 0x48, 0x46, 0xf7, 0x01, 0xd1, - 0xf3, 0x75, 0xa1, 0x59, 0x6e, 0x6d, 0xa5, 0x5d, 0x86, 0x29, 0x81, 0x97, 0x39, 0x8d, 0x6f, 0x22, - 0x6c, 0x0e, 0x49, 0xe8, 0x10, 0xca, 0x65, 0x67, 0x1a, 0x0b, 0xff, 0x6e, 0x2c, 0x66, 0x1b, 0xbf, - 0x8a, 0xa9, 0xe8, 0xf2, 0x86, 0x84, 0x54, 0x97, 0xd8, 0xed, 0x2a, 0x99, 0xdb, 0x9d, 0x26, 0x24, - 0xbf, 0xe4, 0x22, 0x17, 0xb5, 0x60, 0x27, 0x2e, 0x19, 0x12, 0xea, 0xbb, 0xb3, 0xc8, 0xf1, 0xbd, - 0x8b, 0x5b, 0xc7, 0xbb, 0xf2, 0x6f, 0x99, 0x78, 0x09, 0x6f, 0x4f, 0xad, 0x3b, 0xbc, 0xe0, 0xce, - 0x19, 0x85, 0x9e, 0x01, 0x58, 0xb6, 0x1d, 0x12, 0xdb, 0x8a, 0x08, 0xd5, 0xe5, 0xba, 0xd4, 0x2c, - 0xb7, 0x36, 0xd2, 0x6e, 0x6d, 0xdb, 0x0e, 0xf1, 0x0a, 0x8f, 0xde, 0x40, 0x35, 0xb0, 0xc2, 0xc8, - 0xb1, 
0xdc, 0xb8, 0x0b, 0xdb, 0xc4, 0xc5, 0x95, 0x43, 0xad, 0x89, 0x4b, 0xae, 0x74, 0xa5, 0x2e, - 0x34, 0x8b, 0xf8, 0x3f, 0x9e, 0x90, 0x6e, 0xea, 0x90, 0xd3, 0x8d, 0x4f, 0x50, 0x4e, 0x87, 0xc3, - 0x77, 0xd8, 0x04, 0x85, 0x32, 0x84, 0xcd, 0xa6, 0xd4, 0x2a, 0x2f, 0xa6, 0xcb, 0xd0, 0xe3, 0x1c, - 0xe6, 0x3c, 0x32, 0xa0, 0x70, 0x6b, 0x85, 0x9e, 0xe3, 0xd9, 0x6c, 0x56, 0xea, 0x71, 0x0e, 0xa7, - 0x40, 0xa7, 0x08, 0x4a, 0x48, 0xe8, 0xcc, 0x8d, 0x1a, 0x03, 0xd8, 0x62, 0xf3, 0xe9, 0x5b, 0xd3, - 0xe5, 0x0a, 0xd6, 0x4a, 0x16, 0xd6, 0x4b, 0x3e, 0x02, 0xb4, 0x5a, 0x90, 0xcb, 0xae, 0x80, 0xec, - 0xc5, 0x00, 0x73, 0x9e, 0x8a, 0x93, 0x00, 0x19, 0x50, 0xe4, 0x8a, 0xa8, 0x2e, 0x32, 0x62, 0x11, - 0x37, 0xae, 0x79, 0x9d, 0x33, 0xcb, 0x9d, 0x2d, 0x95, 0x55, 0x40, 0x66, 0xfe, 0x64, 0x2a, 0x54, - 0x9c, 0x04, 0xeb, 0xf5, 0x8a, 0xeb, 0xf5, 0xf6, 0x60, 0x3b, 0xd3, 0x87, 0x0b, 0xde, 0x05, 0xe5, - 0x0b, 0x43, 0xb8, 0x62, 0x1e, 0xad, 0x93, 0xbc, 0x87, 0x41, 0x5d, 0x78, 0x1c, 0x95, 0xa0, 0x30, - 0xee, 0xbf, 0xed, 0x0f, 0xce, 0xfb, 0x5a, 0x0e, 0xa9, 0x20, 0xbf, 0x1b, 0x9b, 0xf8, 0x83, 0x26, - 0xa0, 0x22, 0xe4, 0xf1, 0xf8, 0xc4, 0xd4, 0xc4, 0x38, 0x63, 0xd8, 0x3b, 0x34, 0xbb, 0x6d, 0xac, - 0x49, 0x71, 0xc6, 0x70, 0x34, 0xc0, 0xa6, 0x96, 0x8f, 0x71, 0x6c, 0x76, 0xcd, 0xde, 0x99, 0xa9, - 0xc9, 0x7b, 0x1d, 0xc8, 0xc7, 0x8e, 0x42, 0x05, 0x90, 0x70, 0xfb, 0x3c, 0x29, 0xd5, 0x1d, 0x8c, - 0xfb, 0x23, 0x4d, 0x88, 0xb1, 0xe1, 0xf8, 0x54, 0x13, 0xe3, 0xc3, 0x69, 0xaf, 0xaf, 0x49, 0xec, - 0xd0, 0x7e, 0x9f, 0xd4, 0x60, 0x59, 0x26, 0xd6, 0xe4, 0xd6, 0x57, 0x11, 0x64, 0x26, 0x0c, 0xbd, - 0x80, 0x7c, 0xfc, 0x22, 0xa0, 0xed, 0xd4, 0x35, 0x2b, 0xef, 0x85, 0x51, 0xc9, 0x82, 0x7c, 0x10, - 0xaf, 0x41, 0x49, 0xac, 0x85, 0x76, 0xb2, 0x56, 0x4b, 0x3f, 0xdb, 0xfd, 0x1b, 0x4e, 0x3e, 0x7c, - 0x2e, 0xa0, 0x2e, 0xc0, 0xd2, 0x0a, 0xa8, 0x9a, 0xf9, 0x1f, 0x57, 0xfd, 0x66, 0x18, 0x8f, 0x51, - 0xbc, 0xff, 0x11, 0x94, 0x56, 0xf6, 0x83, 0xb2, 0xa9, 0x19, 0x73, 0x18, 0xff, 0x3f, 0xca, 0x25, - 0x75, 0x3a, 0xd5, 0x87, 0xdf, 0xb5, 0xdc, 
0xc3, 0xbc, 0x26, 0xfc, 0x98, 0xd7, 0x84, 0x5f, 0xf3, - 0x9a, 0xf0, 0xb1, 0xc0, 0x5e, 0xa1, 0x60, 0x32, 0x51, 0xd8, 0xf3, 0xf9, 0xf2, 0x4f, 0x00, 0x00, - 0x00, 0xff, 0xff, 0x33, 0x33, 0x9b, 0x3a, 0x76, 0x05, 0x00, 0x00, +func init() { proto.RegisterFile("rpc.proto", fileDescriptor_rpc_f4f04914f1106c76) } + +var fileDescriptor_rpc_f4f04914f1106c76 = []byte{ + // 729 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0xcf, 0x6e, 0xda, 0x4e, + 0x10, 0xc6, 0x36, 0x18, 0x18, 0x12, 0xe4, 0x6c, 0x48, 0x62, 0xfc, 0x93, 0x08, 0xe2, 0x84, 0xf2, + 0xab, 0x48, 0x4b, 0xa5, 0x4a, 0xed, 0x0d, 0x88, 0xa3, 0xa0, 0x26, 0xd0, 0x2e, 0x10, 0xfa, 0xe7, + 0x90, 0x9a, 0x64, 0xe3, 0x58, 0x02, 0x9b, 0x7a, 0x4d, 0x93, 0x5c, 0xfb, 0x28, 0x7d, 0x9a, 0x1c, + 0xfb, 0x04, 0x55, 0x9b, 0xa7, 0xe8, 0xb1, 0xda, 0xf5, 0x1a, 0x70, 0x9b, 0x70, 0xdb, 0xfd, 0xbe, + 0xf1, 0xcc, 0xb7, 0x33, 0x9f, 0x07, 0xb2, 0xfe, 0xf4, 0xbc, 0x36, 0xf5, 0xbd, 0xc0, 0x43, 0x6a, + 0x70, 0x65, 0xb9, 0x1e, 0x35, 0x72, 0xc1, 0xed, 0x94, 0xd0, 0x10, 0x34, 0x0a, 0xb6, 0x67, 0x7b, + 0xfc, 0xb8, 0xcf, 0x4e, 0x21, 0x5a, 0x59, 0x87, 0x5c, 0xdb, 0xbd, 0xf4, 0x30, 0xf9, 0x3c, 0x23, + 0x34, 0xa8, 0x7c, 0x93, 0x60, 0x2d, 0xbc, 0xd3, 0xa9, 0xe7, 0x52, 0x82, 0xfe, 0x07, 0x75, 0x6c, + 0x8d, 0xc8, 0x98, 0xea, 0x52, 0x59, 0xa9, 0xe6, 0xea, 0xeb, 0xb5, 0x30, 0x77, 0xed, 0x98, 0xa1, + 0xcd, 0xe4, 0xdd, 0x8f, 0xdd, 0x04, 0x16, 0x21, 0xa8, 0x08, 0x99, 0x89, 0xe3, 0x9e, 0x05, 0xce, + 0x84, 0xe8, 0x72, 0x59, 0xaa, 0x2a, 0x38, 0x3d, 0x71, 0xdc, 0xbe, 0x33, 0x21, 0x9c, 0xb2, 0x6e, + 0x42, 0x4a, 0x11, 0x94, 0x75, 0xc3, 0xa9, 0x7d, 0xc8, 0xd2, 0xc0, 0xf3, 0x49, 0xff, 0x76, 0x4a, + 0xf4, 0x64, 0x59, 0xaa, 0xe6, 0xeb, 0x1b, 0x51, 0x95, 0x5e, 0x44, 0xe0, 0x45, 0x4c, 0xe5, 0xb7, + 0x0c, 0xeb, 0x3d, 0xe2, 0x3b, 0x84, 0x0a, 0xd9, 0xb1, 0xc2, 0xd2, 0xe3, 0x85, 0xe5, 0x78, 0xe1, + 0x17, 0x8c, 0x0a, 0xce, 0xaf, 0x88, 0x4f, 0x75, 0x85, 0xbf, 0xae, 0x10, 0x7b, 0xdd, 0x49, 0x48, + 0x8a, 0x47, 0xce, 
0x63, 0x51, 0x1d, 0xb6, 0x58, 0x4a, 0x9f, 0x50, 0x6f, 0x3c, 0x0b, 0x1c, 0xcf, + 0x3d, 0xbb, 0x76, 0xdc, 0x0b, 0xef, 0x9a, 0x8b, 0x57, 0xf0, 0xe6, 0xc4, 0xba, 0xc1, 0x73, 0x6e, + 0xc8, 0x29, 0xf4, 0x04, 0xc0, 0xb2, 0x6d, 0x9f, 0xd8, 0x56, 0x40, 0xa8, 0x9e, 0x2a, 0x2b, 0xd5, + 0x7c, 0x7d, 0x2d, 0xaa, 0xd6, 0xb0, 0x6d, 0x1f, 0x2f, 0xf1, 0xe8, 0x15, 0x14, 0xa7, 0x96, 0x1f, + 0x38, 0xd6, 0x98, 0x55, 0xe1, 0x93, 0x38, 0xbb, 0x70, 0xa8, 0x35, 0x1a, 0x93, 0x0b, 0x5d, 0x2d, + 0x4b, 0xd5, 0x0c, 0xde, 0x11, 0x01, 0xd1, 0xa4, 0x0e, 0x04, 0x8d, 0x3e, 0x3e, 0xf0, 0x2d, 0x0d, + 0x7c, 0x2b, 0x20, 0xf6, 0xad, 0x9e, 0xe6, 0xed, 0xdd, 0x8d, 0x0a, 0xbf, 0x89, 0xe7, 0xe8, 0x89, + 0xb0, 0x7f, 0x92, 0x47, 0x44, 0xe5, 0x13, 0xe4, 0xa3, 0xce, 0x0b, 0x83, 0x54, 0x41, 0xa5, 0x1c, + 0xe1, 0x8d, 0xcf, 0xd5, 0xf3, 0xf3, 0xd1, 0x71, 0xf4, 0x28, 0x81, 0x05, 0x8f, 0x0c, 0x48, 0x5f, + 0x5b, 0xbe, 0xeb, 0xb8, 0x36, 0x1f, 0x44, 0xf6, 0x28, 0x81, 0x23, 0xa0, 0x99, 0x01, 0xd5, 0x27, + 0x74, 0x36, 0x0e, 0x2a, 0x5d, 0xd8, 0xe0, 0xcd, 0xef, 0x58, 0x93, 0xc5, 0x7c, 0x57, 0xf6, 0x43, + 0x5a, 0xd9, 0x8f, 0xca, 0x21, 0xa0, 0xe5, 0x84, 0x42, 0x76, 0x01, 0x52, 0x2e, 0x03, 0xb8, 0xad, + 0xb3, 0x38, 0xbc, 0x20, 0x03, 0x32, 0x42, 0x11, 0xd5, 0x65, 0x4e, 0xcc, 0xef, 0x95, 0x4b, 0x91, + 0xe7, 0xd4, 0x1a, 0xcf, 0x16, 0xca, 0x0a, 0x90, 0xe2, 0xe6, 0xe7, 0x2a, 0xb2, 0x38, 0xbc, 0xac, + 0xd6, 0x2b, 0xaf, 0xd6, 0xdb, 0x86, 0xcd, 0x58, 0x1d, 0x21, 0x78, 0x1b, 0xd4, 0x2f, 0x1c, 0x11, + 0x8a, 0xc5, 0x6d, 0x95, 0xe4, 0x3d, 0x0c, 0xd9, 0xf9, 0x0f, 0x84, 0x72, 0x90, 0x1e, 0x74, 0x5e, + 0x77, 0xba, 0xc3, 0x8e, 0x96, 0x40, 0x59, 0x48, 0xbd, 0x1d, 0x98, 0xf8, 0xbd, 0x26, 0xa1, 0x0c, + 0x24, 0xf1, 0xe0, 0xd8, 0xd4, 0x64, 0x16, 0xd1, 0x6b, 0x1f, 0x98, 0xad, 0x06, 0xd6, 0x14, 0x16, + 0xd1, 0xeb, 0x77, 0xb1, 0xa9, 0x25, 0x19, 0x8e, 0xcd, 0x96, 0xd9, 0x3e, 0x35, 0xb5, 0xd4, 0x5e, + 0x0d, 0x76, 0x1e, 0x71, 0x0d, 0xcb, 0x34, 0x6c, 0x60, 0x91, 0xbe, 0xd1, 0xec, 0xe2, 0xbe, 0x26, + 0xed, 0x35, 0x21, 0xc9, 0xec, 0x8d, 0xd2, 0xa0, 0xe0, 
0xc6, 0x30, 0xe4, 0x5a, 0xdd, 0x41, 0xa7, + 0xaf, 0x49, 0x0c, 0xeb, 0x0d, 0x4e, 0x34, 0x99, 0x1d, 0x4e, 0xda, 0x1d, 0x4d, 0xe1, 0x87, 0xc6, + 0xbb, 0xb0, 0x26, 0x8f, 0x32, 0xb1, 0x96, 0xaa, 0x7f, 0x95, 0x21, 0xc5, 0x1f, 0x82, 0x9e, 0x41, + 0x92, 0xad, 0x27, 0xb4, 0x19, 0xb9, 0x6c, 0x69, 0x79, 0x19, 0x85, 0x38, 0x28, 0x1a, 0xf7, 0x12, + 0xd4, 0xd0, 0x8a, 0x68, 0x2b, 0x6e, 0xcd, 0xe8, 0xb3, 0xed, 0xbf, 0xe1, 0xf0, 0xc3, 0xa7, 0x12, + 0x6a, 0x01, 0x2c, 0xac, 0x83, 0x8a, 0xb1, 0xe5, 0xb0, 0xec, 0x4f, 0xc3, 0x78, 0x88, 0x12, 0xf5, + 0x0f, 0x21, 0xb7, 0x34, 0x4f, 0x14, 0x0f, 0x8d, 0x99, 0xc9, 0xf8, 0xef, 0x41, 0x2e, 0xcc, 0xd3, + 0x2c, 0xde, 0xfd, 0x2a, 0x25, 0xee, 0xee, 0x4b, 0xd2, 0xf7, 0xfb, 0x92, 0xf4, 0xf3, 0xbe, 0x24, + 0x7d, 0x48, 0xf3, 0x95, 0x38, 0x1d, 0x8d, 0x54, 0xbe, 0xcb, 0x9f, 0xff, 0x09, 0x00, 0x00, 0xff, + 0xff, 0x92, 0x5a, 0x97, 0xd8, 0x03, 0x06, 0x00, 0x00, } diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 2c264f1c15..95a9d59e4d 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -12,10 +12,6 @@ option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; /// Store reprents API against instance that stores XOR encoded values with label set metadata (e.g Prometheus metrics). -/// -/// Partial Response is supported unless `partial_response_disabled` is true. When disabled any error that will result -/// in partial data returned (e.g missing chunk series because of underlying storeAPI is temporarily not available) is -/// failing the request. service Store { /// Info returns meta information about a store e.g labels that makes that store unique as well as time range that is /// available. @@ -51,6 +47,20 @@ message InfoResponse { StoreType storeType = 4; } +/// PartialResponseStrategy controls partial response handling. 
+enum PartialResponseStrategy { + /// WARN strategy tells server to treat any error that will related to single StoreAPI (e.g missing chunk series because of underlying + /// storeAPI is temporarily not available) as warning which will not fail the whole query (still OK response). + /// Server should produce those as a warnings field in response. + WARN = 0; + /// ABORT strategy tells server to treat any error that will related to single StoreAPI (e.g missing chunk series because of underlying + /// storeAPI is temporarily not available) as the gRPC error that aborts the query. + /// + /// This is especially useful for any rule/alert evaluations on top of StoreAPI which usually does not tolerate partial + /// errors. + ABORT = 1; +} + message SeriesRequest { int64 min_time = 1; int64 max_time = 2; @@ -59,7 +69,11 @@ message SeriesRequest { int64 max_resolution_window = 4; repeated Aggr aggregates = 5; + // Deprecated. Use partial_response_strategy instead. bool partial_response_disabled = 6; + + // TODO(bwplotka): Move Thanos components to use strategy instead. Inlcuding QueryAPI. + PartialResponseStrategy partial_response_strategy = 7; } enum Aggr { @@ -83,6 +97,9 @@ message SeriesResponse { message LabelNamesRequest { bool partial_response_disabled = 1; + + // TODO(bwplotka): Move Thanos components to use strategy instead. Inlcuding QueryAPI. + PartialResponseStrategy partial_response_strategy = 2; } message LabelNamesResponse { @@ -94,6 +111,9 @@ message LabelValuesRequest { string label = 1; bool partial_response_disabled = 2; + + // TODO(bwplotka): Move Thanos components to use strategy instead. Inlcuding QueryAPI. 
+ PartialResponseStrategy partial_response_strategy = 3; } message LabelValuesResponse { diff --git a/pkg/store/storepb/types.pb.go b/pkg/store/storepb/types.pb.go index 0f6767b97b..9344fbc9d0 100644 --- a/pkg/store/storepb/types.pb.go +++ b/pkg/store/storepb/types.pb.go @@ -153,8 +153,8 @@ func (m *Chunk) XXX_DiscardUnknown() { var xxx_messageInfo_Chunk proto.InternalMessageInfo type Series struct { - Labels []Label `protobuf:"bytes,1,rep,name=labels" json:"labels"` - Chunks []AggrChunk `protobuf:"bytes,2,rep,name=chunks" json:"chunks"` + Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + Chunks []AggrChunk `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -196,12 +196,12 @@ var xxx_messageInfo_Series proto.InternalMessageInfo type AggrChunk struct { MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` - Raw *Chunk `protobuf:"bytes,3,opt,name=raw" json:"raw,omitempty"` - Count *Chunk `protobuf:"bytes,4,opt,name=count" json:"count,omitempty"` - Sum *Chunk `protobuf:"bytes,5,opt,name=sum" json:"sum,omitempty"` - Min *Chunk `protobuf:"bytes,6,opt,name=min" json:"min,omitempty"` - Max *Chunk `protobuf:"bytes,7,opt,name=max" json:"max,omitempty"` - Counter *Chunk `protobuf:"bytes,8,opt,name=counter" json:"counter,omitempty"` + Raw *Chunk `protobuf:"bytes,3,opt,name=raw,proto3" json:"raw,omitempty"` + Count *Chunk `protobuf:"bytes,4,opt,name=count,proto3" json:"count,omitempty"` + Sum *Chunk `protobuf:"bytes,5,opt,name=sum,proto3" json:"sum,omitempty"` + Min *Chunk `protobuf:"bytes,6,opt,name=min,proto3" json:"min,omitempty"` + Max *Chunk `protobuf:"bytes,7,opt,name=max,proto3" json:"max,omitempty"` + Counter *Chunk `protobuf:"bytes,8,opt,name=counter,proto3" json:"counter,omitempty"` 
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -541,6 +541,9 @@ func encodeVarintTypes(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *Label) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l l = len(m.Name) @@ -558,6 +561,9 @@ func (m *Label) Size() (n int) { } func (m *Chunk) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Type != 0 { @@ -574,6 +580,9 @@ func (m *Chunk) Size() (n int) { } func (m *Series) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Labels) > 0 { @@ -595,6 +604,9 @@ func (m *Series) Size() (n int) { } func (m *AggrChunk) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.MinTime != 0 { @@ -634,6 +646,9 @@ func (m *AggrChunk) Size() (n int) { } func (m *LabelMatcher) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Type != 0 { diff --git a/pkg/ui/rule.go b/pkg/ui/rule.go index ff4b9412ca..2493e7f0f3 100644 --- a/pkg/ui/rule.go +++ b/pkg/ui/rule.go @@ -10,6 +10,8 @@ import ( "sort" "github.com/go-kit/kit/log" + thanosrule "github.com/improbable-eng/thanos/pkg/rule" + "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/rules" @@ -20,16 +22,16 @@ type Rule struct { flagsMap map[string]string - ruleManager *rules.Manager - queryURL string + ruleManagers thanosrule.Managers + queryURL string } -func NewRuleUI(logger log.Logger, ruleManager *rules.Manager, queryURL string, flagsMap map[string]string) *Rule { +func NewRuleUI(logger log.Logger, ruleManagers map[storepb.PartialResponseStrategy]*rules.Manager, queryURL string, flagsMap map[string]string) *Rule { return &Rule{ - BaseUI: NewBaseUI(logger, "rule_menu.html", ruleTmplFuncs(queryURL)), - flagsMap: flagsMap, - ruleManager: ruleManager, - queryURL: queryURL, + BaseUI: NewBaseUI(logger, 
"rule_menu.html", ruleTmplFuncs(queryURL)), + flagsMap: flagsMap, + ruleManagers: ruleManagers, + queryURL: queryURL, } } @@ -96,7 +98,7 @@ func ruleTmplFuncs(queryURL string) template.FuncMap { } func (ru *Rule) alerts(w http.ResponseWriter, r *http.Request) { - alerts := ru.ruleManager.AlertingRules() + alerts := ru.ruleManagers.AlertingRules() alertsSorter := byAlertStateAndNameSorter{alerts: alerts} sort.Sort(alertsSorter) @@ -111,13 +113,15 @@ func (ru *Rule) alerts(w http.ResponseWriter, r *http.Request) { prefix := GetWebPrefix(ru.logger, ru.flagsMap, r) + // TODO(bwplotka): Update HTML to include partial response. ru.executeTemplate(w, "alerts.html", prefix, alertStatus) } func (ru *Rule) rules(w http.ResponseWriter, r *http.Request) { prefix := GetWebPrefix(ru.logger, ru.flagsMap, r) - ru.executeTemplate(w, "rules.html", prefix, ru.ruleManager) + // TODO(bwplotka): Update HTML to include partial response. + ru.executeTemplate(w, "rules.html", prefix, ru.ruleManagers) } // root redirects / requests to /graph, taking into account the path prefix value @@ -139,12 +143,12 @@ func (ru *Rule) Register(r *route.Router) { // AlertStatus bundles alerting rules and the mapping of alert states to row classes. type AlertStatus struct { - AlertingRules []*rules.AlertingRule + AlertingRules []thanosrule.AlertingRule AlertStateToRowClass map[rules.AlertState]string } type byAlertStateAndNameSorter struct { - alerts []*rules.AlertingRule + alerts []thanosrule.AlertingRule } func (s byAlertStateAndNameSorter) Len() int { diff --git a/scripts/genproto.sh b/scripts/genproto.sh index fa2933831f..5749e699ea 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -18,7 +18,7 @@ if ! 
[[ $(${PROTOC_BIN} --version) =~ "3.4.0" ]]; then exit 255 fi -THANOS_ROOT="${GOPATH}/src/github.com/improbable-eng/thanos" +THANOS_ROOT=$(pwd) PROM_PATH="${THANOS_ROOT}/pkg/store/storepb" GOGOPROTO_ROOT="${THANOS_ROOT}/vendor/github.com/gogo/protobuf" GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf" diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 7ef9c7ac8b..d8b4fc2607 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -91,11 +91,22 @@ func testQuerySimple(t *testing.T, conf testConfig) { default: } - var err error - res, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "up", time.Now(), false) + var ( + err error + warnings []string + ) + res, warnings, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "up", time.Now(), promclient.QueryOptions{ + Deduplicate: false, + }) if err != nil { return err } + + if len(warnings) > 0 { + // we don't expect warnings. + return errors.Errorf("unexpected warnings %s", warnings) + } + expectedRes := 4 if conf.name == "gossip" { expectedRes = 3 @@ -146,11 +157,22 @@ func testQuerySimple(t *testing.T, conf testConfig) { default: } - var err error - res, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "up", time.Now(), true) + var ( + err error + warnings []string + ) + res, warnings, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "up", time.Now(), promclient.QueryOptions{ + Deduplicate: true, + }) if err != nil { return err } + + if len(warnings) > 0 { + // we don't expect warnings. 
+ return errors.Errorf("unexpected warnings %s", warnings) + } + expectedRes := 3 if conf.name == "gossip" { expectedRes = 2 diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 4af72bd648..6432638cc4 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -3,42 +3,68 @@ package e2e_test import ( "context" "encoding/json" + "fmt" + "io/ioutil" + "math" "net/http" + "os" + "path" "sort" "testing" "time" "github.com/improbable-eng/thanos/pkg/promclient" "github.com/improbable-eng/thanos/pkg/runutil" + "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" ) -const alwaysFireRule = ` +const ( + testAlertRuleAbortOnPartialResponse = ` groups: - name: example + # Abort should be a default: partial_response_strategy: "ABORT" rules: - - alert: AlwaysFiring - expr: vector(1) + - alert: TestAlert_AbortOnPartialResponse + # It must be based on actual metrics otherwise call to StoreAPI would be not involved. + expr: absent(some_metric) labels: severity: page annotations: - summary: "I always complain" + summary: "I always complain, but I don't allow partial response in query." ` + testAlertRuleWarnOnPartialResponse = ` +groups: +- name: example + partial_response_strategy: "WARN" + rules: + - alert: TestAlert_WarnOnPartialResponse + # It must be based on actual metric, otherwise call to StoreAPI would be not involved. + expr: absent(some_metric) + labels: + severity: page + annotations: + summary: "I always complain and allow partial response in query." +` +) var ( + alertsToTest = []string{testAlertRuleAbortOnPartialResponse, testAlertRuleWarnOnPartialResponse} + ruleStaticFlagsSuite = newSpinupSuite(). Add(querierWithStoreFlags(1, "", rulerGRPC(1), rulerGRPC(2))). - Add(rulerWithQueryFlags(1, alwaysFireRule, queryHTTP(1))). 
- Add(rulerWithQueryFlags(2, alwaysFireRule, queryHTTP(1))). + Add(rulerWithQueryFlags(1, alertsToTest, queryHTTP(1))). + Add(rulerWithQueryFlags(2, alertsToTest, queryHTTP(1))). Add(alertManager(1)) ruleFileSDSuite = newSpinupSuite(). Add(querierWithFileSD(1, "", rulerGRPC(1), rulerGRPC(2))). - Add(rulerWithFileSD(1, alwaysFireRule, queryHTTP(1))). - Add(rulerWithFileSD(2, alwaysFireRule, queryHTTP(1))). + Add(rulerWithFileSD(1, alertsToTest, queryHTTP(1))). + Add(rulerWithFileSD(2, alertsToTest, queryHTTP(1))). Add(alertManager(1)) ) @@ -82,14 +108,28 @@ func testRuleComponent(t *testing.T, conf testConfig) { { "__name__": "ALERTS", "severity": "page", - "alertname": "AlwaysFiring", + "alertname": "TestAlert_AbortOnPartialResponse", + "alertstate": "firing", + "replica": "1", + }, + { + "__name__": "ALERTS", + "severity": "page", + "alertname": "TestAlert_AbortOnPartialResponse", + "alertstate": "firing", + "replica": "2", + }, + { + "__name__": "ALERTS", + "severity": "page", + "alertname": "TestAlert_WarnOnPartialResponse", "alertstate": "firing", "replica": "1", }, { "__name__": "ALERTS", "severity": "page", - "alertname": "AlwaysFiring", + "alertname": "TestAlert_WarnOnPartialResponse", "alertstate": "firing", "replica": "2", }, @@ -97,17 +137,27 @@ func testRuleComponent(t *testing.T, conf testConfig) { expAlertLabels := []model.LabelSet{ { "severity": "page", - "alertname": "AlwaysFiring", + "alertname": "TestAlert_AbortOnPartialResponse", + "replica": "1", + }, + { + "severity": "page", + "alertname": "TestAlert_AbortOnPartialResponse", + "replica": "2", + }, + { + "severity": "page", + "alertname": "TestAlert_WarnOnPartialResponse", "replica": "1", }, { "severity": "page", - "alertname": "AlwaysFiring", + "alertname": "TestAlert_WarnOnPartialResponse", "replica": "2", }, } - testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { select { case <-exit: cancel() @@ 
-118,13 +168,22 @@ func testRuleComponent(t *testing.T, conf testConfig) { qtime := time.Now() // The time series written for the firing alerting rule must be queryable. - res, err := promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "ALERTS", time.Now(), false) + res, warnings, err := promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "ALERTS", time.Now(), promclient.QueryOptions{ + Deduplicate: false, + }) if err != nil { return err } - if len(res) != 2 { + + if len(warnings) > 0 { + // we don't expect warnings. + return errors.Errorf("unexpected warnings %s", warnings) + } + + if len(res) != len(expMetrics) { return errors.Errorf("unexpected result length %d", len(res)) } + for i, r := range res { if !r.Metric.Equal(expMetrics[i]) { return errors.Errorf("unexpected metric %s", r.Metric) @@ -136,12 +195,13 @@ func testRuleComponent(t *testing.T, conf testConfig) { return errors.Errorf("unexpected value %f", r.Value) } } + // A notification must be sent to Alertmanager. alrts, err := queryAlertmanagerAlerts(ctx, "http://localhost:29093") if err != nil { return err } - if len(alrts) != 2 { + if len(alrts) != len(expAlertLabels) { return errors.Errorf("unexpected alerts length %d", len(alrts)) } for i, a := range alrts { @@ -151,6 +211,233 @@ func testRuleComponent(t *testing.T, conf testConfig) { } return nil })) + + // checks counter ensures we are not missing metrics. + checks := 0 + // Check metrics to make sure we report correct ones that allow handling the AlwaysFiring not being triggered because of query issue. 
+ testutil.Ok(t, promclient.MetricValues(ctx, nil, urlParse(t, "http://"+rulerHTTP(1)), func(lset labels.Labels, val float64) error { + switch lset.Get("__name__") { + case "prometheus_rule_group_rules": + checks++ + if val != 1 { + return errors.Errorf("expected 1 loaded groups for strategy %s but found %v", lset.Get("strategy"), val) + } + } + + return nil + })) + testutil.Equals(t, 2, checks) +} + +type failingStoreAPI struct{} + +func (a *failingStoreAPI) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { + return &storepb.InfoResponse{ + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + Labels: []storepb.Label{ + { + Name: "magic", + Value: "store_api", + }, + }, + }, nil +} + +func (a *failingStoreAPI) Series(_ *storepb.SeriesRequest, _ storepb.Store_SeriesServer) error { + return errors.New("I always fail. No reason. I am just offended StoreAPI. Don't touch me") +} + +func (a *failingStoreAPI) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + return &storepb.LabelNamesResponse{}, nil +} + +func (a *failingStoreAPI) LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + return &storepb.LabelValuesResponse{}, nil +} + +// Test Ruler behaviour on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`. +func TestRulePartialResponse(t *testing.T) { + const expectedWarning = "receive series from Addr: 127.0.0.1:21091 Labels: [{magic store_api {} [] 0}] Mint: -9223372036854775808 Maxt: 9223372036854775807: rpc error: code = Unknown desc = I always fail. No reason. I am just offended StoreAPI. Don't touch me" + + dir, err := ioutil.TempDir("", "test_rulepartial_respn") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + suite := newSpinupSuite(). + Add(querierWithStoreFlags(1, "", rulerGRPC(1), fakeStoreAPIGRPC(1))). + Add(rulerWithDir(1, dir, queryHTTP(1))). 
+ Add(fakeStoreAPI(1, &failingStoreAPI{})). + Add(alertManager(1)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + + exit, err := suite.Exec(t, ctx, "test_rule_partial_response_component") + if err != nil { + t.Errorf("spinup failed: %v", err) + cancel() + return + } + + defer func() { + cancel() + <-exit + }() + + testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { + select { + case <-exit: + cancel() + return nil + default: + } + + // The time series written for the firing alerting rule must be queryable. + res, warnings, err := promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "ALERTS", time.Now(), promclient.QueryOptions{ + Deduplicate: false, + }) + if err != nil { + return err + } + + if len(warnings) != 1 { + // We do expect warnings. + return errors.Errorf("unexpected number of warnings, expected 1, got %s", warnings) + } + + // This is tricky as for initial time (1 rule eval, we will have both alerts, as "No store match queries" will be there. + if len(res) != 0 { + return errors.Errorf("unexpected result length. expected %v, got %v", 0, res) + } + return nil + })) + + // Add alerts to ruler, we want to add it only when Querier is rdy, otherwise we will get "no store match the query". + for i, rule := range alertsToTest { + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, fmt.Sprintf("rules-%d.yaml", i)), []byte(rule), 0666)) + } + + resp, err := http.Post("http://"+rulerHTTP(1)+"/-/reload", "", nil) + testutil.Ok(t, err) + defer func() { _, _ = ioutil.ReadAll(resp.Body); _ = resp.Body.Close() }() + testutil.Equals(t, http.StatusOK, resp.StatusCode) + + // We don't expect `AlwaysFiring` as it does NOT allow PartialResponse, so it will trigger `prometheus_rule_evaluation_failures_total` instead. 
+ expMetrics := []model.Metric{ + { + "__name__": "ALERTS", + "severity": "page", + "alertname": "TestAlert_WarnOnPartialResponse", + "alertstate": "firing", + "replica": "1", + }, + } + expAlertLabels := []model.LabelSet{ + { + "severity": "page", + "alertname": "TestAlert_WarnOnPartialResponse", + "replica": "1", + }, + } + + testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { + select { + case <-exit: + cancel() + return nil + default: + } + + qtime := time.Now() + + // The time series written for the firing alerting rule must be queryable. + res, warnings, err := promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "ALERTS", time.Now(), promclient.QueryOptions{ + Deduplicate: false, + }) + if err != nil { + return err + } + + if len(warnings) != 1 { + // We do expect warnings. + return errors.Errorf("unexpected number of warnings, expected 1, got %s", warnings) + } + + if warnings[0] != expectedWarning { + return errors.Errorf("unexpected warning, expected %s, got %s", expectedWarning, warnings[0]) + } + + // This is tricky as for initial time (1 rule eval, we will have both alerts, as "No store match queries" will be there. + if len(res) != len(expMetrics) { + return errors.Errorf("unexpected result length. expected %v, got %v", len(expMetrics), res) + } + + for i, r := range res { + if !r.Metric.Equal(expMetrics[i]) { + return errors.Errorf("unexpected metric %s, expected %s", r.Metric, expMetrics[i]) + } + if int64(r.Timestamp) != timestamp.FromTime(qtime) { + return errors.Errorf("unexpected timestamp %d", r.Timestamp) + } + if r.Value != 1 { + return errors.Errorf("unexpected value %f", r.Value) + } + } + + // A notification must be sent to Alertmanager. 
+ alrts, err := queryAlertmanagerAlerts(ctx, "http://localhost:29093") + if err != nil { + return err + } + if len(alrts) != len(expAlertLabels) { + return errors.Errorf("unexpected alerts length %d", len(alrts)) + } + for i, a := range alrts { + if !a.Labels.Equal(expAlertLabels[i]) { + return errors.Errorf("unexpected labels %s", a.Labels) + } + } + return nil + })) + + // checks counter ensures we are not missing metrics. + checks := 0 + // Check metrics to make sure we report correct ones that allow handling the AlwaysFiring not being triggered because of query issue. + testutil.Ok(t, promclient.MetricValues(ctx, nil, urlParse(t, "http://"+rulerHTTP(1)), func(lset labels.Labels, val float64) error { + switch lset.Get("__name__") { + case "prometheus_rule_group_rules": + checks++ + if val != 1 { + return errors.Errorf("expected 1 loaded groups for strategy %s but found %v", lset.Get("strategy"), val) + } + case "prometheus_rule_evaluation_failures_total": + if lset.Get("strategy") == "abort" { + checks++ + if val <= 0 { + return errors.Errorf("expected rule eval failures for abort strategy rule as we have failing storeAPI but found %v", val) + } + } else if lset.Get("strategy") == "warn" { + checks++ + if val > 0 { + return errors.Errorf("expected no rule eval failures for warn strategy rule but found %v", val) + } + } + case "thanos_rule_evaluation_with_warnings_total": + if lset.Get("strategy") == "warn" { + checks++ + if val <= 0 { + return errors.Errorf("expected rule eval with warnings for warn strategy rule as we have failing storeAPI but found %v", val) + } + } else if lset.Get("strategy") == "abort" { + checks++ + if val > 0 { + return errors.Errorf("expected rule eval with warnings 0 for abort strategy rule but found %v", val) + } + } + } + return nil + })) + testutil.Equals(t, 6, checks) } // TODO(bwplotka): Move to promclient.
diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 592c1e7e69..34bd40f695 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" + "net" "os" "os/exec" "path" @@ -14,11 +16,11 @@ import ( "github.com/improbable-eng/thanos/pkg/objstore/s3" "github.com/improbable-eng/thanos/pkg/runutil" - + "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/testutil" - "github.com/oklog/run" "github.com/pkg/errors" + "google.golang.org/grpc" ) var ( @@ -45,9 +47,37 @@ var ( storeGatewayHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 20190+i) } minioHTTP = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 20290+i) } + + fakeStoreAPIGRPC = func(i int) string { return fmt.Sprintf("127.0.0.1:%d", 21090+i) } ) -type cmdScheduleFunc func(workDir string) ([]*exec.Cmd, error) +type Exec interface { + Start(stdout io.Writer, stderr io.Writer) error + Wait() error + Kill() error + + String() string +} + +type cmdExec struct { + *exec.Cmd +} + +func newCmdExec(cmd *exec.Cmd) *cmdExec { + return &cmdExec{Cmd: cmd} +} + +func (c *cmdExec) Start(stdout io.Writer, stderr io.Writer) error { + c.Stderr = stderr + c.Stdout = stdout + return c.Cmd.Start() +} + +func (c *cmdExec) Kill() error { return c.Process.Signal(syscall.SIGKILL) } + +func (c *cmdExec) String() string { return fmt.Sprintf("%s %s", c.Path, c.Args[1]) } + +type cmdScheduleFunc func(workDir string) ([]Exec, error) type spinupSuite struct { cmdScheduleFuncs []cmdScheduleFunc @@ -64,7 +94,7 @@ func (s *spinupSuite) Add(cmdSchedule cmdScheduleFunc) *spinupSuite { } func scraper(i int, config string) cmdScheduleFunc { - return func(workDir string) ([]*exec.Cmd, error) { + return func(workDir string) ([]Exec, error) { promDir := fmt.Sprintf("%s/data/prom%d", workDir, i) if err := os.MkdirAll(promDir, 0777); err != nil { return nil, errors.Wrap(err, "create prom dir failed") @@ -74,26 
+104,26 @@ func scraper(i int, config string) cmdScheduleFunc { return nil, errors.Wrap(err, "creating prom config failed") } - var cmds []*exec.Cmd - cmds = append(cmds, exec.Command(testutil.PrometheusBinary(), + var cmds []Exec + cmds = append(cmds, newCmdExec(exec.Command(testutil.PrometheusBinary(), "--config.file", promDir+"/prometheus.yml", "--storage.tsdb.path", promDir, "--log.level", "info", "--web.listen-address", promHTTP(i), - )) - return append(cmds, exec.Command("thanos", "sidecar", + ))) + return append(cmds, newCmdExec(exec.Command("thanos", "sidecar", "--debug.name", fmt.Sprintf("sidecar-%d", i), "--grpc-address", sidecarGRPC(i), "--http-address", sidecarHTTP(i), "--prometheus.url", fmt.Sprintf("http://%s", promHTTP(i)), "--tsdb.path", promDir, "--cluster.disable", - "--log.level", "debug")), nil + "--log.level", "debug"))), nil } } func receiver(i int, config string) cmdScheduleFunc { - return func(workDir string) ([]*exec.Cmd, error) { + return func(workDir string) ([]Exec, error) { promDir := fmt.Sprintf("%s/data/remote-write-prom%d", workDir, i) if err := os.MkdirAll(promDir, 0777); err != nil { return nil, errors.Wrap(err, "create prom dir failed") @@ -103,48 +133,48 @@ func receiver(i int, config string) cmdScheduleFunc { return nil, errors.Wrap(err, "creating prom config failed") } - var cmds []*exec.Cmd - cmds = append(cmds, exec.Command(testutil.PrometheusBinary(), + var cmds []Exec + cmds = append(cmds, newCmdExec(exec.Command(testutil.PrometheusBinary(), "--config.file", promDir+"/prometheus.yml", "--storage.tsdb.path", promDir, "--log.level", "info", "--web.listen-address", promRemoteWriteHTTP(i), - )) - return append(cmds, exec.Command("thanos", "receive", + ))) + return append(cmds, newCmdExec(exec.Command("thanos", "receive", "--debug.name", fmt.Sprintf("remote-write-receive-%d", i), "--grpc-address", remoteWriteReceiveGRPC(i), "--http-address", remoteWriteReceiveMetricHTTP(i), "--remote-write.address", remoteWriteReceiveHTTP(i), 
"--tsdb.path", promDir, - "--log.level", "debug")), nil + "--log.level", "debug"))), nil } } func querier(i int, replicaLabel string, staticStores ...string) cmdScheduleFunc { - return func(_ string) ([]*exec.Cmd, error) { + return func(_ string) ([]Exec, error) { args := append(defaultQuerierFlags(i, replicaLabel), "--cluster.gossip-interval", "200ms", "--cluster.pushpull-interval", "200ms") for _, s := range staticStores { args = append(args, "--store", s) } - return []*exec.Cmd{exec.Command("thanos", args...)}, nil + return []Exec{newCmdExec(exec.Command("thanos", args...))}, nil } } func querierWithStoreFlags(i int, replicaLabel string, storesAddresses ...string) cmdScheduleFunc { - return func(_ string) ([]*exec.Cmd, error) { + return func(_ string) ([]Exec, error) { args := defaultQuerierFlags(i, replicaLabel) for _, addr := range storesAddresses { args = append(args, "--store", addr) } - return []*exec.Cmd{exec.Command("thanos", args...)}, nil + return []Exec{newCmdExec(exec.Command("thanos", args...))}, nil } } func querierWithFileSD(i int, replicaLabel string, storesAddresses ...string) cmdScheduleFunc { - return func(workDir string) ([]*exec.Cmd, error) { + return func(workDir string) ([]Exec, error) { queryFileSDDir := fmt.Sprintf("%s/data/queryFileSd%d", workDir, i) if err := os.MkdirAll(queryFileSDDir, 0777); err != nil { return nil, errors.Wrap(err, "create prom dir failed") @@ -160,19 +190,19 @@ func querierWithFileSD(i int, replicaLabel string, storesAddresses ...string) cm "--store.sd-interval", "5s", ) - return []*exec.Cmd{exec.Command("thanos", args...)}, nil + return []Exec{newCmdExec(exec.Command("thanos", args...))}, nil } } func storeGateway(i int, bucketConfig []byte) cmdScheduleFunc { - return func(workDir string) ([]*exec.Cmd, error) { + return func(workDir string) ([]Exec, error) { dbDir := fmt.Sprintf("%s/data/store-gateway%d", workDir, i) if err := os.MkdirAll(dbDir, 0777); err != nil { return nil, errors.Wrap(err, "creating store 
gateway dir failed") } - return []*exec.Cmd{exec.Command("thanos", + return []Exec{newCmdExec(exec.Command("thanos", "store", "--debug.name", fmt.Sprintf("store-%d", i), "--data-dir", dbDir, @@ -182,12 +212,12 @@ func storeGateway(i int, bucketConfig []byte) cmdScheduleFunc { "--objstore.config", string(bucketConfig), // Accelerated sync time for quicker test (3m by default) "--sync-block-duration", "5s", - )}, nil + ))}, nil } } func alertManager(i int) cmdScheduleFunc { - return func(workDir string) ([]*exec.Cmd, error) { + return func(workDir string) ([]Exec, error) { dir := fmt.Sprintf("%s/data/alertmanager%d", workDir, i) if err := os.MkdirAll(dir, 0777); err != nil { @@ -205,85 +235,150 @@ receivers: if err := ioutil.WriteFile(dir+"/config.yaml", []byte(config), 0666); err != nil { return nil, errors.Wrap(err, "creating alertmanager config file failed") } - return []*exec.Cmd{exec.Command(testutil.AlertmanagerBinary(), + return []Exec{newCmdExec(exec.Command(testutil.AlertmanagerBinary(), "--config.file", dir+"/config.yaml", "--web.listen-address", "127.0.0.1:29093", "--log.level", "debug", - )}, nil + ))}, nil } } -func ruler(i int, rules string) cmdScheduleFunc { - return func(workDir string) ([]*exec.Cmd, error) { +func rulerWithQueryFlags(i int, rules []string, queryAddresses ...string) cmdScheduleFunc { + return func(workDir string) ([]Exec, error) { dbDir := fmt.Sprintf("%s/data/rule%d", workDir, i) if err := os.MkdirAll(dbDir, 0777); err != nil { - return nil, errors.Wrap(err, "creating ruler dir failed") + return nil, errors.Wrap(err, "creating ruler dir") } - err := ioutil.WriteFile(dbDir+"/rules.yaml", []byte(rules), 0666) - if err != nil { - return nil, errors.Wrap(err, "creating ruler file failed") + for i, rule := range rules { + if err := ioutil.WriteFile(path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i)), []byte(rule), 0666); err != nil { + return nil, errors.Wrapf(err, "writing rule %s", path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i))) + } 
} - args := append(defaultRulerFlags(i, dbDir), - "--cluster.gossip-interval", "200ms", - "--cluster.pushpull-interval", "200ms") - return []*exec.Cmd{exec.Command("thanos", args...)}, nil + args := defaultRulerFlags(i, dbDir, dbDir) + + for _, addr := range queryAddresses { + args = append(args, "--query", addr) + } + return []Exec{newCmdExec(exec.Command("thanos", args...))}, nil } } -func rulerWithQueryFlags(i int, rules string, queryAddresses ...string) cmdScheduleFunc { - return func(workDir string) ([]*exec.Cmd, error) { +func rulerWithDir(i int, ruleDir string, queryAddresses ...string) cmdScheduleFunc { + return func(workDir string) ([]Exec, error) { dbDir := fmt.Sprintf("%s/data/rule%d", workDir, i) if err := os.MkdirAll(dbDir, 0777); err != nil { - return nil, errors.Wrap(err, "creating ruler dir failed") - } - err := ioutil.WriteFile(dbDir+"/rules.yaml", []byte(rules), 0666) - if err != nil { - return nil, errors.Wrap(err, "creating ruler file failed") + return nil, errors.Wrap(err, "creating ruler dir") } - args := defaultRulerFlags(i, dbDir) + args := defaultRulerFlags(i, dbDir, ruleDir) for _, addr := range queryAddresses { args = append(args, "--query", addr) } - return []*exec.Cmd{exec.Command("thanos", args...)}, nil + return []Exec{newCmdExec(exec.Command("thanos", args...))}, nil } } -func rulerWithFileSD(i int, rules string, queryAddresses ...string) cmdScheduleFunc { - return func(workDir string) ([]*exec.Cmd, error) { +func rulerWithFileSD(i int, rules []string, queryAddresses ...string) cmdScheduleFunc { + return func(workDir string) ([]Exec, error) { dbDir := fmt.Sprintf("%s/data/rule%d", workDir, i) if err := os.MkdirAll(dbDir, 0777); err != nil { - return nil, errors.Wrap(err, "creating ruler dir failed") + return nil, errors.Wrap(err, "creating ruler dir") } - err := ioutil.WriteFile(dbDir+"/rules.yaml", []byte(rules), 0666) - if err != nil { - return nil, errors.Wrap(err, "creating ruler file failed") + for i, rule := range rules { + if 
err := ioutil.WriteFile(path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i)), []byte(rule), 0666); err != nil { + return nil, errors.Wrapf(err, "writing rule %s", path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i))) + } } ruleFileSDDir := fmt.Sprintf("%s/data/ruleFileSd%d", workDir, i) if err := os.MkdirAll(ruleFileSDDir, 0777); err != nil { - return nil, errors.Wrap(err, "create ruler filesd dir failed") + return nil, errors.Wrap(err, "create ruler filesd dir") } if err := ioutil.WriteFile(ruleFileSDDir+"/filesd.json", []byte(generateFileSD(queryAddresses)), 0666); err != nil { - return nil, errors.Wrap(err, "creating ruler filesd config failed") + return nil, errors.Wrap(err, "creating ruler filesd config") } - args := append(defaultRulerFlags(i, dbDir), + args := append(defaultRulerFlags(i, dbDir, dbDir), "--query.sd-files", path.Join(ruleFileSDDir, "filesd.json"), "--query.sd-interval", "5s") - return []*exec.Cmd{exec.Command("thanos", args...)}, nil + return []Exec{newCmdExec(exec.Command("thanos", args...))}, nil + } +} + +type sameProcessGRPCServiceExec struct { + i int + stdout io.Writer + stderr io.Writer + + ctx context.Context + cancel context.CancelFunc + srvChan <-chan error + srv *grpc.Server +} + +func (c *sameProcessGRPCServiceExec) Start(stdout io.Writer, stderr io.Writer) error { + c.stderr = stderr + c.stdout = stdout + + if c.ctx != nil { + return errors.New("process already started") + } + c.ctx, c.cancel = context.WithCancel(context.Background()) + + l, err := net.Listen("tcp", fakeStoreAPIGRPC(c.i)) + if err != nil { + return errors.Wrap(err, "listen API address") + } + + srvChan := make(chan error) + go func() { + defer close(srvChan) + if err := c.srv.Serve(l); err != nil { + srvChan <- err + _, _ = c.stderr.Write([]byte(fmt.Sprintf("server failed: %s", err))) + } + + }() + c.srvChan = srvChan + return nil +} + +func (c *sameProcessGRPCServiceExec) Wait() error { + err := <-c.srvChan + if c.ctx.Err() == nil && err != nil { + return err + } + 
return err +} +func (c *sameProcessGRPCServiceExec) Kill() error { + c.cancel() + c.srv.Stop() + + return nil +} + +func (c *sameProcessGRPCServiceExec) String() string { + return fmt.Sprintf("gRPC service %v on %v", c.i, fakeStoreAPIGRPC(c.i)) +} + +func fakeStoreAPI(i int, svc storepb.StoreServer) cmdScheduleFunc { + return func(_ string) ([]Exec, error) { + srv := grpc.NewServer() + storepb.RegisterStoreServer(srv, svc) + + return []Exec{&sameProcessGRPCServiceExec{i: i, srv: srv}}, nil } } func minio(accessKey string, secretKey string) cmdScheduleFunc { - return func(workDir string) ([]*exec.Cmd, error) { + return func(workDir string) ([]Exec, error) { dbDir := fmt.Sprintf("%s/data/minio", workDir) if err := os.MkdirAll(dbDir, 0777); err != nil { @@ -299,7 +394,7 @@ func minio(accessKey string, secretKey string) cmdScheduleFunc { fmt.Sprintf("MINIO_ACCESS_KEY=%s", accessKey), fmt.Sprintf("MINIO_SECRET_KEY=%s", secretKey)) - return []*exec.Cmd{cmd}, nil + return []Exec{newCmdExec(cmd)}, nil } } @@ -384,7 +479,7 @@ func (s *spinupSuite) Exec(t testing.TB, ctx context.Context, testName string) ( }) } - var commands []*exec.Cmd + var commands []Exec for _, cmdFunc := range s.cmdScheduleFuncs { cmds, err := cmdFunc(dir) @@ -398,11 +493,7 @@ func (s *spinupSuite) Exec(t testing.TB, ctx context.Context, testName string) ( // Run go routine for each command. for _, c := range commands { var stderr, stdout bytes.Buffer - c.Stderr = &stderr - c.Stdout = &stdout - - err := c.Start() - if err != nil { + if err := c.Start(&stdout, &stderr); err != nil { // Let already started commands finish. 
go func() { _ = g.Run() }() return nil, errors.Wrap(err, "failed to start") @@ -410,7 +501,7 @@ func (s *spinupSuite) Exec(t testing.TB, ctx context.Context, testName string) ( cmd := c g.Add(func() error { - id := fmt.Sprintf("%s %s", cmd.Path, cmd.Args[1]) + id := c.String() err := cmd.Wait() @@ -424,7 +515,7 @@ func (s *spinupSuite) Exec(t testing.TB, ctx context.Context, testName string) ( return errors.Wrap(err, id) }, func(error) { // This's accepted scenario to kill a process immediately for sure and run tests as fast as possible. - _ = cmd.Process.Signal(syscall.SIGKILL) + _ = cmd.Kill() }) } @@ -467,12 +558,12 @@ func defaultQuerierFlags(i int, replicaLabel string) []string { } } -func defaultRulerFlags(i int, dbDir string) []string { +func defaultRulerFlags(i int, dbDir string, ruleDir string) []string { return []string{"rule", "--debug.name", fmt.Sprintf("rule-%d", i), "--label", fmt.Sprintf(`replica="%d"`, i), "--data-dir", dbDir, - "--rule-file", path.Join(dbDir, "*.yaml"), + "--rule-file", path.Join(ruleDir, "*.yaml"), "--eval-interval", "1s", "--alertmanagers.url", "http://127.0.0.1:29093", "--grpc-address", rulerGRPC(i), diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 6c36a963c8..8648dc5b3d 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -93,11 +93,22 @@ func TestStoreGatewayQuery(t *testing.T) { default: } - var err error - res, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "{a=\"1\"}", time.Now(), false) + var ( + err error + warnings []string + ) + res, warnings, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "{a=\"1\"}", time.Now(), promclient.QueryOptions{ + Deduplicate: false, + }) if err != nil { return err } + + if len(warnings) > 0 { + // we don't expect warnings. 
+ return errors.Errorf("unexpected warnings %s", warnings) + } + if len(res) != 2 { return errors.Errorf("unexpected result size %d", len(res)) } @@ -127,11 +138,22 @@ func TestStoreGatewayQuery(t *testing.T) { default: } - var err error - res, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "{a=\"1\"}", time.Now(), true) + var ( + err error + warnings []string + ) + res, warnings, err = promclient.QueryInstant(ctx, nil, urlParse(t, "http://"+queryHTTP(1)), "{a=\"1\"}", time.Now(), promclient.QueryOptions{ + Deduplicate: true, + }) if err != nil { return err } + + if len(warnings) > 0 { + // we don't expect warnings. + return errors.Errorf("unexpected warnings %s", warnings) + } + if len(res) != 1 { return errors.Errorf("unexpected result size %d", len(res)) }