Skip to content

Commit

Permalink
API: Add limit param in metadata APIs (thanos-io#7609)
Browse files Browse the repository at this point in the history
  • Loading branch information
harry671003 authored and jnyi committed Oct 16, 2024
1 parent ade5796 commit cd21dd7
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 6 deletions.
85 changes: 79 additions & 6 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,11 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr, func() {}
}

limit, err := parseLimitParam(r.FormValue("limit"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}

matcherSets, ctx, err := tenancy.RewriteLabelMatchers(ctx, r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.Form[MatcherParam])
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
Expand All @@ -1093,6 +1098,10 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
}
defer runutil.CloseWithLogOnErr(qapi.logger, q, "queryable labelValues")

hints := &storage.LabelHints{
Limit: toHintLimit(limit),
}

var (
vals []string
warnings annotations.Annotations
Expand All @@ -1101,7 +1110,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
var callWarnings annotations.Annotations
labelValuesSet := make(map[string]struct{})
for _, matchers := range matcherSets {
vals, callWarnings, err = q.LabelValues(ctx, name, nil, matchers...)
vals, callWarnings, err = q.LabelValues(ctx, name, hints, matchers...)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand All @@ -1117,7 +1126,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
}
sort.Strings(vals)
} else {
vals, warnings, err = q.LabelValues(ctx, name, nil)
vals, warnings, err = q.LabelValues(ctx, name, hints)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand All @@ -1127,6 +1136,11 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
vals = make([]string, 0)
}

if limit > 0 && len(vals) > limit {
vals = vals[:limit]
warnings = warnings.Add(errors.New("results truncated due to limit"))
}

return vals, warnings.AsErrors(), nil, func() {}
}

Expand Down Expand Up @@ -1165,6 +1179,11 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr, func() {}
}

limit, err := parseLimitParam(r.FormValue("limit"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr, func() {}
Expand All @@ -1190,18 +1209,31 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
metrics = []labels.Labels{}
sets []storage.SeriesSet
)

hints := &storage.SelectHints{
Limit: toHintLimit(limit),
Start: start.UnixMilli(),
End: end.UnixMilli(),
}

for _, mset := range matcherSets {
sets = append(sets, q.Select(ctx, false, nil, mset...))
sets = append(sets, q.Select(ctx, false, hints, mset...))
}

set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
warnings := set.Warnings()
for set.Next() {
metrics = append(metrics, set.At().Labels())
if limit > 0 && len(metrics) > limit {
metrics = metrics[:limit]
warnings.Add(errors.New("results truncated due to limit"))
return metrics, warnings.AsErrors(), nil, func() {}
}
}
if set.Err() != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: set.Err()}, func() {}
}
return metrics, set.Warnings().AsErrors(), nil, func() {}
return metrics, warnings.AsErrors(), nil, func() {}
}

func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand All @@ -1220,6 +1252,11 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr, func() {}
}

limit, err := parseLimitParam(r.FormValue("limit"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}

matcherSets, ctx, err := tenancy.RewriteLabelMatchers(r.Context(), r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField, qapi.enforceTenancy, qapi.tenantLabel, r.Form[MatcherParam])
if err != nil {
apiErr := &api.ApiError{Typ: api.ErrorBadData, Err: err}
Expand All @@ -1246,11 +1283,15 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
warnings annotations.Annotations
)

hints := &storage.LabelHints{
Limit: toHintLimit(limit),
}

if len(matcherSets) > 0 {
var callWarnings annotations.Annotations
labelNamesSet := make(map[string]struct{})
for _, matchers := range matcherSets {
names, callWarnings, err = q.LabelNames(ctx, nil, matchers...)
names, callWarnings, err = q.LabelNames(ctx, hints, matchers...)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand All @@ -1266,7 +1307,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
}
sort.Strings(names)
} else {
names, warnings, err = q.LabelNames(ctx, nil)
names, warnings, err = q.LabelNames(ctx, hints)
}

if err != nil {
Expand All @@ -1276,6 +1317,11 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
names = make([]string, 0)
}

if limit > 0 && len(names) > limit {
names = names[:limit]
warnings = warnings.Add(errors.New("results truncated due to limit"))
}

return names, warnings.AsErrors(), nil, func() {}
}

