filter out stale spans from metrics generator #1612

Merged 11 commits on Sep 12, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@
* [ENHANCEMENT] Add PodDisruptionBudget to ingesters in jsonnet [#1691](https://github.com/grafana/tempo/pull/1691) (@joe-elliott)
* [ENHANCEMENT] Add cli command to convert an existing file to tempodb's current parquet schema. [#1706](https://github.com/grafana/tempo/pull/1707) (@joe-elliott)
* [ENHANCEMENT] Add query parameter to search API for traceQL queries [#1729](https://github.com/grafana/tempo/pull/1729) (@kvrhdn)
* [ENHANCEMENT] metrics-generator: filter out older spans before metrics are aggregated [#1612](https://github.com/grafana/tempo/pull/1612) (@ie-pham)
* [BUGFIX] Honor caching and buffering settings when finding traces by id [#1697](https://github.com/grafana/tempo/pull/1697) (@joe-elliott)
* [BUGFIX] Correctly propagate errors from the iterator layer up through the queriers [#1723](https://github.com/grafana/tempo/pull/1723) (@joe-elliott)

5 changes: 5 additions & 0 deletions docs/tempo/website/configuration/_index.md
@@ -275,6 +275,11 @@ metrics_generator:
# https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
remote_write:
[- <Prometheus remote write config>]

# Spans are only included in metrics generation if their end time falls within this duration
# of the current time (in the past or the future).
# This filters out stale (outdated) spans.
[ingestion_time_range_slack: <duration> | default = 30s]
```
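To make the window concrete, here is a minimal Go sketch of the check that `ingestion_time_range_slack` configures; this is not code from this PR, and the helper name `withinSlack` is illustrative. A span is only aggregated if its end time falls within the slack on either side of the current time.

```go
package main

import (
	"fmt"
	"time"
)

// withinSlack reports whether a span end time falls inside
// [now-slack, now+slack]; spans outside this window are skipped
// by the metrics-generator.
func withinSlack(endTime time.Time, slack time.Duration) bool {
	now := time.Now()
	return !endTime.Before(now.Add(-slack)) && !endTime.After(now.Add(slack))
}

func main() {
	slack := 30 * time.Second // the documented default

	fmt.Println(withinSlack(time.Now(), slack))                      // true: fresh span
	fmt.Println(withinSlack(time.Now().Add(-5*time.Minute), slack))  // false: stale span
	fmt.Println(withinSlack(time.Now().Add(10*24*time.Hour), slack)) // false: far-future span
}
```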

## Query-frontend
40 changes: 38 additions & 2 deletions integration/e2e/metrics_generator_test.go
@@ -103,6 +103,42 @@ func TestMetricsGenerator(t *testing.T) {
})
require.NoError(t, err)

// also send a span with a start time 5 minutes in the past
err = c.EmitBatch(context.Background(), &thrift.Batch{
Process: &thrift.Process{ServiceName: "app"},
Spans: []*thrift.Span{
{
TraceIdLow: traceIDLow,
TraceIdHigh: traceIDHigh,
SpanId: r.Int63(),
ParentSpanId: parentSpanID,
OperationName: "app-handle",
StartTime: time.Now().Add(-5 * time.Minute).UnixMicro(),
Duration: int64(1 * time.Second / time.Microsecond),
Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}},
},
},
})
require.NoError(t, err)

// also send a span with a start time 10 days in the future
err = c.EmitBatch(context.Background(), &thrift.Batch{
Process: &thrift.Process{ServiceName: "app"},
Spans: []*thrift.Span{
{
TraceIdLow: traceIDLow,
TraceIdHigh: traceIDHigh,
SpanId: r.Int63(),
ParentSpanId: parentSpanID,
OperationName: "app-handle",
StartTime: time.Now().Add(10 * 24 * time.Hour).UnixMicro(),
Duration: int64(1 * time.Second / time.Microsecond),
Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}},
},
},
})
require.NoError(t, err)

// Fetch metrics from Prometheus once they are received
var metricFamilies map[string]*io_prometheus_client.MetricFamily
for {
Expand Down Expand Up @@ -164,8 +200,8 @@ func TestMetricsGenerator(t *testing.T) {
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_latency_sum", lbls))

// Verify metrics
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_spans_received_total"))

assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(4), "tempo_metrics_generator_spans_received_total"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_spans_discarded_total"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_active_series"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(1000), "tempo_metrics_generator_registry_max_active_series"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_series_added_total"))
6 changes: 6 additions & 0 deletions modules/generator/config.go
@@ -2,6 +2,7 @@ package generator

import (
"flag"
"time"

"github.com/grafana/tempo/modules/generator/processor/servicegraphs"
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
@@ -23,6 +24,9 @@ type Config struct {
Processor ProcessorConfig `yaml:"processor"`
Registry registry.Config `yaml:"registry"`
Storage storage.Config `yaml:"storage"`
// MetricsIngestionSlack is the max amount of time a span's end time may deviate from the
// current time (in either direction) for the span to be considered in metrics generation
MetricsIngestionSlack time.Duration `yaml:"metrics_ingestion_time_range_slack"`
}

// RegisterFlagsAndApplyDefaults registers the flags.
@@ -31,6 +35,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Processor.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f)
// setting default for max span age before discarding to 30s
cfg.MetricsIngestionSlack = 30 * time.Second
}

type ProcessorConfig struct {
34 changes: 30 additions & 4 deletions modules/generator/instance.go
@@ -19,6 +19,7 @@ import (
"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/modules/generator/storage"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

var (
@@ -44,8 +45,15 @@ var (
Name: "metrics_generator_bytes_received_total",
Help: "The total number of proto bytes received per tenant",
}, []string{"tenant"})
metricSpansDiscarded = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "metrics_generator_spans_discarded_total",
Help: "The total number of discarded spans received per tenant",
}, []string{"tenant", "reason"})

Member:

I'd consider renaming to metrics_generator_discarded_spans_total just to make it similar to this other metric: https://github.com/grafana/tempo/blob/main/modules/overrides/discarded_spans.go#L12 Not a big deal though, both should show up in grafana 🤷🏻

I think it's good to have separate metrics since a span discarded in the metrics-generator is very different from a span discarded in the ingester/compactor.

Contributor Author:

Hmm should we make it similar to the other discarded span name or should we keep it similar to the other metrics in the same space?
https://github.com/grafana/tempo/blob/main/modules/generator/instance.go#L39

Member:

Oh I see 🙃 Err, either is fine I guess? Maybe a slight preference for keeping it consistent with the other tempo_metrics_generator_ metrics then.

Naming is hard 😅
)

const reasonOutsideTimeRangeSlack = "outside_metrics_ingestion_slack"

type instance struct {
cfg *Config

@@ -248,8 +256,7 @@ func (i *instance) updateProcessorMetrics() {
}

func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest) {
i.updatePushMetrics(req)

i.preprocessSpans(req)
i.processorsMtx.RLock()
defer i.processorsMtx.RUnlock()

@@ -258,17 +265,36 @@ func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest)
}
}

func (i *instance) updatePushMetrics(req *tempopb.PushSpansRequest) {
func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) {
size := 0
spanCount := 0
expiredSpanCount := 0
for _, b := range req.Batches {
size += b.Size()
for _, ils := range b.InstrumentationLibrarySpans {
spanCount += len(ils.Spans)
// drop spans whose end time falls outside [now - MetricsIngestionSlack, now + MetricsIngestionSlack]
newSpansArr := make([]*v1.Span, len(ils.Spans))
timeNow := time.Now()
index := 0
for _, span := range ils.Spans {
if span.EndTimeUnixNano >= uint64(timeNow.Add(-i.cfg.MetricsIngestionSlack).UnixNano()) && span.EndTimeUnixNano <= uint64(timeNow.Add(i.cfg.MetricsIngestionSlack).UnixNano()) {
newSpansArr[index] = span
index++
} else {
expiredSpanCount++
}
}
ils.Spans = newSpansArr[0:index]
}
}
metricBytesIngested.WithLabelValues(i.instanceID).Add(float64(size))
i.updatePushMetrics(size, spanCount, expiredSpanCount)
}

func (i *instance) updatePushMetrics(bytesIngested int, spanCount int, expiredSpanCount int) {
metricBytesIngested.WithLabelValues(i.instanceID).Add(float64(bytesIngested))
metricSpansIngested.WithLabelValues(i.instanceID).Add(float64(spanCount))
metricSpansDiscarded.WithLabelValues(i.instanceID, reasonOutsideTimeRangeSlack).Add(float64(expiredSpanCount))
}

// shutdown stops the instance and flushes any remaining data. After shutdown
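As a recap of the change above, a hedged, standalone sketch: filterStale is an illustrative helper (it does not exist in Tempo) that applies the same end-time window as preprocessSpans, and the test assumes the Tempo module's existing dependencies (testify and the tempopb trace proto).

```go
package generator_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

// filterStale keeps only spans whose end time falls within
// [now-slack, now+slack], mirroring the check in preprocessSpans.
func filterStale(spans []*v1.Span, slack time.Duration) []*v1.Span {
	now := time.Now()
	lower := uint64(now.Add(-slack).UnixNano())
	upper := uint64(now.Add(slack).UnixNano())

	kept := spans[:0]
	for _, s := range spans {
		if s.EndTimeUnixNano >= lower && s.EndTimeUnixNano <= upper {
			kept = append(kept, s)
		}
	}
	return kept
}

func TestFilterStale(t *testing.T) {
	now := time.Now()
	spans := []*v1.Span{
		{EndTimeUnixNano: uint64(now.UnixNano())},                          // fresh: kept
		{EndTimeUnixNano: uint64(now.Add(-5 * time.Minute).UnixNano())},    // stale: dropped
		{EndTimeUnixNano: uint64(now.Add(10 * 24 * time.Hour).UnixNano())}, // far future: dropped
	}

	require.Len(t, filterStale(spans, 30*time.Second), 1)
}
```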