Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

querier: Support store matchers and time range filter on labels API #3133

Merged
merged 4 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Added

- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allow passing a `storeMatch[]` to Labels APIs. Also time range metadata based store filtering is supported on Labels APIs.

### Changed

- [#3136](https://github.com/thanos-io/thanos/pull/3136) Sidecar: Add metric `thanos_sidecar_reloader_config_apply_operations_total` and rename metric `thanos_sidecar_reloader_config_apply_errors_total` to `thanos_sidecar_reloader_config_apply_operations_failed_total`.
Expand Down
41 changes: 24 additions & 17 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

const (
DedupParam = "dedup"
PartialResponseParam = "partial_response"
MaxSourceResolutionParam = "max_source_resolution"
ReplicaLabelsParam = "replicaLabels[]"
StoreMatcherParam = "storeMatch[]"
)

// QueryAPI is an API used by Thanos Query.
type QueryAPI struct {
baseAPI *api.BaseAPI
Expand Down Expand Up @@ -139,41 +147,38 @@ type queryData struct {
}

func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *api.ApiError) {
const dedupParam = "dedup"
enableDeduplication = true

if val := r.FormValue(dedupParam); val != "" {
if val := r.FormValue(DedupParam); val != "" {
var err error
enableDeduplication, err = strconv.ParseBool(val)
if err != nil {
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", dedupParam)}
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", DedupParam)}
}
}
return enableDeduplication, nil
}

func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *api.ApiError) {
const replicaLabelsParam = "replicaLabels[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}

replicaLabels = qapi.replicaLabels
// Overwrite the cli flag when provided as a query parameter.
if len(r.Form[replicaLabelsParam]) > 0 {
replicaLabels = r.Form[replicaLabelsParam]
if len(r.Form[ReplicaLabelsParam]) > 0 {
replicaLabels = r.Form[ReplicaLabelsParam]
}

return replicaLabels, nil
}

func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [][]storepb.LabelMatcher, _ *api.ApiError) {
const storeMatcherParam = "storeMatch[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}

for _, s := range r.Form[storeMatcherParam] {
for _, s := range r.Form[StoreMatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
Expand All @@ -189,37 +194,34 @@ func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers []
}

func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *api.ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second

val := r.FormValue(maxSourceResolutionParam)
val := r.FormValue(MaxSourceResolutionParam)
if qapi.enableAutodownsampling || (val == "auto") {
maxSourceResolution = defaultVal
}
if val != "" && val != "auto" {
var err error
maxSourceResolution, err = parseDuration(val)
if err != nil {
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", maxSourceResolutionParam)}
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", MaxSourceResolutionParam)}
}
}

if maxSourceResolution < 0 {
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)}
return 0, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("negative '%s' is not accepted. Try a positive integer", MaxSourceResolutionParam)}
}

return int64(maxSourceResolution / time.Millisecond), nil
}

func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePartialResponse bool) (enablePartialResponse bool, _ *api.ApiError) {
const partialResponseParam = "partial_response"

// Overwrite the cli flag when provided as a query parameter.
if val := r.FormValue(partialResponseParam); val != "" {
if val := r.FormValue(PartialResponseParam); val != "" {
var err error
defaultEnablePartialResponse, err = strconv.ParseBool(val)
if err != nil {
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", partialResponseParam)}
return false, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", PartialResponseParam)}
}
}
return defaultEnablePartialResponse, nil
Expand Down Expand Up @@ -439,7 +441,12 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false).
storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down
50 changes: 49 additions & 1 deletion pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set("max_source_resolution", test.maxSourceResolutionParam)
v.Set(MaxSourceResolutionParam, test.maxSourceResolutionParam)
r := http.Request{PostForm: v}

// If no max_source_resolution is specified fit at least 5 samples between steps.
Expand All @@ -1070,6 +1070,54 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
}
}

func TestParseStoreMatchersParam(t *testing.T) {
for i, tc := range []struct {
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
storeMatchers string
fail bool
result [][]storepb.LabelMatcher
}{
{
storeMatchers: "123",
fail: true,
},
{
storeMatchers: "foo",
fail: false,
result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foo"}}},
},
{
storeMatchers: `{__address__="localhost:10905"}`,
fail: false,
result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"}}},
},
{
storeMatchers: `{__address__="localhost:10905", cluster="test"}`,
fail: false,
result: [][]storepb.LabelMatcher{{
storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"},
storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "cluster", Value: "test"},
}},
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
api := QueryAPI{
gate: gate.NewKeeper(nil).NewGate(4),
}
v := url.Values{}
v.Set(StoreMatcherParam, tc.storeMatchers)
r := &http.Request{PostForm: v}

storeMatchers, err := api.parseStoreMatchersParam(r)
if !tc.fail {
testutil.Equals(t, tc.result, storeMatchers)
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
testutil.Equals(t, (*baseAPI.ApiError)(nil), err)
} else {
testutil.NotOk(t, err)
}
})
}
}

type mockedRulesClient struct {
g map[rulespb.RulesRequest_Type][]*rulespb.RuleGroup
w storage.Warnings
Expand Down
8 changes: 7 additions & 1 deletion pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

aggrs := aggrsFromFunc(hints.Func)

// TODO: Pass it using the SerieRequest instead of relying on context
// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp := &seriesServer{ctx: ctx}
Expand Down Expand Up @@ -333,6 +333,9 @@ func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Label: name,
PartialResponseDisabled: !q.partialResponse,
Expand All @@ -356,6 +359,9 @@ func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

// TODO: Pass it using the SeriesRequest instead of relying on context
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: !q.partialResponse,
Start: q.mint,
Expand Down
43 changes: 22 additions & 21 deletions pkg/queryfrontend/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"

queryv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req
return nil, errStepTooSmall
}

