Skip to content

Commit

Permalink
API: Expose optional label matcher for label names API
Browse files Browse the repository at this point in the history
This is an enhancement to add the optional matchers on the label names API
  • Loading branch information
yuri-rs committed May 27, 2024
1 parent 4901a5c commit bd38e23
Show file tree
Hide file tree
Showing 20 changed files with 322 additions and 207 deletions.
1 change: 1 addition & 0 deletions docs/sources/reference/loki-http-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ It accepts the following query parameters in the URL:
- `start`: The start time for the query as a nanosecond Unix epoch. Defaults to 6 hours ago.
- `end`: The end time for the query as a nanosecond Unix epoch. Defaults to now.
- `since`: A `duration` used to calculate `start` relative to `end`. If `end` is in the future, `start` is calculated as this duration before now. Any value specified for `start` supersedes this parameter.
- `query`: A set of log stream selector that selects the streams to match and return label names. Example: `{"app": "myapp", "environment": "dev"}`

In microservices mode, `/loki/api/v1/labels` is exposed by the querier.

Expand Down
19 changes: 17 additions & 2 deletions pkg/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,22 @@ func (g *Gateway) LabelNamesForMetricName(ctx context.Context, req *logproto.Lab
if err != nil {
return nil, err
}
names, err := g.indexQuerier.LabelNamesForMetricName(ctx, instanceID, req.From, req.Through, req.MetricName)
var matchers []*labels.Matcher
// An empty matchers string cannot be parsed,
// therefore we check the string representation of the matchers.
if req.Matchers != syntax.EmptyMatchers {
expr, err := syntax.ParseExprWithoutValidation(req.Matchers)
if err != nil {
return nil, err
}

matcherExpr, ok := expr.(*syntax.MatchersExpr)
if !ok {
return nil, fmt.Errorf("invalid label matchers found of type %T", expr)
}
matchers = matcherExpr.Mts
}
names, err := g.indexQuerier.LabelNamesForMetricName(ctx, instanceID, req.From, req.Through, req.MetricName, matchers...)
if err != nil {
return nil, err
}
Expand All @@ -308,7 +323,7 @@ func (g *Gateway) LabelValuesForMetricName(ctx context.Context, req *logproto.La
}
var matchers []*labels.Matcher
// An empty matchers string cannot be parsed,
// therefore we check the string representation of the the matchers.
// therefore we check the string representation of the matchers.
if req.Matchers != syntax.EmptyMatchers {
expr, err := syntax.ParseExprWithoutValidation(req.Matchers)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return nil, err
}
} else {
storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs")
storeValues, err = cs.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (s *mockStore) LabelValuesForMetricName(_ context.Context, _ string, _, _ m
return []string{"val1", "val2"}, nil
}

func (s *mockStore) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string) ([]string, error) {
func (s *mockStore) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string, _ ...*labels.Matcher) ([]string, error) {
return nil, nil
}

Expand Down
395 changes: 226 additions & 169 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ message LabelNamesForMetricNameRequest {
(gogoproto.customtype) = "github.com/prometheus/common/model.Time",
(gogoproto.nullable) = false
];
string matchers = 4;
}

// TODO(owen-d): fix. This will break rollouts as soon as the internal repr is changed.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ
if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name, matchers...)
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs")
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs", matchers...)
}
return err
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (s *storeMock) LabelValuesForMetricName(ctx context.Context, userID string,
return args.Get(0).([]string), args.Error(1)
}

