Skip to content

Commit

Permalink
feat: Optimize log parsing performance by using unsafe package (#13223)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jun 15, 2024
1 parent 9a99b05 commit 9f31b25
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package distributor

import (
"bytes"
"context"
"flag"
"fmt"
Expand All @@ -11,11 +12,11 @@ import (
"strings"
"time"
"unicode"
"unsafe"

"github.com/buger/jsonparser"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/go-logfmt/logfmt"
"github.com/gogo/status"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -45,6 +46,7 @@ import (
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/util"
Expand Down Expand Up @@ -917,56 +919,57 @@ func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.
}

func extractLogLevelFromLogLine(log string) string {
var v string
logSlice := unsafe.Slice(unsafe.StringData(log), len(log))
var v []byte
if isJSON(log) {
v = getValueUsingJSONParser(log)
v = getValueUsingJSONParser(logSlice)
} else {
v = getValueUsingLogfmtParser(log)
v = getValueUsingLogfmtParser(logSlice)
}

switch strings.ToLower(v) {
case "trace", "trc":
switch {
case bytes.EqualFold(v, []byte("trace")), bytes.EqualFold(v, []byte("trc")):
return logLevelTrace
case "debug", "dbg":
case bytes.EqualFold(v, []byte("debug")), bytes.EqualFold(v, []byte("dbg")):
return logLevelDebug
case "info", "inf":
case bytes.EqualFold(v, []byte("info")), bytes.EqualFold(v, []byte("inf")):
return logLevelInfo
case "warn", "wrn":
case bytes.EqualFold(v, []byte("warn")), bytes.EqualFold(v, []byte("wrn")):
return logLevelWarn
case "error", "err":
case bytes.EqualFold(v, []byte("error")), bytes.EqualFold(v, []byte("err")):
return logLevelError
case "critical":
case bytes.EqualFold(v, []byte("critical")):
return logLevelCritical
case "fatal":
case bytes.EqualFold(v, []byte("fatal")):
return logLevelFatal
default:
return detectLevelFromLogLine(log)
}
}

func getValueUsingLogfmtParser(line string) string {
equalIndex := strings.Index(line, "=")
func getValueUsingLogfmtParser(line []byte) []byte {
equalIndex := bytes.Index(line, []byte("="))
if len(line) == 0 || equalIndex == -1 {
return logLevelUnknown
return nil
}
d := logfmt.NewDecoder(strings.NewReader(line))
d.ScanRecord()
for d.ScanKeyval() {

d := logfmt.NewDecoder(line)
for !d.EOL() && d.ScanKeyval() {
if _, ok := allowedLabelsForLevel[string(d.Key())]; ok {
return string(d.Value())
return (d.Value())
}
}
return logLevelUnknown
return nil
}

func getValueUsingJSONParser(log string) string {
func getValueUsingJSONParser(log []byte) []byte {
for allowedLabel := range allowedLabelsForLevel {
l, err := jsonparser.GetString([]byte(log), allowedLabel)
l, _, _, err := jsonparser.Get(log, allowedLabel)
if err == nil {
return l
}
}
return logLevelUnknown
return nil
}

func isJSON(line string) bool {
Expand Down

0 comments on commit 9f31b25

Please sign in to comment.