result.Dedup, err = parseEnableDedupParam(r.FormValue("dedup"))
result.Dedup, err = parseEnableDedupParam(r.FormValue(queryv1.DedupParam))
if err != nil {
return nil, err
}
Expand All @@ -92,22 +93,22 @@ func (c codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Req
result.AutoDownsampling = true
result.MaxSourceResolution = result.Step / 5
} else {
result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue("max_source_resolution"))
result.MaxSourceResolution, err = parseDownsamplingParamMillis(r.FormValue(queryv1.MaxSourceResolutionParam))
if err != nil {
return nil, err
}
}

result.PartialResponse, err = parsePartialResponseParam(r.FormValue("partial_response"), c.partialResponse)
result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse)
if err != nil {
return nil, err
}

if len(r.Form["replicaLabels[]"]) > 0 {
result.ReplicaLabels = r.Form["replicaLabels[]"]
if len(r.Form[queryv1.ReplicaLabelsParam]) > 0 {
result.ReplicaLabels = r.Form[queryv1.ReplicaLabelsParam]
}

result.StoreMatchers, err = parseStoreMatchersParam(r.Form["storeMatch[]"])
result.StoreMatchers, err = parseStoreMatchersParam(r.Form[queryv1.StoreMatcherParam])
if err != nil {
return nil, err
}
Expand All @@ -131,29 +132,29 @@ func (c codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.R
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
}
params := url.Values{
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
"step": []string{encodeDurationMillis(thanosReq.Step)},
"query": []string{thanosReq.Query},
"dedup": []string{strconv.FormatBool(thanosReq.Dedup)},
"partial_response": []string{strconv.FormatBool(thanosReq.PartialResponse)},
"replicaLabels[]": thanosReq.ReplicaLabels,
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
"step": []string{encodeDurationMillis(thanosReq.Step)},
"query": []string{thanosReq.Query},
queryv1.DedupParam: []string{strconv.FormatBool(thanosReq.Dedup)},
queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)},
queryv1.ReplicaLabelsParam: thanosReq.ReplicaLabels,
}

if thanosReq.AutoDownsampling {
params["max_source_resolution"] = []string{"auto"}
params[queryv1.MaxSourceResolutionParam] = []string{"auto"}
} else if thanosReq.MaxSourceResolution != 0 {
// Add this param only if it is set. Set to 0 will impact
// auto-downsampling in the querier.
params["max_source_resolution"] = []string{encodeDurationMillis(thanosReq.MaxSourceResolution)}
params[queryv1.MaxSourceResolutionParam] = []string{encodeDurationMillis(thanosReq.MaxSourceResolution)}
}

if len(thanosReq.StoreMatchers) > 0 {
storeMatchers, err := matchersToStringSlice(thanosReq.StoreMatchers)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
}
params["storeMatch[]"] = storeMatchers
params[queryv1.StoreMatcherParam] = storeMatchers
}

u := &url.URL{
Expand Down Expand Up @@ -191,7 +192,7 @@ func parseEnableDedupParam(s string) (bool, error) {
var err error
enableDeduplication, err = strconv.ParseBool(s)
if err != nil {
return enableDeduplication, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "dedup")
return enableDeduplication, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.DedupParam)
}
}

Expand All @@ -204,7 +205,7 @@ func parseDownsamplingParamMillis(s string) (int64, error) {
var err error
maxSourceResolution, err = parseDurationMillis(s)
if err != nil {
return maxSourceResolution, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "max_source_resolution")
return maxSourceResolution, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.MaxSourceResolutionParam)
}
}

Expand All @@ -220,7 +221,7 @@ func parsePartialResponseParam(s string, defaultEnablePartialResponse bool) (boo
var err error
defaultEnablePartialResponse, err = strconv.ParseBool(s)
if err != nil {
return defaultEnablePartialResponse, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "partial_response")
return defaultEnablePartialResponse, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.PartialResponseParam)
}
}

Expand All @@ -232,11 +233,11 @@ func parseStoreMatchersParam(ss []string) ([][]storepb.LabelMatcher, error) {
for _, s := range ss {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, "storeMatch[]")
return nil, httpgrpc.Errorf(http.StatusBadRequest, errCannotParse, queryv1.StoreMatcherParam)
}
stm, err := storepb.TranslatePromMatchers(matchers...)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "storeMatch[]")
return nil, httpgrpc.Errorf(http.StatusBadRequest, queryv1.StoreMatcherParam)
}
storeMatchers = append(storeMatchers, stm)
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/queryfrontend/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"testing"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/weaveworks/common/httpgrpc"

queryv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/weaveworks/common/httpgrpc"
)

func TestCodec_DecodeRequest(t *testing.T) {
Expand Down Expand Up @@ -218,7 +220,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("dedup") == "true"
r.URL.Query().Get(queryv1.DedupParam) == "true"
},
},
{
Expand All @@ -233,7 +235,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("partial_response") == "true"
r.URL.Query().Get(queryv1.PartialResponseParam) == "true"
},
},
{
Expand All @@ -248,7 +250,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("max_source_resolution") == "300"
r.URL.Query().Get(queryv1.MaxSourceResolutionParam) == "300"
},
},
{
Expand All @@ -263,7 +265,7 @@ func TestCodec_EncodeRequest(t *testing.T) {
return r.URL.Query().Get("start") == "123" &&
r.URL.Query().Get("end") == "456" &&
r.URL.Query().Get("step") == "1" &&
r.URL.Query().Get("max_source_resolution") == "3600"
r.URL.Query().Get(queryv1.MaxSourceResolutionParam) == "3600"
},
},
} {
Expand Down
Loading