Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Tune Patterns query drain instance #13137

Merged
merged 2 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)

func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens := deduplicatePlaceholders(d.tokenizer.Tokenize(content), d.config.ParamString)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
d.clustersCounter++
Expand Down
24 changes: 16 additions & 8 deletions pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type IngesterQuerier struct {

ringClient *RingClient

registerer prometheus.Registerer
registerer prometheus.Registerer
ingesterQuerierMetrics *ingesterQuerierMetrics
}

func NewIngesterQuerier(
Expand All @@ -36,10 +37,11 @@ func NewIngesterQuerier(
logger log.Logger,
) (*IngesterQuerier, error) {
return &IngesterQuerier{
logger: log.With(logger, "component", "pattern-ingester-querier"),
ringClient: ringClient,
cfg: cfg,
registerer: prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer),
logger: log.With(logger, "component", "pattern-ingester-querier"),
ringClient: ringClient,
cfg: cfg,
registerer: prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer),
ingesterQuerierMetrics: newIngesterQuerierMetrics(registerer, metricsNamespace),
}, nil
}

Expand All @@ -63,11 +65,15 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte
if err != nil {
return nil, err
}
return prunePatterns(resp, minClusterSize), nil
return prunePatterns(resp, minClusterSize, q.ingesterQuerierMetrics), nil
}

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *logproto.QueryPatternsResponse {
d := drain.New(drain.DefaultConfig(), nil)
func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
pruneConfig := drain.DefaultConfig()
pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them

patternsBefore := len(resp.Series)
d := drain.New(pruneConfig, nil)
for _, p := range resp.Series {
d.TrainPattern(p.Pattern, p.Samples)
}
Expand All @@ -86,6 +92,8 @@ func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *lo
Samples: cluster.Samples(),
})
}
metrics.patternsPrunedTotal.Add(float64(patternsBefore - len(resp.Series)))
metrics.patternsRetainedTotal.Add(float64(len(resp.Series)))
return resp
}

Expand Down
43 changes: 35 additions & 8 deletions pkg/pattern/ingester_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"os"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"

"github.com/grafana/loki/v3/pkg/logproto"
)

func Test_prunePatterns(t *testing.T) {
file, err := os.Open(`testdata/patterns.txt`)
file, err := os.Open("testdata/patterns.txt")
require.NoError(t, err)
defer file.Close()

Expand All @@ -24,15 +25,40 @@ func Test_prunePatterns(t *testing.T) {
})
}
require.NoError(t, scanner.Err())
prunePatterns(resp, 0)

startingPatterns := len(resp.Series)
prunePatterns(resp, 0, newIngesterQuerierMetrics(prometheus.DefaultRegisterer, `test`))

expectedPatterns := []string{
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=<_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=<_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=<_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=<_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=<_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=<_> handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=0 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=1 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=2 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=3 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=4 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=5 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=6 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=7 <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=0, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=7, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=0, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=1, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=2, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=3, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=3, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=4, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=4, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=5, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=5, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=6, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=7, <_> +0000 UTC, <_>`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=0 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=1 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=2 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=3 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=4 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=5 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=6 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=7 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=wrapper.go:48 level=info component=distributor msg="sample remote write" eventType=bi <_>`,
}

Expand All @@ -43,4 +69,5 @@ func Test_prunePatterns(t *testing.T) {
slices.Sort(patterns)

require.Equal(t, expectedPatterns, patterns)
require.Less(t, len(patterns), startingPatterns, `prunePatterns should remove duplicates`)
}
22 changes: 22 additions & 0 deletions pkg/pattern/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,25 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
}),
}
}

type ingesterQuerierMetrics struct {
patternsPrunedTotal prometheus.Counter
patternsRetainedTotal prometheus.Counter
}

func newIngesterQuerierMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterQuerierMetrics {
return &ingesterQuerierMetrics{
patternsPrunedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "query_pruned_total",
Help: "The total number of patterns removed at query time by the pruning Drain instance",
}),
patternsRetainedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "query_retained_total",
Help: "The total number of patterns retained at query time by the pruning Drain instance",
}),
}
}
Loading