func (s *storeMock) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (s *storeMock) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, _ ...*labels.Matcher) ([]string, error) {
args := s.Called(ctx, userID, from, through, metricName)
return args.Get(0).([]string), args.Error(1)
}
Expand Down
40 changes: 33 additions & 7 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
}, false},
{"labels", func() (*http.Request, error) {
return http.NewRequest(http.MethodGet,
fmt.Sprintf(`/label?start=%d&end=%d`, start.UnixNano(), end.UnixNano()), nil)
}, NewLabelRequest(start, end, "", "", "/label"),
fmt.Sprintf(`/label?start=%d&end=%d&query={foo="bar"}`, start.UnixNano(), end.UnixNano()), nil)
}, NewLabelRequest(start, end, `{foo="bar"}`, "", "/label"),
false},
{"label_values", func() (*http.Request, error) {
req, err := http.NewRequest(http.MethodGet,
Expand Down Expand Up @@ -875,22 +875,26 @@ func Test_codec_series_EncodeRequest(t *testing.T) {

func Test_codec_labels_EncodeRequest(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
toEncode := NewLabelRequest(start, end, "", "", "/loki/api/v1/labels")

// Test labels endpoint
toEncode := NewLabelRequest(start, end, `{foo="bar"}`, "", "/loki/api/v1/labels")
got, err := DefaultCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))

// testing a full roundtrip
req, err := DefaultCodec.DecodeRequest(context.TODO(), got, nil)
require.NoError(t, err)
require.Equal(t, toEncode.Start, req.(*LabelRequest).Start)
require.Equal(t, toEncode.End, req.(*LabelRequest).End)
require.Equal(t, toEncode.Query, req.(*LabelRequest).Query)
require.Equal(t, "/loki/api/v1/labels", req.(*LabelRequest).Path())

// Test labels values endpoint
// Test label values endpoint
toEncode = NewLabelRequest(start, end, `{foo="bar"}`, "__name__", "/loki/api/v1/label/__name__/values")
got, err = DefaultCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
Expand All @@ -912,21 +916,43 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {

func Test_codec_labels_DecodeRequest(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
u, err := url.Parse(`/loki/api/v1/label/__name__/values?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`)

// Test labels endpoint
u, err := url.Parse(`/loki/api/v1/labels?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`)
require.NoError(t, err)

r := &http.Request{URL: u}
r = mux.SetURLVars(r, map[string]string{"name": "__name__"})
req, err := DefaultCodec.DecodeRequest(context.TODO(), r, nil)
require.NoError(t, err)
require.Equal(t, start, *req.(*LabelRequest).Start)
require.Equal(t, end, *req.(*LabelRequest).End)
require.Equal(t, `{foo="bar"}`, req.(*LabelRequest).Query)
require.Equal(t, "/loki/api/v1/label/__name__/values", req.(*LabelRequest).Path())
require.Equal(t, "/loki/api/v1/labels", req.(*LabelRequest).Path())

got, err := DefaultCodec.EncodeRequest(ctx, req)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))

// Test label values endpoint
u, err = url.Parse(`/loki/api/v1/label/__name__/values?start=1575285010000000010&end=1575288610000000010&query={foo="bar"}`)
require.NoError(t, err)

r = &http.Request{URL: u}
r = mux.SetURLVars(r, map[string]string{"name": "__name__"})
req, err = DefaultCodec.DecodeRequest(context.TODO(), r, nil)
require.NoError(t, err)
require.Equal(t, start, *req.(*LabelRequest).Start)
require.Equal(t, end, *req.(*LabelRequest).End)
require.Equal(t, `{foo="bar"}`, req.(*LabelRequest).Query)
require.Equal(t, "/loki/api/v1/label/__name__/values", req.(*LabelRequest).Path())

got, err = DefaultCodec.EncodeRequest(ctx, req)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/label/__name__/values", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/labels_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (i cacheKeyLabels) GenerateCacheKey(ctx context.Context, userID string, r r
return fmt.Sprintf("labelvalues:%s:%s:%s:%d:%d", userID, lr.GetName(), lr.GetQuery(), currentInterval, split)
}

return fmt.Sprintf("labels:%s:%d:%d", userID, currentInterval, split)
return fmt.Sprintf("labels:%s:%s:%d:%d", userID, lr.GetQuery(), currentInterval, split)
}

type labelsExtractor struct{}
Expand Down
16 changes: 10 additions & 6 deletions pkg/querier/queryrange/labels_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ func TestCacheKeyLabels_GenerateCacheKey(t *testing.T) {
expectedInterval := testTime.UnixMilli() / time.Hour.Milliseconds()

t.Run("labels", func(t *testing.T) {
require.Equal(t, fmt.Sprintf(`labels:fake:%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req))
require.Equal(t, fmt.Sprintf(`labels:fake::%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req))

req.Query = `{cluster="eu-west1"}`
require.Equal(t, fmt.Sprintf(`labels:fake:{cluster="eu-west1"}:%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req))
})

t.Run("label values", func(t *testing.T) {
req := req
req.Name = "foo"
req.Values = true
req.Query = ``
require.Equal(t, fmt.Sprintf(`labelvalues:fake:foo::%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req))

req.Query = `{cluster="eu-west1"}`
Expand Down Expand Up @@ -361,20 +365,20 @@ func TestLabelQueryCacheKey(t *testing.T) {
t.Run(fmt.Sprintf("%s (values: %v)", tc.name, values), func(t *testing.T) {
keyGen := cacheKeyLabels{tc.limits, nil}

const labelName = "foo"
const query = `{cluster="eu-west1"}`

r := &LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &tc.start,
End: &tc.end,
Query: query,
},
}

const labelName = "foo"
const query = `{cluster="eu-west1"}`

if values {
r.LabelRequest.Values = true
r.LabelRequest.Name = labelName
r.LabelRequest.Query = query
}

// we use regex here because cache key always refers to the current time to get the ingester query window,
Expand All @@ -383,7 +387,7 @@ func TestLabelQueryCacheKey(t *testing.T) {
if values {
pattern = regexp.MustCompile(fmt.Sprintf(`labelvalues:%s:%s:%s:(\d+):%d`, tenantID, labelName, regexp.QuoteMeta(query), tc.expectedSplit))
} else {
pattern = regexp.MustCompile(fmt.Sprintf(`labels:%s:(\d+):%d`, tenantID, tc.expectedSplit))
pattern = regexp.MustCompile(fmt.Sprintf(`labels:%s:%s:(\d+):%d`, tenantID, regexp.QuoteMeta(query), tc.expectedSplit))
}

require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tenantID, r))
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ func (c CompositeStore) LabelValuesForMetricName(ctx context.Context, userID str
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
var result util.UniqueStrings
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
labelNames, err := store.LabelNamesForMetricName(innerCtx, userID, from, through, metricName)
labelNames, err := store.LabelNamesForMetricName(innerCtx, userID, from, through, metricName, matchers...)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/composite_store_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *storeEntry) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
Expand All @@ -122,7 +122,7 @@ func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string,
}
level.Debug(log).Log("metric", metricName)

return c.indexReader.LabelNamesForMetricName(ctx, userID, from, through, metricName)
return c.indexReader.LabelNamesForMetricName(ctx, userID, from, through, metricName, matchers...)
}

func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m mockStore) GetSeries(_ context.Context, _ string, _, _ model.Time, _ ...
return nil, nil
}

func (m mockStore) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string) ([]string, error) {
func (m mockStore) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string, _ ...*labels.Matcher) ([]string, error) {
return nil, nil
}