Expand Down Expand Up @@ -1537,6 +1583,33 @@ func parseDuration(s string) (time.Duration, error) {
return 0, errors.Errorf("cannot parse %q to a valid duration", s)
}

// parseLimitParam returning 0 means no limit is to be applied.
func parseLimitParam(s string) (int, error) {
if s == "" {
return 0, nil
}

limit, err := strconv.Atoi(s)
if err != nil {
return 0, errors.Errorf("cannot parse %q to a valid limit", s)
}
if limit < 0 {
return 0, errors.New("limit must be non-negative")
}

return limit, nil
}

// toHintLimit increases the API limit, as returned by parseLimitParam, by 1.
// This allows for emitting warnings when the results are truncated.
func toHintLimit(limit int) int {
// 0 means no limit and avoid int overflow
if limit > 0 && limit < math.MaxInt {
return limit + 1
}
return limit
}

// NewMetricMetadataHandler creates handler compatible with HTTP /api/v1/metadata https://prometheus.io/docs/prometheus/latest/querying/api/#querying-metric-metadata
// which uses gRPC Unary Metadata API.
func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand Down
83 changes: 83 additions & 0 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,15 @@ func TestMetadataEndpoints(t *testing.T) {
},
response: []string{"__name__", "foo", "replica1"},
},
// With limit
{
endpoint: api.labelNames,
query: url.Values{
"match[]": []string{`test_metric_replica2`},
"limit": []string{"2"},
},
response: []string{"__name__", "foo"},
},
{
endpoint: api.labelValues,
query: url.Values{
Expand All @@ -1058,6 +1067,18 @@ func TestMetadataEndpoints(t *testing.T) {
},
response: []string{"test_metric1", "test_metric2", "test_metric_replica1", "test_metric_replica2"},
},
// With limit
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`{foo="bar"}`, `{foo="boo"}`},
"limit": []string{"3"},
},
params: map[string]string{
"name": "__name__",
},
response: []string{"test_metric1", "test_metric2", "test_metric_replica1"},
},
// No matched series.
{
endpoint: api.labelValues,
Expand Down Expand Up @@ -1357,6 +1378,32 @@ func TestMetadataEndpoints(t *testing.T) {
errType: baseAPI.ErrorBadData,
method: http.MethodPost,
},
// With limit
{
endpoint: api.series,
query: url.Values{
"match[]": []string{`{replica="", foo=~"b.+", replica1=""}`},
"limit": []string{"2"},
},
response: []labels.Labels{
labels.FromStrings("__name__", "test_metric1", "foo", "bar"),
labels.FromStrings("__name__", "test_metric1", "foo", "boo"),
},
method: http.MethodPost,
},
// Without limit
{
endpoint: api.series,
query: url.Values{
"match[]": []string{`{replica="", foo=~"b.+", replica1=""}`},
},
response: []labels.Labels{
labels.FromStrings("__name__", "test_metric1", "foo", "bar"),
labels.FromStrings("__name__", "test_metric1", "foo", "boo"),
labels.FromStrings("__name__", "test_metric2", "foo", "boo"),
},
method: http.MethodPost,
},
}

for i, test := range tests {
Expand Down Expand Up @@ -1698,6 +1745,42 @@ func TestParseStoreDebugMatchersParam(t *testing.T) {
}
}

func TestParseLimitParam(t *testing.T) {
var tests = []struct {
input string
fail bool
result int
}{
{
input: "",
fail: false,
result: 0,
}, {
input: "abc",
fail: true,
}, {
input: "10",
fail: false,
result: 10,
},
}

for _, test := range tests {
res, err := parseLimitParam(test.input)
if err != nil && !test.fail {
t.Errorf("Unexpected error for %q: %s", test.input, err)
continue
}
if err == nil && test.fail {
t.Errorf("Expected error for %q but got none", test.input)
continue
}
if !test.fail && res != test.result {
t.Errorf("Expected limit %v for input %q but got %v", test.result, test.input, res)
}
}
}

func TestRulesHandler(t *testing.T) {
twoHAgo := time.Now().Add(-2 * time.Hour)
all := []*rulespb.Rule{
Expand Down

0 comments on commit cd21dd7

Please sign in to comment.