Skip to content

Commit

Permalink
fix: detected fields incorrect type bug (#13515)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Jul 15, 2024
1 parent e506995 commit f6a94d3
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 11 deletions.
6 changes: 1 addition & 5 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.

detectedFields := parseDetectedFields(ctx, req.FieldLimit, streams)

//TODO: detected field needs to contain the sketch
// make sure response to frontend is GRPC
//only want cardinality in JSON
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
Expand All @@ -1141,7 +1138,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
fieldCount++
}

//TODO: detected fields response needs to include the sketch
return &logproto.DetectedFieldsResponse{
Fields: fields,
FieldLimit: req.GetFieldLimit(),
Expand Down Expand Up @@ -1218,7 +1214,6 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
fieldCount := uint32(0)

for _, stream := range streams {
detectType := true
level.Debug(spanlogger.FromContext(ctx)).Log(
"detected_fields", "true",
"msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries)))
Expand All @@ -1241,6 +1236,7 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
df.parsers = append(df.parsers, *parser)
}

detectType := true
for _, v := range vals {
parsedFields := detectedFields[k]
if detectType {
Expand Down
82 changes: 81 additions & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/grafana/loki/v3/pkg/logql/log"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/loghttp"

"github.com/grafana/dskit/grpcclient"
Expand Down Expand Up @@ -118,7 +120,6 @@ func (c *querierClientMock) GetDetectedLabels(ctx context.Context, in *logproto.
return (*logproto.LabelToValuesResponse)(nil), args.Error(1)
}
return res.(*logproto.LabelToValuesResponse), args.Error(1)

}

func (c *querierClientMock) GetVolume(ctx context.Context, in *logproto.VolumeRequest, opts ...grpc.CallOption) (*logproto.VolumeResponse, error) {
Expand Down Expand Up @@ -517,6 +518,20 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockStream(from, quantity))
}

// mockLogfmtStreamIterator wraps the single stream built by
// mockLogfmtStream(from, quantity) in an entry iterator. The shape of the
// generated logfmt lines is defined by mockLogfmtStreamWithLabels.
func mockLogfmtStreamIterator(from int, quantity int) iter.EntryIterator {
	stream := mockLogfmtStream(from, quantity)
	return iter.NewStreamIterator(stream)
}

// mockLogfmtStreamIteratorWithStructuredMetadata wraps the single stream built
// by mockLogfmtStreamWithStructuredMetadata(from, quantity) in an entry
// iterator, so every entry carries structured metadata in addition to its
// logfmt line (fields message, count and fake).
func mockLogfmtStreamIteratorWithStructuredMetadata(from int, quantity int) iter.EntryIterator {
	stream := mockLogfmtStreamWithStructuredMetadata(from, quantity)
	return iter.NewStreamIterator(stream)
}

// mockSampleIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from
Expand Down Expand Up @@ -546,6 +561,71 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream
}
}

// mockLogfmtStream builds a logfmt test stream of quantity entries under the
// fixed label set {type="test"}.
func mockLogfmtStream(from int, quantity int) logproto.Stream {
	const testLabels = `{type="test"}`
	return mockLogfmtStreamWithLabels(from, quantity, testLabels)
}

// mockLogfmtStreamWithLabels builds a stream with the given labels holding
// quantity logfmt entries. Entries are numbered from quantity down to 1; that
// number is used as the Unix-second timestamp and to derive every field value
// of the line (message, count, fake, bytes, duration, percent, even).
// The first parameter is ignored.
func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
	const lineFormat = `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`

	stream := logproto.Stream{
		Labels:  labels,
		Entries: make([]logproto.Entry, 0, quantity),
	}

	// used for detected fields queries which are always BACKWARD,
	// hence the highest-numbered (latest) entry is appended first
	for n := quantity; n >= 1; n-- {
		megabytes := n * 10
		millis := n * 256
		percent := float32(n * 10.0)
		isEven := n%2 == 0

		line := fmt.Sprintf(lineFormat, n, n, megabytes, millis, percent, isEven)
		stream.Entries = append(stream.Entries, logproto.Entry{
			Timestamp: time.Unix(int64(n), 0),
			Line:      line,
		})
	}

	return stream
}