Expand Down Expand Up @@ -210,7 +210,7 @@ func (m mockStoreLabel) LabelValuesForMetricName(_ context.Context, _ string, _,
return m.values, nil
}

func (m mockStoreLabel) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string) ([]string, error) {
func (m mockStoreLabel) LabelNamesForMetricName(_ context.Context, _ string, _, _ model.Time, _ string, _ ...*labels.Matcher) ([]string, error) {
return m.values, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Filterable interface {
type BaseReader interface {
GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)
LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error)
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error)
}

type StatsReader interface {
Expand Down Expand Up @@ -112,11 +112,11 @@ func (m MonitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, use
return values, nil
}

func (m MonitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (m MonitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
var values []string
if err := loki_instrument.TimeRequest(ctx, "label_names", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
values, err = m.rw.LabelNamesForMetricName(ctx, userID, from, through, metricName)
values, err = m.rw.LabelNamesForMetricName(ctx, userID, from, through, metricName, matchers...)
return err
}); err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/series/series_index_gateway_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@ func (c *IndexGatewayClientStore) GetSeries(ctx context.Context, _ string, from,
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, _ string, from, through model.Time, metricName string) ([]string, error) {
func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, _ string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
resp, err := c.client.LabelNamesForMetricName(ctx, &logproto.LabelNamesForMetricNameRequest{
MetricName: metricName,
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/series/series_index_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,14 @@ func (c *IndexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.Ch
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()

// Fetch the series IDs from the index
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, nil)
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, matchers)
if err != nil {
return nil, err
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/storage/stores/series/series_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,24 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
for _, tc := range []struct {
metricName string
expect []string
matchers []*labels.Matcher
}{
{
`foo`,
[]string{"bar", "flip", "toms"},
nil,
},
{
`bar`,
[]string{"bar", "toms"},
nil,
},
{
`foo`,
[]string{"bar", "toms"},
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, "bar", "beep"),
},
},
} {
for _, schema := range schemas {
Expand Down Expand Up @@ -286,23 +296,23 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
}

// Query with ordinary time-range
labelNames1, err := store.LabelNamesForMetricName(ctx, userID, now.Add(-time.Hour), now, tc.metricName)
labelNames1, err := store.LabelNamesForMetricName(ctx, userID, now.Add(-time.Hour), now, tc.metricName, tc.matchers...)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelNames1) {
t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames1))
}

// Pushing end of time-range into future should yield exact same resultset
labelNames2, err := store.LabelNamesForMetricName(ctx, userID, now.Add(-time.Hour), now.Add(time.Hour*24*10), tc.metricName)
labelNames2, err := store.LabelNamesForMetricName(ctx, userID, now.Add(-time.Hour), now.Add(time.Hour*24*10), tc.metricName, tc.matchers...)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, labelNames2) {
t.Fatalf("%s: wrong label name - %s", tc.metricName, test.Diff(tc.expect, labelNames2))
}

// Query with both begin & end of time-range in future should yield empty resultset
labelNames3, err := store.LabelNamesForMetricName(ctx, userID, now.Add(time.Hour), now.Add(time.Hour*2), tc.metricName)
labelNames3, err := store.LabelNamesForMetricName(ctx, userID, now.Add(time.Hour), now.Add(time.Hour*2), tc.metricName, tc.matchers...)
require.NoError(t, err)
if len(labelNames3) != 0 {
t.Fatalf("%s: future query should yield empty resultset ... actually got %v label names: %#v",
Expand Down
Loading

0 comments on commit bd38e23

Please sign in to comment.