Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: [k211] fix: detected fields incorrect type bug #13527

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.

detectedFields := parseDetectedFields(ctx, req.FieldLimit, streams)

//TODO: detected field needs to contain the sketch
// make sure response to frontend is GRPC
//only want cardinality in JSON
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
Expand All @@ -1141,7 +1138,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
fieldCount++
}

//TODO: detected fields response needs to include the sketch
return &logproto.DetectedFieldsResponse{
Fields: fields,
FieldLimit: req.GetFieldLimit(),
Expand Down Expand Up @@ -1218,7 +1214,6 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
fieldCount := uint32(0)

for _, stream := range streams {
detectType := true
level.Debug(spanlogger.FromContext(ctx)).Log(
"detected_fields", "true",
"msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries)))
Expand All @@ -1241,6 +1236,7 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
df.parsers = append(df.parsers, *parser)
}

detectType := true
for _, v := range vals {
parsedFields := detectedFields[k]
if detectType {
Expand Down
82 changes: 81 additions & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/grafana/loki/v3/pkg/logql/log"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/loghttp"

"github.com/grafana/dskit/grpcclient"
Expand Down Expand Up @@ -118,7 +120,6 @@ func (c *querierClientMock) GetDetectedLabels(ctx context.Context, in *logproto.
return (*logproto.LabelToValuesResponse)(nil), args.Error(1)
}
return res.(*logproto.LabelToValuesResponse), args.Error(1)

}

func (c *querierClientMock) GetVolume(ctx context.Context, in *logproto.VolumeRequest, opts ...grpc.CallOption) (*logproto.VolumeResponse, error) {
Expand Down Expand Up @@ -517,6 +518,20 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockStream(from, quantity))
}

// mockLogfmtStreamIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from, and the line is in logfmt format with the fields message, count and fake
func mockLogfmtStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockLogfmtStream(from, quantity))
}

// mockLogfmtStreamIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from, and the line is in logfmt format with the fields message, count and fake
func mockLogfmtStreamIteratorWithStructuredMetadata(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockLogfmtStreamWithStructuredMetadata(from, quantity))
}

// mockSampleIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from
Expand Down Expand Up @@ -546,6 +561,71 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream
}
}

func mockLogfmtStream(from int, quantity int) logproto.Stream {
return mockLogfmtStreamWithLabels(from, quantity, `{type="test"}`)
}

func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
entries := make([]logproto.Entry, 0, quantity)

// used for detected fields queries which are always BACKWARD
for i := quantity; i > 0; i-- {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`,
i,
i,
(i * 10),
(i * 256),
float32(i*10.0),
(i%2 == 0)),
})
}

return logproto.Stream{
Entries: entries,
Labels: labels,
}
}

func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Stream {
return mockLogfmtStreamWithLabelsAndStructuredMetadata(from, quantity, `{type="test"}`)
}

func mockLogfmtStreamWithLabelsAndStructuredMetadata(
from int,
quantity int,
labels string,
) logproto.Stream {
var entries []logproto.Entry
metadata := push.LabelsAdapter{
{
Name: "constant",
Value: "constant",
},
}

for i := from; i < from+quantity; i++ {
metadata = append(metadata, push.LabelAdapter{
Name: "variable",
Value: fmt.Sprintf("value%d", i),
})
}

for i := quantity; i > 0; i-- {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i),
StructuredMetadata: metadata,
})
}
return logproto.Stream{
Labels: labels,
Entries: entries,
}
}

type querierMock struct {
util.ExtendedMock
}
Expand Down
172 changes: 167 additions & 5 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,11 @@ func TestQuerier_SeriesAPI(t *testing.T) {
{Key: "a", Value: "1"},
{Key: "b", Value: "2"},
}},
{Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
{Key: "b", Value: "3"}},
{
Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
{Key: "b", Value: "3"},
},
},
{Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
Expand Down Expand Up @@ -994,7 +996,6 @@ func TestQuerier_RequestingIngesters(t *testing.T) {

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {

conf := mockQuerierConfig()
conf.QueryIngestersWithin = time.Minute * 30
if tc.setIngesterQueryStoreMaxLookback {
Expand Down Expand Up @@ -1175,7 +1176,6 @@ func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*quer
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)

if err != nil {
return nil, nil, nil, err
}
Expand All @@ -1191,6 +1191,7 @@ type fakeTimeLimits struct {
func (f fakeTimeLimits) MaxQueryLookback(_ context.Context, _ string) time.Duration {
return f.maxQueryLookback
}

func (f fakeTimeLimits) MaxQueryLength(_ context.Context, _ string) time.Duration {
return f.maxQueryLength
}
Expand Down Expand Up @@ -1697,3 +1698,164 @@ func BenchmarkQuerierDetectedLabels(b *testing.B) {
assert.NoError(b, err)
}
}

func TestQuerier_DetectedFields(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")

conf := mockQuerierConfig()
conf.IngesterQueryStoreMaxLookback = 0

request := logproto.DetectedFieldsRequest{
Start: time.Now().Add(-1 * time.Minute),
End: time.Now(),
Query: `{type="test"}`,
LineLimit: 1000,
FieldLimit: 1000,
}

t.Run("returns detected fields from queried logs", func(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 5), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 5)}), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)

querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)

resp, err := querier.DetectedFields(ctx, &request)
require.NoError(t, err)

detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 7)
expectedCardinality := map[string]uint64{
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
}
for _, d := range detectedFields {
card := expectedCardinality[d.Label]
assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label)
}
})

t.Run("correctly identifies different field types", func(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 2), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)

querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)

resp, err := querier.DetectedFields(ctx, &request)
require.NoError(t, err)

detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 7)

var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField
for _, field := range detectedFields {
switch field.Label {
case "message":
messageField = field
case "count":
countField = field
case "bytes":
bytesField = field
case "duration":
durationField = field
case "percent":
floatField = field
case "even":
evenField = field
}
}

assert.Equal(t, logproto.DetectedFieldString, messageField.Type)
assert.Equal(t, logproto.DetectedFieldInt, countField.Type)
assert.Equal(t, logproto.DetectedFieldBytes, bytesField.Type)
assert.Equal(t, logproto.DetectedFieldDuration, durationField.Type)
assert.Equal(t, logproto.DetectedFieldFloat, floatField.Type)
assert.Equal(t, logproto.DetectedFieldBoolean, evenField.Type)
})
}

func BenchmarkQuerierDetectedFields(b *testing.B) {
limits, _ := validation.NewOverrides(defaultLimitsTestConfig(), nil)
ctx := user.InjectOrgID(context.Background(), "test")

conf := mockQuerierConfig()
conf.IngesterQueryStoreMaxLookback = 0

request := logproto.DetectedFieldsRequest{
Start: time.Now().Add(-1 * time.Minute),
End: time.Now(),
Query: `{type="test"}`,
LineLimit: 1000,
FieldLimit: 1000,
}

store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 2), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)

querier, _ := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, err := querier.DetectedFields(ctx, &request)
assert.NoError(b, err)
}
}
Loading