// mockLogfmtStreamWithStructuredMetadata builds a logfmt test stream whose
// entries carry structured metadata, under the fixed label set {type="test"}.
func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Stream {
	const testLabels = `{type="test"}`
	return mockLogfmtStreamWithLabelsAndStructuredMetadata(from, quantity, testLabels)
}

// mockLogfmtStreamWithLabelsAndStructuredMetadata builds a stream with the
// given labels holding quantity logfmt entries (fields message, count, fake),
// numbered from quantity down to 1 with that number as the Unix-second
// timestamp.
//
// Every entry is given the same structured metadata: one "constant" label
// followed by one "variable" label per index in [from, from+quantity), with
// values "value<index>".
func mockLogfmtStreamWithLabelsAndStructuredMetadata(
	from int,
	quantity int,
	labels string,
) logproto.Stream {
	sharedMetadata := push.LabelsAdapter{
		{Name: "constant", Value: "constant"},
	}
	for idx := from; idx < from+quantity; idx++ {
		variable := push.LabelAdapter{
			Name:  "variable",
			Value: fmt.Sprintf("value%d", idx),
		}
		sharedMetadata = append(sharedMetadata, variable)
	}

	// Left nil on purpose when quantity is not positive.
	var entries []logproto.Entry
	for n := quantity; n > 0; n-- {
		line := fmt.Sprintf(`message="line %d" count=%d fake=true`, n, n)
		entries = append(entries, logproto.Entry{
			Timestamp:          time.Unix(int64(n), 0),
			Line:               line,
			StructuredMetadata: sharedMetadata,
		})
	}

	return logproto.Stream{
		Labels:  labels,
		Entries: entries,
	}
}

// querierMock is a test double built by embedding util.ExtendedMock; it
// declares no fields of its own.
type querierMock struct {
util.ExtendedMock
}
Expand Down
172 changes: 167 additions & 5 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,11 @@ func TestQuerier_SeriesAPI(t *testing.T) {
{Key: "a", Value: "1"},
{Key: "b", Value: "2"},
}},
{Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
{Key: "b", Value: "3"}},
{
Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
{Key: "b", Value: "3"},
},
},
{Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
Expand Down Expand Up @@ -994,7 +996,6 @@ func TestQuerier_RequestingIngesters(t *testing.T) {

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {

conf := mockQuerierConfig()
conf.QueryIngestersWithin = time.Minute * 30
if tc.setIngesterQueryStoreMaxLookback {
Expand Down Expand Up @@ -1175,7 +1176,6 @@ func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*quer
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)

if err != nil {
return nil, nil, nil, err
}
Expand All @@ -1191,6 +1191,7 @@ type fakeTimeLimits struct {
// MaxQueryLookback returns the fixed maxQueryLookback held by f; both
// arguments are ignored.
func (f fakeTimeLimits) MaxQueryLookback(_ context.Context, _ string) time.Duration {
return f.maxQueryLookback
}

// MaxQueryLength returns the fixed maxQueryLength held by f; both arguments
// are ignored.
func (f fakeTimeLimits) MaxQueryLength(_ context.Context, _ string) time.Duration {
return f.maxQueryLength
}
Expand Down Expand Up @@ -1697,3 +1698,164 @@ func BenchmarkQuerierDetectedLabels(b *testing.B) {
assert.NoError(b, err)
}
}

// TestQuerier_DetectedFields checks that DetectedFields reports the expected
// cardinality and type for the fields of the logfmt lines produced by
// mockLogfmtStream (defined in querier_mock_test.go).
func TestQuerier_DetectedFields(t *testing.T) {
	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
	require.NoError(t, err)
	ctx := user.InjectOrgID(context.Background(), "test")

	conf := mockQuerierConfig()
	conf.IngesterQueryStoreMaxLookback = 0

	request := logproto.DetectedFieldsRequest{
		Start:      time.Now().Add(-1 * time.Minute),
		End:        time.Now(),
		Query:      `{type="test"}`,
		LineLimit:  1000,
		FieldLimit: 1000,
	}

	// detectFields wires a fresh querier to store and ingester mocks that each
	// serve a logfmt stream of `quantity` lines, runs the request and returns
	// the detected fields. Any setup or query error fails the calling test.
	detectFields := func(t *testing.T, quantity int) []*logproto.DetectedField {
		t.Helper()

		store := newStoreMock()
		store.On("SelectLogs", mock.Anything, mock.Anything).
			Return(mockLogfmtStreamIterator(1, quantity), nil)

		queryClient := newQueryClientMock()
		queryClient.On("Recv").
			Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, quantity)}), nil)

		ingesterClient := newQuerierClientMock()
		ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
			Return(queryClient, nil)

		querier, err := newQuerier(
			conf,
			mockIngesterClientConfig(),
			newIngesterClientMockFactory(ingesterClient),
			mockReadRingWithOneActiveIngester(),
			&mockDeleteGettter{},
			store, limits)
		require.NoError(t, err)

		resp, err := querier.DetectedFields(ctx, &request)
		require.NoError(t, err)

		return resp.Fields
	}

	t.Run("returns detected fields from queried logs", func(t *testing.T) {
		detectedFields := detectFields(t, 5)

		// log lines come from querier_mock_test.go
		// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
		assert.Len(t, detectedFields, 7)
		expectedCardinality := map[string]uint64{
			"message":  5,
			"count":    5,
			"fake":     1,
			"bytes":    5,
			"duration": 5,
			"percent":  5,
			"even":     2,
		}
		for _, d := range detectedFields {
			// Without this check an unexpected label would silently be
			// compared against the map's zero value.
			card, ok := expectedCardinality[d.Label]
			require.True(t, ok, "Unexpected detected field: %s", d.Label)
			assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label)
		}
	})

	t.Run("correctly identifies different field types", func(t *testing.T) {
		detectedFields := detectFields(t, 2)

		// log lines come from querier_mock_test.go
		// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
		assert.Len(t, detectedFields, 7)

		fieldsByLabel := make(map[string]*logproto.DetectedField, len(detectedFields))
		for _, field := range detectedFields {
			fieldsByLabel[field.Label] = field
		}

		// Fail with a readable message, rather than a nil-pointer panic, when
		// a field whose type is asserted below was not detected at all.
		for _, label := range []string{"message", "count", "bytes", "duration", "percent", "even"} {
			require.Contains(t, fieldsByLabel, label, "field %q was not detected", label)
		}

		assert.Equal(t, logproto.DetectedFieldString, fieldsByLabel["message"].Type)
		assert.Equal(t, logproto.DetectedFieldInt, fieldsByLabel["count"].Type)
		assert.Equal(t, logproto.DetectedFieldBytes, fieldsByLabel["bytes"].Type)
		assert.Equal(t, logproto.DetectedFieldDuration, fieldsByLabel["duration"].Type)
		assert.Equal(t, logproto.DetectedFieldFloat, fieldsByLabel["percent"].Type)
		assert.Equal(t, logproto.DetectedFieldBoolean, fieldsByLabel["even"].Type)
	})
}

