diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 7cc30675b28..812463dcbe7 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -52,6 +52,14 @@ import ( "github.com/thanos-io/thanos/pkg/tracing" ) +const ( + dedupParam = "dedup" + partialResponseParam = "partial_response" + maxSourceResolutionParam = "max_source_resolution" + replicaLabelsParam = "replicaLabels[]" + storeMatcherParam = "storeMatch[]" +) + // QueryAPI is an API used by Thanos Query. type QueryAPI struct { baseAPI *api.BaseAPI @@ -139,7 +147,6 @@ type queryData struct { } func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *api.ApiError) { - const dedupParam = "dedup" enableDeduplication = true if val := r.FormValue(dedupParam); val != "" { @@ -153,7 +160,6 @@ func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplicatio } func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *api.ApiError) { - const replicaLabelsParam = "replicaLabels[]" if err := r.ParseForm(); err != nil { return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")} } @@ -168,7 +174,6 @@ func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels [] } func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [][]storepb.LabelMatcher, _ *api.ApiError) { - const storeMatcherParam = "storeMatch[]" if err := r.ParseForm(); err != nil { return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")} } @@ -189,7 +194,6 @@ func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [] } func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *api.ApiError) { - const maxSourceResolutionParam = "max_source_resolution" maxSourceResolution := 0 * time.Second val := r.FormValue(maxSourceResolutionParam) @@ -211,8 +215,6 @@ func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal t } func (qapi *QueryAPI) parsePartialResponseParam(r *http.Request, defaultEnablePartialResponse bool) (enablePartialResponse bool, _ *api.ApiError) { - const partialResponseParam = "partial_response" - // Overwrite the cli flag when provided as a query parameter. if val := r.FormValue(partialResponseParam); val != "" { var err error @@ -438,7 +440,12 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A return nil, nil, apiErr } - q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false). + storeMatchers, apiErr := qapi.parseStoreMatchersParam(r) + if apiErr != nil { + return nil, nil, apiErr + } + + q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false). Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err} diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 05a91dcc15d..12fcb6e8e31 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -1048,7 +1048,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) { gate: gate.NewKeeper(nil).NewGate(4), } v := url.Values{} - v.Set("max_source_resolution", test.maxSourceResolutionParam) + v.Set(maxSourceResolutionParam, test.maxSourceResolutionParam) r := http.Request{PostForm: v} // If no max_source_resolution is specified fit at least 5 samples between steps. @@ -1062,6 +1062,51 @@ func TestParseDownsamplingParamMillis(t *testing.T) { } } +func TestParseStoreMatchersParam(t *testing.T) { + for i, tc := range []struct { + storeMatchers string + fail bool + result [][]storepb.LabelMatcher + }{ + { + storeMatchers: "123", + fail: true, + }, + { + storeMatchers: "foo", + fail: false, + result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "foo"}}}, + }, + { + storeMatchers: `{__address__="localhost:10905"}`, + fail: false, + result: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"}}}, + }, + { + storeMatchers: `{__address__="localhost:10905", cluster="test"}`, + fail: false, + result: [][]storepb.LabelMatcher{{ + storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "localhost:10905"}, + storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "cluster", Value: "test"}, + }}, + }, + } { + api := QueryAPI{ + gate: gate.NewKeeper(nil).NewGate(4), + } + v := url.Values{} + v.Set(storeMatcherParam, tc.storeMatchers) + r := &http.Request{PostForm: v} + + storeMatchers, err := api.parseStoreMatchersParam(r) + if !tc.fail { + testutil.Assert(t, reflect.DeepEqual(storeMatchers, tc.result), "case %v: expected %v to be equal to %v", i, storeMatchers, tc.result) + } else { + testutil.NotOk(t, err) + } + } +} + type mockedRulesClient struct { g map[rulespb.RulesRequest_Type][]*rulespb.RuleGroup w storage.Warnings diff --git a/pkg/query/querier.go b/pkg/query/querier.go index d766973649c..8d8765471b7 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -259,7 +259,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . aggrs := aggrsFromFunc(hints.Func) - // TODO: Pass it using the SerieRequest instead of relying on context + // TODO: Pass it using the SeriesRequest instead of relying on context ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers) resp := &seriesServer{ctx: ctx} @@ -333,6 +333,9 @@ func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_label_values") defer span.Finish() + // TODO: Pass it using the SeriesRequest instead of relying on context + ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers) + resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{ Label: name, PartialResponseDisabled: !q.partialResponse, @@ -356,6 +359,9 @@ func (q *querier) LabelNames() ([]string, storage.Warnings, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_label_names") defer span.Finish() + // TODO: Pass it using the SeriesRequest instead of relying on context + ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers) + resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{ PartialResponseDisabled: !q.partialResponse, Start: q.mint, diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index c8ec49a043e..1bc60b9f9f3 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -270,10 +270,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe ok, _ = storeMatches(st, r.MinTime, r.MaxTime, storeMatcher, r.Matchers...) }) if !ok { - storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st)) + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st)) continue } - storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st)) + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) // This is used to cancel this stream when one operations takes too long. seriesCtx, closeSeries := context.WithCancel(gctx) @@ -515,7 +515,8 @@ func storeMatches(s Client, mint, maxt int64, storeMatcher [][]storepb.LabelMatc return false, nil } match, err := storeMatchMetadata(s, storeMatcher) - if err != nil || !match { + // Return result here if no matchers set. + if len(matchers) == 0 || err != nil || !match { return match, err } return labelSetsMatch(s.LabelSets(), matchers) @@ -587,14 +588,32 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques *storepb.LabelNamesResponse, error, ) { var ( - warnings []string - names [][]string - mtx sync.Mutex - g, gctx = errgroup.WithContext(ctx) + warnings []string + names [][]string + mtx sync.Mutex + g, gctx = errgroup.WithContext(ctx) + storeDebugMsgs []string ) for _, st := range s.stores() { st := st + var ok bool + tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) { + storeMatcher := [][]storepb.LabelMatcher{} + if ctxVal := ctx.Value(StoreMatcherKey); ctxVal != nil { + if value, ok := ctxVal.([][]storepb.LabelMatcher); ok { + storeMatcher = value + } + } + // We can skip error, we already translated matchers once. + ok, _ = storeMatches(st, r.Start, r.End, storeMatcher) + }) + if !ok { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st)) + continue + } + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) + g.Go(func() error { resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{ PartialResponseDisabled: r.PartialResponseDisabled, @@ -626,6 +645,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques return nil, err } + level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";")) return &storepb.LabelNamesResponse{ Names: strutil.MergeUnsortedSlices(names...), Warnings: warnings, @@ -637,14 +657,32 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ *storepb.LabelValuesResponse, error, ) { var ( - warnings []string - all [][]string - mtx sync.Mutex - g, gctx = errgroup.WithContext(ctx) + warnings []string + all [][]string + mtx sync.Mutex + g, gctx = errgroup.WithContext(ctx) + storeDebugMsgs []string ) for _, st := range s.stores() { store := st + var ok bool + tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) { + storeMatcher := [][]storepb.LabelMatcher{} + if ctxVal := ctx.Value(StoreMatcherKey); ctxVal != nil { + if value, ok := ctxVal.([][]storepb.LabelMatcher); ok { + storeMatcher = value + } + } + // We can skip error, we already translated matchers once. + ok, _ = storeMatches(st, r.Start, r.End, storeMatcher) + }) + if !ok { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out", st)) + continue + } + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) + g.Go(func() error { resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{ Label: r.Label, @@ -677,6 +715,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ return nil, err } + level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";")) return &storepb.LabelValuesResponse{ Values: strutil.MergeUnsortedSlices(all...), Warnings: warnings, diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 005bd7d4e55..a8f6b90086e 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -1087,6 +1087,13 @@ func TestProxyStore_LabelValues(t *testing.T) { Values: []string{"3", "4"}, }, }}, + &testClient{StoreClient: &mockedStoreAPI{ + RespLabelValues: &storepb.LabelValuesResponse{ + Values: []string{"5", "6"}, + }}, + minTime: timestamp.FromTime(time.Now().Add(-1 * time.Minute)), + maxTime: timestamp.FromTime(time.Now()), + }, } q := NewProxyStore(nil, nil, @@ -1107,6 +1114,20 @@ func TestProxyStore_LabelValues(t *testing.T) { testutil.Ok(t, err) testutil.Assert(t, proto.Equal(req, m1.LastLabelValuesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelValuesReq) + testutil.Equals(t, []string{"1", "2", "3", "4", "5", "6"}, resp.Values) + testutil.Equals(t, 1, len(resp.Warnings)) + + // Request outside the time range of the last store client. + req = &storepb.LabelValuesRequest{ + Label: "a", + PartialResponseDisabled: true, + Start: timestamp.FromTime(minTime), + End: timestamp.FromTime(time.Now().Add(-1 * time.Hour)), + } + resp, err = q.LabelValues(ctx, req) + testutil.Ok(t, err) + testutil.Assert(t, proto.Equal(req, m1.LastLabelValuesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelValuesReq) + testutil.Equals(t, []string{"1", "2", "3", "4"}, resp.Values) testutil.Equals(t, 1, len(resp.Warnings)) } @@ -1118,7 +1139,8 @@ func TestProxyStore_LabelNames(t *testing.T) { title string storeAPIs []Client - req *storepb.LabelNamesRequest + req *storepb.LabelNamesRequest + storeMatchers [][]storepb.LabelMatcher expectedNames []string expectedErr error @@ -1197,6 +1219,56 @@ func TestProxyStore_LabelNames(t *testing.T) { expectedNames: []string{"a", "b"}, expectedWarningsLen: 1, }, + { + title: "stores filtered by time range", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + minTime: timestamp.FromTime(time.Now().Add(-4 * time.Hour)), + maxTime: timestamp.FromTime(time.Now().Add(-3 * time.Hour)), + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"c", "d"}, + }, + }, + minTime: timestamp.FromTime(time.Now().Add(-2 * time.Hour)), + maxTime: timestamp.FromTime(time.Now().Add(-1 * time.Hour)), + }, + }, + req: &storepb.LabelNamesRequest{ + Start: timestamp.FromTime(time.Now().Add(-1 * time.Minute)), + End: timestamp.FromTime(time.Now()), + PartialResponseDisabled: false, + }, + expectedNames: nil, + expectedWarningsLen: 0, + }, + { + title: "store matchers specified", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespLabelNames: &storepb.LabelNamesResponse{ + Names: []string{"a", "b"}, + }, + }, + }, + }, + req: &storepb.LabelNamesRequest{ + Start: timestamp.FromTime(minTime), + End: timestamp.FromTime(maxTime), + PartialResponseDisabled: false, + }, + storeMatchers: [][]storepb.LabelMatcher{{storepb.LabelMatcher{Type: storepb.LabelMatcher_EQ, Name: "__address__", Value: "foo"}}}, + expectedNames: nil, + expectedWarningsLen: 0, + }, } { if ok := t.Run(tc.title, func(t *testing.T) { q := NewProxyStore( @@ -1209,6 +1281,9 @@ func TestProxyStore_LabelNames(t *testing.T) { ) ctx := context.Background() + if len(tc.storeMatchers) > 0 { + ctx = context.WithValue(ctx, StoreMatcherKey, tc.storeMatchers) + } resp, err := q.LabelNames(ctx, tc.req) if tc.expectedErr != nil { testutil.NotOk(t, err)