Skip to content

Commit

Permalink
feat: Detected labels from store (#12441)
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanualsi authored Apr 23, 2024
1 parent 904ef6e commit 587a6d2
Show file tree
Hide file tree
Showing 14 changed files with 714 additions and 83 deletions.
1 change: 0 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,6 @@ func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.Detected
if err != nil {
return nil, err
}
level.Info(i.logger).Log("msg", matchers)
}

labelMap, err := instance.LabelsWithValues(ctx, *req.Start, matchers...)
Expand Down
123 changes: 123 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,129 @@ func Test_InMemoryLabels(t *testing.T) {
require.Equal(t, []string{"bar", "foo"}, res.Values)
}

func TestIngester_GetDetectedLabels(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test")

ingesterConfig := defaultIngesterTestConfig(t)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
store := &mockStore{
chunks: map[string][]chunk.Chunk{},
}

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Push labels
req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
{
Labels: `{foo="bar1",bar="baz3"}`,
},
{
Labels: `{foo="foo1",bar="baz1"}`,
},
{
Labels: `{foo="foo",bar="baz1"}`,
},
},
}
for i := 0; i < 10; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}

_, err = i.Push(ctx, &req)
require.NoError(t, err)

res, err := i.GetDetectedLabels(ctx, &logproto.DetectedLabelsRequest{
Start: &[]time.Time{time.Now().Add(11 * time.Nanosecond)}[0],
End: nil,
Query: "",
})

require.NoError(t, err)
fooValues, ok := res.Labels["foo"]
require.True(t, ok)
barValues, ok := res.Labels["bar"]
require.True(t, ok)
require.Equal(t, 4, len(fooValues.Values))
require.Equal(t, 3, len(barValues.Values))
}

func TestIngester_GetDetectedLabelsWithQuery(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test")

ingesterConfig := defaultIngesterTestConfig(t)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
store := &mockStore{
chunks: map[string][]chunk.Chunk{},
}

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Push labels
req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
{
Labels: `{foo="bar1",bar="baz3"}`,
},
{
Labels: `{foo="foo1",bar="baz4"}`,
},
},
}
for i := 0; i < 10; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}

_, err = i.Push(ctx, &req)
require.NoError(t, err)

res, err := i.GetDetectedLabels(ctx, &logproto.DetectedLabelsRequest{
Start: &[]time.Time{time.Now().Add(11 * time.Nanosecond)}[0],
End: nil,
Query: `{foo="bar"}`,
})

require.NoError(t, err)
fooValues, ok := res.Labels["foo"]
require.True(t, ok)
barValues, ok := res.Labels["bar"]
require.True(t, ok)
require.Equal(t, 1, len(fooValues.Values))
require.Equal(t, 2, len(barValues.Values))
}