// BenchmarkQuerierDetectedFields measures DetectedFields against store and
// ingester mocks that each serve a two-line logfmt stream.
func BenchmarkQuerierDetectedFields(b *testing.B) {
	// Setup errors abort the benchmark here instead of surfacing later as a
	// nil-pointer panic inside the timed loop.
	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
	require.NoError(b, err)
	ctx := user.InjectOrgID(context.Background(), "test")

	conf := mockQuerierConfig()
	conf.IngesterQueryStoreMaxLookback = 0

	request := logproto.DetectedFieldsRequest{
		Start:      time.Now().Add(-1 * time.Minute),
		End:        time.Now(),
		Query:      `{type="test"}`,
		LineLimit:  1000,
		FieldLimit: 1000,
	}

	// NOTE(review): the iterator and query response below are created once and
	// handed back by the mocks on every call; presumably iterations after the
	// first see an already-consumed iterator — confirm this measures what is
	// intended.
	store := newStoreMock()
	store.On("SelectLogs", mock.Anything, mock.Anything).
		Return(mockLogfmtStreamIterator(1, 2), nil)

	queryClient := newQueryClientMock()
	queryClient.On("Recv").
		Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil)

	ingesterClient := newQuerierClientMock()
	ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
		Return(queryClient, nil)

	querier, err := newQuerier(
		conf,
		mockIngesterClientConfig(),
		newIngesterClientMockFactory(ingesterClient),
		mockReadRingWithOneActiveIngester(),
		&mockDeleteGettter{},
		store, limits)
	require.NoError(b, err)

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		_, err := querier.DetectedFields(ctx, &request)
		assert.NoError(b, err)
	}
}

0 comments on commit f6a94d3

Please sign in to comment.