Skip to content

Commit

Permalink
feat(structured-metadata-api): add structured metadata to `/detected_…
Browse files Browse the repository at this point in the history
…fields` API (#13604)
  • Loading branch information
svennergr authored Jul 22, 2024
1 parent 1008315 commit ce02cc2
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 11 deletions.
63 changes: 52 additions & 11 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package querier
import (
"context"
"flag"
"fmt"
"net/http"
"sort"
"strconv"
Expand All @@ -25,6 +24,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/compactor/deletion"
"github.com/grafana/loki/v3/pkg/indexgateway"
"github.com/grafana/loki/v3/pkg/iter"
Expand Down Expand Up @@ -1116,7 +1116,7 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
return nil, err
}

detectedFields := parseDetectedFields(ctx, req.FieldLimit, streams)
detectedFields := parseDetectedFields(req.FieldLimit, streams)

fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
Expand Down Expand Up @@ -1209,16 +1209,39 @@ func determineType(value string) logproto.DetectedFieldType {
return logproto.DetectedFieldString
}

func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
detectedFields := make(map[string]*parsedFields, limit)
fieldCount := uint32(0)
emtpyparser := ""

for _, stream := range streams {
level.Debug(spanlogger.FromContext(ctx)).Log(
"detected_fields", "true",
"msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries)))

for _, entry := range stream.Entries {
structuredMetadata := getStructuredMetadata(entry)
for k, vals := range structuredMetadata {
df, ok := detectedFields[k]
if !ok && fieldCount < limit {
df = newParsedFields(&emtpyparser)
detectedFields[k] = df
fieldCount++
}

if df == nil {
continue
}

detectType := true
for _, v := range vals {
parsedFields := detectedFields[k]
if detectType {
// we don't want to determine the type for every line, so we assume the type in each stream will be the same, and re-detect the type for the next stream
parsedFields.DetermineType(v)
detectType = false
}

parsedFields.Insert(v)
}
}

detected, parser := parseLine(entry.Line)
for k, vals := range detected {
df, ok := detectedFields[k]
Expand Down Expand Up @@ -1247,17 +1270,35 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S

parsedFields.Insert(v)
}

level.Debug(spanlogger.FromContext(ctx)).Log(
"detected_fields", "true",
"msg", fmt.Sprintf("detected field %s with %d values", k, len(vals)))
}
}
}

return detectedFields
}

func getStructuredMetadata(entry push.Entry) map[string][]string {
labels := map[string]map[string]struct{}{}
for _, lbl := range entry.StructuredMetadata {
if values, ok := labels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
labels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
}
}

result := make(map[string][]string, len(labels))
for lbl, values := range labels {
vals := make([]string, 0, len(values))
for v := range values {
vals = append(vals, v)
}
result[lbl] = vals
}

return result
}

func parseLine(line string) (map[string][]string, *string) {
parser := "logfmt"
logFmtParser := logql_log.NewLogfmtParser(true, false)
Expand Down
108 changes: 108 additions & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1759,6 +1759,52 @@ func TestQuerier_DetectedFields(t *testing.T) {
}
})

t.Run("returns detected fields with structured metadata from queried logs", func(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 5), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 5)}), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)

querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)

resp, err := querier.DetectedFields(ctx, &request)
require.NoError(t, err)

detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 9)
expectedCardinality := map[string]uint64{
"variable": 5,
"constant": 1,
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
}
for _, d := range detectedFields {
card := expectedCardinality[d.Label]
assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label)
}
})

t.Run("correctly identifies different field types", func(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Expand Down Expand Up @@ -1814,6 +1860,68 @@ func TestQuerier_DetectedFields(t *testing.T) {
assert.Equal(t, logproto.DetectedFieldFloat, floatField.Type)
assert.Equal(t, logproto.DetectedFieldBoolean, evenField.Type)
})

t.Run("correctly identifies parser to use with logfmt and structured metadata", func(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 2), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 2)}), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)

querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)

resp, err := querier.DetectedFields(ctx, &request)
require.NoError(t, err)

detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 9)

var messageField, countField, bytesField, durationField, floatField, evenField, constantField, variableField *logproto.DetectedField
for _, field := range detectedFields {
switch field.Label {
case "message":
messageField = field
case "count":
countField = field
case "bytes":
bytesField = field
case "duration":
durationField = field
case "percent":
floatField = field
case "even":
evenField = field
case "constant":
constantField = field
case "variable":
variableField = field
}
}

assert.Equal(t, []string{"logfmt"}, messageField.Parsers)
assert.Equal(t, []string{"logfmt"}, countField.Parsers)
assert.Equal(t, []string{"logfmt"}, bytesField.Parsers)
assert.Equal(t, []string{"logfmt"}, durationField.Parsers)
assert.Equal(t, []string{"logfmt"}, floatField.Parsers)
assert.Equal(t, []string{"logfmt"}, evenField.Parsers)
assert.Equal(t, []string{""}, constantField.Parsers)
assert.Equal(t, []string{""}, variableField.Parsers)
})
}

func BenchmarkQuerierDetectedFields(b *testing.B) {
Expand Down

0 comments on commit ce02cc2

Please sign in to comment.