diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index d783e9d1919e..ec54a9f145cd 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -198,6 +198,10 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster if len(tokens) < 4 { return nil } + if d.metrics != nil { + d.metrics.TokensPerLine.Observe(float64(len(tokens))) + d.metrics.StatePerLine.Observe(float64(len(state.([]int)))) + } matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false) // Match no existing log cluster if matchCluster == nil { diff --git a/pkg/pattern/drain/drain_benchmark_test.go b/pkg/pattern/drain/drain_benchmark_test.go index 35ec024af138..5ef4c98b5b6f 100644 --- a/pkg/pattern/drain/drain_benchmark_test.go +++ b/pkg/pattern/drain/drain_benchmark_test.go @@ -35,11 +35,11 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) { line := scanner.Text() lines = append(lines, line) } + drain := New(DefaultConfig(), nil) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - drain := New(DefaultConfig(), nil) for _, line := range lines { drain.Train(line, 0) } diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index 690db7da29ee..0f9313391c1e 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -24,10 +24,12 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { drain *Drain inputFile string patterns []string + format string }{ { drain: New(DefaultConfig(), nil), inputFile: `testdata/agent-logfmt.txt`, + format: FormatLogfmt, patterns: []string{ `ts=2024-04-16T15:10:42.<_> level=info msg="finished node evaluation" controller_id=module.http.cloudwatch_pipelines node_id=prometheus.scrape.<_> duration=<_>.<_>`, `ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg="Adding target" key="/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", batch_kubernetes_io_job_name=\"testcoordinator-job-2665838\", container=\"testcoordinator\", controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", job=\"k6-cloud/testcoordinator\", job_name=\"testcoordinator-job-2665838\", name=\"testcoordinator\", namespace=\"k6-cloud\", pod=\"testcoordinator-job-2665838-9g8ds\"}"`, @@ -62,6 +64,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: `testdata/ingester-logfmt.txt`, + format: FormatLogfmt, patterns: []string{ `ts=2024-04-17T09:52:46.363974185Z caller=http.go:194 level=debug traceID=1b48f5156a61ca69 msg="GET /debug/pprof/delta_mutex (200) 1.161082ms"`, `ts=2024-04-17T09:52:46.<_> caller=head.go:216 level=debug tenant=987678 msg="profile is empty after delta computation" metricName=memory`, @@ -71,6 +74,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: `testdata/drone-json.txt`, + format: FormatJSON, patterns: []string{ `{"duration":<_>,"level":"debug","method":"GET","msg":"request completed","referer":"","remote":"10.136.105.40:52702","request":"/metrics","status":200,"time":"<_>:<_>:<_>","user-agent":"GrafanaAgent/v0.40.3 (flow; linux; helm)"}`, `{"id":"<_>","level":"debug","max-pool":4,"min-pool":0,"msg":"check capacity","pending-builds":0,"running-builds":0,"server-buffer":0,"server-capacity":0,"server-count":0,"time":"<_>:<_>:<_>"}`, @@ -83,6 +87,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/distributor-logfmt.txt", + format: FormatLogfmt, patterns: []string{ `ts=2024-05-02T12:17:22.851228301Z caller=http.go:194 level=debug traceID=1e1fe5ba1756bc38 orgID=1819 msg="POST /pyroscope/ingest?aggregationType=sum&from=1714652230&name=flamegraph.com%7Bapp_kubernetes_io_instance%3Dflamegraph-com%2Capp_kubernetes_io_name%3Dflamegraph-com%2Ccluster%3Dflamegraph.com%2Cinstance%3D10.0.11.146%3A8001%2Cjob%3Dkubernetes-pods%2Cnamespace%3Dflamegraph-com%2Cpod%3Dflamegraph-com-backend-79c858c7bf-jw2hn%2Cpod_template_hash%3D79c858c7bf%2Cpyroscope_tenant%3Dpyroscope%2Ctier%3Dbackend%7D&sampleRate=0&spyName=scrape&units=samples&until=1714652240 (200) 22.345191ms"`, `ts=2024-05-02T12:17:22.<_> caller=http.go:194 level=debug traceID=<_> orgID=75 msg="POST /ingest?aggregationType=&from=1714652227232613927&name=checkoutservice%7B__session_id__%3D294b9729f5a7de95%2Cnamespace%3Dotel-demo%7D&sampleRate=<_>&spyName=gospy&units=&until=1714652242232506798 (200) <_>.<_>"`, @@ -94,6 +99,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/journald.txt", + format: FormatUnknown, patterns: []string{ ` ln --force -s /proc/$(pidof hgrun-pause)/root/bin/hgrun /bin/hgrun;`, ` while [ "$(pidof plugins-pause)" = "" ]; do sleep 0.5; done;`, @@ -200,6 +206,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/kafka.txt", + format: FormatUnknown, patterns: []string{ `[2024-05-07 10:55:40,626] INFO [LocalLog partition=ingest-6, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=180391157, size=16991045, lastModifiedTime=1715075754780, largestRecordTimestamp=Some(1715075754774)),LogSegment(baseOffset=180393429, size=16997692, lastModifiedTime=1715075760206, largestRecordTimestamp=Some(1715075760186)),LogSegment(baseOffset=180395889, size=16998200, lastModifiedTime=1715075765542, largestRecordTimestamp=Some(1715075765526)),LogSegment(baseOffset=180398373, size=16977347, lastModifiedTime=1715075770515, largestRecordTimestamp=Some(1715075770504)) (kafka.log.LocalLog$)`, `[2024-05-07 10:55:53,038] INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-1, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=447957, size=948, lastModifiedTime=1715059232052, largestRecordTimestamp=Some(1715059232002)),LogSegment(baseOffset=447969, size=948, lastModifiedTime=1715059424352, largestRecordTimestamp=Some(1715059424301)) (kafka.log.LocalLog$)`, @@ -220,6 +227,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/kubernetes.txt", + format: FormatUnknown, patterns: []string{ `I0507 12:02:27.947830 1 nodeutilization.go:274] "Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers"`, `I0507 12:02:27.<_> 1 defaultevictor.go:163] "pod does not fit on any other node because of nodeSelector(s), Taint(s), or nodes marked as unschedulable" pod="<_>/<_>"`, @@ -269,6 +277,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/vault.txt", + format: FormatUnknown, patterns: []string{ `2024-05-07T10:56:38.667Z [INFO] expiration: revoked lease: lease_id=auth/gcp/login/h4c031a99aa555040a0dd99864d828e946c6d4e31f4f5178757183def61f9d104`, `2024-05-07T10:<_>:<_>.<_> [INFO] expiration: revoked lease: lease_id=auth/kubernetes/<_>/login/<_>`, @@ -277,6 +286,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/calico.txt", + format: FormatUnknown, patterns: []string{ `2024-05-08 15:23:56.403 [DEBUG][615489] felix/table.go 699: Finished loading iptables state ipVersion=0x4 table="filter"`, `2024-05-08 15:23:56.403 [INFO][615489] felix/summary.go 100: Summarising 1 dataplane reconciliation loops over 600ms: avg=119ms longest=119ms (resync-filter-v4)`, @@ -358,6 +368,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/grafana-ruler.txt", + format: FormatLogfmt, patterns: []string{ `level=debug ts=2024-05-29T13:44:15.804597912Z caller=remote_instance_store.go:51 user=297794 slug=leanix msg="calling SaveAlertInstance"`, `level=debug ts=2024-05-29T13:44:15.<_> caller=remote_instance_store.go:51 user=396586 slug=opengov msg="calling SaveAlertInstance"`, @@ -412,10 +423,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { require.NoError(t, err) defer file.Close() + detectedFormat := false scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() tt.drain.Train(line, 0) + if !detectedFormat { + require.Equal(t, tt.format, DetectLogFormat(line)) + detectedFormat = true + } } var output []string @@ -565,7 +581,6 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) for _, line := range tt.inputLines { passes := matcher.Test([]byte(line)) require.Truef(t, passes, "Line should match extracted pattern: \nPatt[%q] \nLine[%q]", cluster.String(), line) - } }) } diff --git a/pkg/pattern/drain/metrics.go b/pkg/pattern/drain/metrics.go index b09ef1230127..344116901354 100644 --- a/pkg/pattern/drain/metrics.go +++ b/pkg/pattern/drain/metrics.go @@ -1,8 +1,35 @@ package drain -import "github.com/prometheus/client_golang/prometheus" +import ( + "regexp" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + FormatLogfmt = "logfmt" + FormatJSON = "json" + FormatUnknown = "unknown" +) + +var logfmtRegex = regexp.MustCompile("^(\\w+?=([^\"]\\S*?|\".+?\") )*?(\\w+?=([^\"]\\S*?|\".+?\"))+$") + +// DetectLogFormat guesses at how the logs are encoded based on some simple heuristics. +// It only runs on the first log line when a new stream is created, so it could do some more complex parsing or regex. +func DetectLogFormat(line string) string { + if len(line) < 2 { + return FormatUnknown + } else if line[0] == '{' && line[len(line)-1] == '}' { + return FormatJSON + } else if logfmtRegex.MatchString(line) { + return FormatLogfmt + } + return FormatUnknown +} type Metrics struct { PatternsEvictedTotal prometheus.Counter PatternsDetectedTotal prometheus.Counter + TokensPerLine prometheus.Observer + StatePerLine prometheus.Observer } diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index cb5f8ae25314..f19b0373858d 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/pattern/chunk" + "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/metric" "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/spanlogger" @@ -208,7 +209,9 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream } fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) - s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger) + firstEntryLine := pushReqStream.Entries[0].Line + s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID) + if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) } diff --git a/pkg/pattern/metrics.go b/pkg/pattern/metrics.go index ceb5647d1fe5..94ca7e6e9791 100644 --- a/pkg/pattern/metrics.go +++ b/pkg/pattern/metrics.go @@ -7,8 +7,10 @@ import ( type ingesterMetrics struct { flushQueueLength prometheus.Gauge - patternsDiscardedTotal prometheus.Counter - patternsDetectedTotal prometheus.Counter + patternsDiscardedTotal *prometheus.CounterVec + patternsDetectedTotal *prometheus.CounterVec + tokensPerLine *prometheus.HistogramVec + statePerLine *prometheus.HistogramVec } func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics { @@ -19,18 +21,32 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Name: "flush_queue_length", Help: "The total number of series pending in the flush queue.", }), - patternsDiscardedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + patternsDiscardedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", Name: "patterns_evicted_total", Help: "The total number of patterns evicted from the LRU cache.", - }), - patternsDetectedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + }, []string{"tenant", "format"}), + patternsDetectedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", Name: "patterns_detected_total", Help: "The total number of patterns detected from incoming log lines.", - }), + }, []string{"tenant", "format"}), + tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "tokens_per_line", + Help: "The number of tokens an incoming logline is split into for pattern recognition.", + Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280}, + }, []string{"tenant", "format"}), + statePerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "state_per_line", + Help: "The number of items of additional state returned alongside tokens for pattern recognition.", + Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280}, + }, []string{"tenant", "format"}), } } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index fef357eb1b40..eae5a2c22eb4 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -52,6 +52,8 @@ func newStream( chunkMetrics *metric.ChunkMetrics, cfg metric.AggregationConfig, logger log.Logger, + guessedFormat string, + instanceID string, ) (*stream, error) { stream := &stream{ fp: fp, @@ -59,8 +61,10 @@ func newStream( labelsString: labels.String(), labelHash: labels.Hash(), patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{ - PatternsEvictedTotal: metrics.patternsDiscardedTotal, - PatternsDetectedTotal: metrics.patternsDetectedTotal, + PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat), + PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat), + TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat), + StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat), }), cfg: cfg, logger: logger, diff --git a/pkg/pattern/stream_test.go b/pkg/pattern/stream_test.go index cee3df791319..65342f721785 100644 --- a/pkg/pattern/stream_test.go +++ b/pkg/pattern/stream_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/grafana/loki/v3/pkg/pattern/metric" @@ -28,6 +29,8 @@ func TestAddStream(t *testing.T) { Enabled: false, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err) @@ -65,6 +68,8 @@ func TestPruneStream(t *testing.T) { Enabled: false, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err) @@ -113,6 +118,8 @@ func TestSampleIterator(t *testing.T) { Enabled: true, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err) @@ -158,6 +165,8 @@ func TestSampleIterator(t *testing.T) { Enabled: true, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err) @@ -244,6 +253,8 @@ func TestSampleIterator(t *testing.T) { Enabled: true, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err)