func Test_DedupeIngester(t *testing.T) {
var (
requests = int64(400)
Expand Down
26 changes: 24 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,9 +588,31 @@ type UniqueValues map[string]struct{}

// LabelsWithValues returns the label names with all the unique values depending on the request
func (i *instance) LabelsWithValues(ctx context.Context, startTime time.Time, matchers ...*labels.Matcher) (map[string]UniqueValues, error) {
// TODO (shantanu): Figure out how to get the label names from index directly when no matchers are given.

labelMap := make(map[string]UniqueValues)
if len(matchers) == 0 {
labelsFromIndex, err := i.index.LabelNames(startTime, nil)
if err != nil {
return nil, err
}

for _, label := range labelsFromIndex {
values, err := i.index.LabelValues(startTime, label, nil)
if err != nil {
return nil, err
}
existingValues, exists := labelMap[label]
if !exists {
existingValues = make(map[string]struct{})
}
for _, v := range values {
existingValues[v] = struct{}{}
}
labelMap[label] = existingValues
}

return labelMap, nil
}

err := i.forMatchingStreams(ctx, startTime, matchers, nil, func(s *stream) error {
for _, label := range s.labels {
v, exists := labelMap[label.Name]
Expand Down
49 changes: 49 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,6 +1480,55 @@ func insertData(t *testing.T, instance *instance) {
}
}

func TestInstance_LabelsWithValues(t *testing.T) {
instance, currentTime, _ := setupTestStreams(t)
start := []time.Time{currentTime.Add(11 * time.Nanosecond)}[0]
m, err := labels.NewMatcher(labels.MatchEqual, "app", "test")
require.NoError(t, err)

t.Run("label names with no matchers returns all detected labels", func(t *testing.T) {
var matchers []*labels.Matcher
res, err := instance.LabelsWithValues(context.Background(), start, matchers...)
completeResponse := map[string]UniqueValues{
"app": map[string]struct{}{
"test": {},
"test2": {},
},
"job": map[string]struct{}{
"varlogs": {},
"varlogs2": {},
},
}
require.NoError(t, err)
require.Equal(t, completeResponse, res)
})

t.Run("label names with matcher returns response with matching detected labels", func(t *testing.T) {
matchers := []*labels.Matcher{m}
res, err := instance.LabelsWithValues(context.Background(), start, matchers...)
responseWithMatchingLabel := map[string]UniqueValues{
"app": map[string]struct{}{
"test": {},
},
"job": map[string]struct{}{
"varlogs": {},
"varlogs2": {},
},
}
require.NoError(t, err)
require.Equal(t, responseWithMatchingLabel, res)
})

t.Run("label names matchers and no start time returns a empty response", func(t *testing.T) {
matchers := []*labels.Matcher{m}
var st time.Time
res, err := instance.LabelsWithValues(context.Background(), st, matchers...)

require.NoError(t, err)
require.Equal(t, map[string]UniqueValues{}, res)
})
}

type fakeQueryServer func(*logproto.QueryResponse) error

func (f fakeQueryServer) Send(res *logproto.QueryResponse) error {
Expand Down
42 changes: 40 additions & 2 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,44 @@ func extractShard(shards []string) *astmapper.ShardAnnotation {
return &shard
}

func RecordDetectedLabelsQueryMetrics(_ context.Context, _ log.Logger, _ time.Time, _ time.Time, _ string, _ string, _ logql_stats.Result) {
// TODO(shantanu) log metrics here
func RecordDetectedLabelsQueryMetrics(ctx context.Context, log log.Logger, start time.Time, end time.Time, query string, status string, stats logql_stats.Result) {
var (
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeVolume
)

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}

rangeType := "range"

level.Info(logger).Log(
"api", "detected_labels",
"latency", latencyType,
"query_type", queryType,
"query", query,
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"range_type", rangeType,
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"splits", stats.Summary.Splits,
"total_entries", stats.Summary.TotalEntriesReturned,
// cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats
//"cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested,
//"cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound,
//"cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored,
//"cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(),
//"cache_volume_results_query_length_served", stats.Caches.VolumeResult.CacheQueryLengthServed(),
)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}
3 changes: 2 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,8 @@ func (t *Loki) setupAsyncStore() error {
}

func (t *Loki) initIngesterQuerier() (_ services.Service, err error) {
t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace)
logger := log.With(util_log.Logger, "component", "querier")
t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace, logger)
if err != nil {
return nil, err
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"golang.org/x/exp/slices"

"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
Expand Down Expand Up @@ -41,23 +43,25 @@ type IngesterQuerier struct {
ring ring.ReadRing
pool *ring_client.Pool
extraQueryDelay time.Duration
logger log.Logger
}

func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string) (*IngesterQuerier, error) {
func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
factory := func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr)
}

return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace)
return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
}

// newIngesterQuerier creates a new IngesterQuerier and allows to pass a custom ingester client factory
// used for testing purposes
func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string) (*IngesterQuerier, error) {
func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
iq := IngesterQuerier{
ring: ring,
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
extraQueryDelay: extraQueryDelay,
logger: logger,
}

err := services.StartAndAwaitRunning(context.Background(), iq.pool)
Expand Down Expand Up @@ -364,12 +368,17 @@ func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.Detec
})

if err != nil {
level.Error(q.logger).Log("msg", "error getting detected labels", "err", err)
return nil, err
}

labelMap := make(map[string][]string)
for _, resp := range ingesterResponses {
thisIngester := resp.response.(*logproto.LabelToValuesResponse)
thisIngester, ok := resp.response.(*logproto.LabelToValuesResponse)
if !ok {
level.Warn(q.logger).Log("msg", "Cannot convert response to LabelToValuesResponse in detectedlabels",
"response", resp)
}

for label, thisIngesterValues := range thisIngester.Labels {
var combinedValues []string
Expand Down
Loading

0 comments on commit 587a6d2

Please sign in to comment.