From a211347da85a3adff781f2add00cda31a7f5463a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Wed, 7 Aug 2024 11:39:38 -0700 Subject: [PATCH 1/2] Metadata API: Add limit param MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 1 + pkg/api/query/v1.go | 85 +++++++++++++++++++++++++++++++++++++--- pkg/api/query/v1_test.go | 83 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a12bcd6616..d170608828 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re * [#7592](https://github.com/thanos-io/thanos/pull/7592) Ruler: Only increment `thanos_rule_evaluation_with_warnings_total` metric for non PromQL warnings. ### Added +* [7609](https://github.com/thanos-io/thanos/pull/7609) API: Add limit param to metadata APIs (series, label names, label values). ### Changed diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index f353c75d44..7d6d85e28d 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -1067,6 +1067,11 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A return nil, nil, apiErr, func() {} } + limit, err := parseLimitParam(r.FormValue("limit")) + if err != nil { + return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} + } + matcherSets, ctx, err := tenancy.RewriteLabelMatchers(ctx, r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.Form[MatcherParam]) if err != nil { apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err} @@ -1088,6 +1093,10 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A } defer runutil.CloseWithLogOnErr(qapi.logger, q, "queryable labelValues") + hints := &storage.LabelHints{ + Limit: toHintLimit(limit), + } + var ( vals []string warnings annotations.Annotations @@ -1096,7 +1105,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A var callWarnings annotations.Annotations labelValuesSet := make(map[string]struct{}) for _, matchers := range matcherSets { - vals, callWarnings, err = q.LabelValues(ctx, name, nil, matchers...) + vals, callWarnings, err = q.LabelValues(ctx, name, hints, matchers...) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -1112,7 +1121,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A } sort.Strings(vals) } else { - vals, warnings, err = q.LabelValues(ctx, name, nil) + vals, warnings, err = q.LabelValues(ctx, name, hints) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -1122,6 +1131,11 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A vals = make([]string, 0) } + if limit > 0 && len(vals) > limit { + vals = vals[:limit] + warnings = warnings.Add(errors.New("results truncated due to limit")) + } + return vals, warnings.AsErrors(), nil, func() {} } @@ -1160,6 +1174,11 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr return nil, nil, apiErr, func() {} } + limit, err := parseLimitParam(r.FormValue("limit")) + if err != nil { + return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} + } + enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse) if apiErr != nil { return nil, nil, apiErr, func() {} @@ -1185,18 +1204,31 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr metrics = []labels.Labels{} sets []storage.SeriesSet ) + + hints := &storage.SelectHints{ + Limit: toHintLimit(limit), + Start: start.UnixMilli(), + End: end.UnixMilli(), + } + for _, mset := range matcherSets { - sets = append(sets, q.Select(ctx, false, nil, mset...)) + sets = append(sets, q.Select(ctx, false, hints, mset...)) } set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) + warnings := set.Warnings() for set.Next() { metrics = append(metrics, set.At().Labels()) + if limit > 0 && len(metrics) > limit { + metrics = metrics[:limit] + warnings.Add(errors.New("results truncated due to limit")) + return metrics, warnings.AsErrors(), nil, func() {} + } } if set.Err() != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: set.Err()}, func() {} } - return metrics, set.Warnings().AsErrors(), nil, func() {} + return metrics, warnings.AsErrors(), nil, func() {} } func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.ApiError, func()) { @@ -1215,6 +1247,11 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap return nil, nil, apiErr, func() {} } + limit, err := parseLimitParam(r.FormValue("limit")) + if err != nil { + return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {} + } + matcherSets, ctx, err := tenancy.RewriteLabelMatchers(r.Context(), r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.Form[MatcherParam]) if err != nil { apiErr := &api.ApiError{Typ: api.ErrorBadData, Err: err} @@ -1241,11 +1278,15 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap warnings annotations.Annotations ) + hints := &storage.LabelHints{ + Limit: toHintLimit(limit), + } + if len(matcherSets) > 0 { var callWarnings annotations.Annotations labelNamesSet := make(map[string]struct{}) for _, matchers := range matcherSets { - names, callWarnings, err = q.LabelNames(ctx, nil, matchers...) + names, callWarnings, err = q.LabelNames(ctx, hints, matchers...) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -1261,7 +1302,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap } sort.Strings(names) } else { - names, warnings, err = q.LabelNames(ctx, nil) + names, warnings, err = q.LabelNames(ctx, hints) } if err != nil { @@ -1271,6 +1312,11 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap names = make([]string, 0) } + if limit > 0 && len(names) > limit { + names = names[:limit] + warnings = warnings.Add(errors.New("results truncated due to limit")) + } + return names, warnings.AsErrors(), nil, func() {} } @@ -1532,6 +1578,33 @@ func parseDuration(s string) (time.Duration, error) { return 0, errors.Errorf("cannot parse %q to a valid duration", s) } +// parseLimitParam returning 0 means no limit is to be applied. +func parseLimitParam(s string) (int, error) { + if s == "" { + return 0, nil + } + + limit, err := strconv.Atoi(s) + if err != nil { + return 0, errors.Errorf("cannot parse %q to a valid limit", s) + } + if limit < 0 { + return 0, errors.New("limit must be non-negative") + } + + return limit, nil +} + +// toHintLimit increases the API limit, as returned by parseLimitParam, by 1. +// This allows for emitting warnings when the results are truncated. +func toHintLimit(limit int) int { + // 0 means no limit and avoid int overflow + if limit > 0 && limit < math.MaxInt { + return limit + 1 + } + return limit +} + // NewMetricMetadataHandler creates handler compatible with HTTP /api/v1/metadata https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-metadata // which uses gRPC Unary Metadata API. func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError, func()) { diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index a820abe351..5afedc5dcb 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -1038,6 +1038,15 @@ func TestMetadataEndpoints(t *testing.T) { }, response: []string{"__name__", "foo", "replica1"}, }, + // With limit + { + endpoint: api.labelNames, + query: url.Values{ + "match[]": []string{`test_metric_replica2`}, + "limit": []string{"2"}, + }, + response: []string{"__name__", "foo"}, + }, { endpoint: api.labelValues, query: url.Values{ @@ -1058,6 +1067,18 @@ func TestMetadataEndpoints(t *testing.T) { }, response: []string{"test_metric1", "test_metric2", "test_metric_replica1", "test_metric_replica2"}, }, + // With limit + { + endpoint: api.labelValues, + query: url.Values{ + "match[]": []string{`{foo="bar"}`, `{foo="boo"}`}, + "limit": []string{"3"}, + }, + params: map[string]string{ + "name": "__name__", + }, + response: []string{"test_metric1", "test_metric2", "test_metric_replica1"}, + }, // No matched series. { endpoint: api.labelValues, @@ -1357,6 +1378,32 @@ func TestMetadataEndpoints(t *testing.T) { errType: baseAPI.ErrorBadData, method: http.MethodPost, }, + // With limit=2 + { + endpoint: api.series, + query: url.Values{ + "match[]": []string{`{replica="", foo=~"b.+", replica1=""}`}, + "limit": []string{"2"}, + }, + response: []labels.Labels{ + labels.FromStrings("__name__", "test_metric1", "foo", "bar"), + labels.FromStrings("__name__", "test_metric1", "foo", "boo"), + }, + method: http.MethodPost, + }, + // Without + { + endpoint: api.series, + query: url.Values{ + "match[]": []string{`{replica="", foo=~"b.+", replica1=""}`}, + }, + response: []labels.Labels{ + labels.FromStrings("__name__", "test_metric1", "foo", "bar"), + labels.FromStrings("__name__", "test_metric1", "foo", "boo"), + labels.FromStrings("__name__", "test_metric2", "foo", "boo"), + }, + method: http.MethodPost, + }, } for i, test := range tests { @@ -1698,6 +1745,42 @@ func TestParseStoreDebugMatchersParam(t *testing.T) { } } +func TestParseLimitParam(t *testing.T) { + var tests = []struct { + input string + fail bool + result int + }{ + { + input: "", + fail: false, + result: 0, + }, { + input: "abc", + fail: true, + }, { + input: "10", + fail: false, + result: 10, + }, + } + + for _, test := range tests { + res, err := parseLimitParam(test.input) + if err != nil && !test.fail { + t.Errorf("Unexpected error for %q: %s", test.input, err) + continue + } + if err == nil && test.fail { + t.Errorf("Expected error for %q but got none", test.input) + continue + } + if !test.fail && res != test.result { + t.Errorf("Expected limit %v for input %q but got %v", test.result, test.input, res) + } + } +} + func TestRulesHandler(t *testing.T) { twoHAgo := time.Now().Add(-2 * time.Hour) all := []*rulespb.Rule{ From 6b2f00fcf1a40c866e5af281b047ef23c0b9da30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Thu, 8 Aug 2024 08:29:00 -0700 Subject: [PATCH 2/2] Fix comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- pkg/api/query/v1_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 5afedc5dcb..923a9cf411 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -1378,7 +1378,7 @@ func TestMetadataEndpoints(t *testing.T) { errType: baseAPI.ErrorBadData, method: http.MethodPost, }, - // With limit=2 + // With limit { endpoint: api.series, query: url.Values{ @@ -1391,7 +1391,7 @@ func TestMetadataEndpoints(t *testing.T) { }, method: http.MethodPost, }, - // Without + // Without limit { endpoint: api.series, query: url.Values{