Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correct _extracted logic in detected fields #14064

Merged
merged 9 commits into from
Sep 6, 2024
119 changes: 76 additions & 43 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,9 +1102,8 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
if err != nil {
return nil, err
}
parsers := getParsersFromExpr(expr)

detectedFields := parseDetectedFields(req.FieldLimit, streams, parsers)
detectedFields := parseDetectedFields(req.FieldLimit, streams)

fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
Expand Down Expand Up @@ -1220,7 +1219,7 @@ func determineType(value string) logproto.DetectedFieldType {
return logproto.DetectedFieldString
}

func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers []string) map[string]*parsedFields {
func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
detectedFields := make(map[string]*parsedFields, limit)
fieldCount := uint32(0)
emtpyparsers := []string{}
Expand Down Expand Up @@ -1258,11 +1257,8 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers
}
}

parsers := queryParsers
parsedLabels := getParsedLabels(entry)
if len(parsedLabels) == 0 {
parsedLabels, parsers = parseLine(entry.Line, streamLbls)
}
streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash())
parsedLabels, parsers := parseEntry(entry, streamLbls)
for k, vals := range parsedLabels {
df, ok := detectedFields[k]
if !ok && fieldCount < limit {
Expand Down Expand Up @@ -1299,9 +1295,9 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers
return detectedFields
}

func getParsedLabels(entry push.Entry) map[string][]string {
func getStructuredMetadata(entry push.Entry) map[string][]string {
labels := map[string]map[string]struct{}{}
for _, lbl := range entry.Parsed {
for _, lbl := range entry.StructuredMetadata {
if values, ok := labels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
Expand All @@ -1321,50 +1317,49 @@ func getParsedLabels(entry push.Entry) map[string][]string {
return result
}

func getStructuredMetadata(entry push.Entry) map[string][]string {
labels := map[string]map[string]struct{}{}
for _, lbl := range entry.StructuredMetadata {
if values, ok := labels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
labels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
}
}
func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) {
logFmtParser := logql_log.NewLogfmtParser(false, false)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was part of the problem, we had strict turned on previously

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to only initialise the parser (call logFmtParser := logql_log.NewLogfmtParser(false, false)) when we need it? I.e. move it somewhere here https://github.com/grafana/loki/pull/14064/files#diff-a0b881d1b7b99f439716f189b50eef90ccdaea00845b087c0dbce0e148aa2c0eR1339

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, can do... missed that when I change the order to try json first


result := make(map[string][]string, len(labels))
for lbl, values := range labels {
vals := make([]string, 0, len(values))
for v := range values {
vals = append(vals, v)
origParsed := getParsedLabels(entry)
parsed := make(map[string][]string, len(origParsed))

for lbl, values := range origParsed {
if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || lbl == logqlmodel.PreserveErrorLabel {
continue
}
result[lbl] = vals
}

return result
}
parsed[lbl] = values
}

func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []string) {
line := entry.Line
parser := "logfmt"
logFmtParser := logql_log.NewLogfmtParser(true, false)

lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0)
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
if !logfmtSuccess || lbls.HasErr() {
parser = "json"
jsonParser := logql_log.NewJSONParser()
lbls.Reset()
_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
if !jsonSuccess || lbls.HasErr() {
return map[string][]string{}, nil
return parsed, nil
}
}

parsedLabels := map[string]map[string]struct{}{}
for _, lbl := range lbls.LabelsResult().Labels() {
// skip indexed labels, as we only want detected fields
if streamLbls.Has(lbl.Name) {
continue
for lbl, values := range parsed {
if vals, ok := parsedLabels[lbl]; ok {
for _, value := range values {
vals[value] = struct{}{}
}
} else {
parsedLabels[lbl] = map[string]struct{}{}
for _, value := range values {
parsedLabels[lbl][value] = struct{}{}
}
}
}

lblsResult := lbls.LabelsResult().Parsed()
for _, lbl := range lblsResult {
if values, ok := parsedLabels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
Expand All @@ -1374,6 +1369,9 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st

result := make(map[string][]string, len(parsedLabels))
for lbl, values := range parsedLabels {
if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || lbl == logqlmodel.PreserveErrorLabel {
continue
}
vals := make([]string, 0, len(values))
for v := range values {
vals = append(vals, v)
Expand All @@ -1384,10 +1382,29 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st
return result, []string{parser}
}

func getParsedLabels(entry push.Entry) map[string][]string {
labels := map[string]map[string]struct{}{}
for _, lbl := range entry.Parsed {
if values, ok := labels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
labels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
}
}

result := make(map[string][]string, len(labels))
for lbl, values := range labels {
vals := make([]string, 0, len(values))
for v := range values {
vals = append(vals, v)
}
result[lbl] = vals
}

return result
}

// streamsForFieldDetection reads the streams from the iterator and returns them sorted.
// If categorizeLabels is true, the stream labels contains just the stream labels and entries inside each stream have their
// structuredMetadata and parsed fields populated with structured metadata labels plus the parsed labels respectively.
// Otherwise, the stream labels are the whole series labels including the stream labels, structured metadata labels and parsed labels.
func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Streams, error) {
streams := map[string]*logproto.Stream{}
respSize := uint32(0)
Expand All @@ -1403,12 +1420,28 @@ func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Str
// If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line.
// Then check to see if the entry is equal to, or past a forward step
if lastEntry.Unix() < 0 || shouldOutput {
stream, ok := streams[streamLabels]
allLbls, err := syntax.ParseLabels(streamLabels)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the logic added here is to extract the stream labels from the additional labels via parsing and structured metadata. this likely needs a test

if err != nil {
continue
}

parsedLbls := logproto.FromLabelAdaptersToLabels(entry.Parsed)
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)

onlyStreamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(allLbls, 0)
allLbls.Range(func(l labels.Label) {
if parsedLbls.Has(l.Name) || structuredMetadata.Has(l.Name) {
onlyStreamLbls.Del(l.Name)
}
})

lblStr := onlyStreamLbls.LabelsResult().String()
stream, ok := streams[lblStr]
if !ok {
stream = &logproto.Stream{
Labels: streamLabels,
Labels: lblStr,
}
streams[streamLabels] = stream
streams[lblStr] = stream
}
stream.Entries = append(stream.Entries, entry)
lastEntry = i.At().Timestamp
Expand Down
Loading
Loading