Skip to content

Commit

Permalink
[connector/spanmetrics] Discard counter span metric exemplars after f…
Browse files Browse the repository at this point in the history
…lushing (#32210)

**Description:** 
Discard counter span metric exemplars after flushing to avoid unbounded
memory growth when exemplars are enabled.

This is needed because #28671 added exemplars to counter span metrics,
but they are not removed after each flush interval like they are for
histogram span metrics.

Note: this may change behaviour if using the undocumented
`exemplars.max_per_data_point` configuration option, since exemplars
would no longer be accumulated up until that count. However, i'm unclear
on the value of that feature since there's no mechanism to replace old
exemplars with newer ones once the maximum is reached. Maybe a follow-up
enhancement is only discarding exemplars once the maximum is reached, or
using a circular buffer to replace them. That could be useful for
pull-based exporters like `prometheusexporter`, as retaining exemplars
for longer would decrease the chance of them getting discarded before
being scraped.

**Link to tracking Issue:** 

Closes #31683 

**Testing:** 
- Unit tests
- Running the collector and setting a breakpoint to verify the exemplars
are being cleared in-between flushes. Before the change I could see the
exemplar count continually growing

**Documentation:** <Describe the documentation added.>
Updated the documentation to mention that exemplars are added to all
span metrics. Also mentioned when they are discarded
  • Loading branch information
swar8080 authored Apr 16, 2024
1 parent c5d1d59 commit 39bdda7
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 36 deletions.
27 changes: 27 additions & 0 deletions .chloggen/span-metric-exemplar-memory-leak.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Discard counter span metric exemplars after each flush interval to avoid unbounded memory growth

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31683]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This aligns exemplar discarding for counter span metrics with the existing logic for histogram span metrics

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
4 changes: 2 additions & 2 deletions connector/spanmetricsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ The following settings can be optionally configured:
- `namespace`: Defines the namespace of the generated metrics. If `namespace` provided, generated metric name will be added `namespace.` prefix.
- `metrics_flush_interval` (default: `60s`): Defines the flush interval of the generated metrics.
- `metrics_expiration` (default: `0`): Defines the expiration time as `time.Duration`, after which, if no new spans are received, metrics will no longer be exported. Setting to `0` means the metrics will never expire (default behavior).
- `exemplars`: Use to configure how to attach exemplars to histograms
- `enabled` (default: `false`): enabling will add spans as Exemplars.
- `exemplars`: Use to configure how to attach exemplars to metrics.
- `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
- `events`: Use to configure the events metric.
- `enabled`: (default: `false`): enabling will add the events metric.
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
Expand Down
12 changes: 8 additions & 4 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,17 +291,21 @@ func (p *connectorImp) resetState() {
p.resourceMetrics.RemoveEvictedItems()
p.metricKeyToDimensions.RemoveEvictedItems()

// If no histogram and no metrics expiration is configured, we can skip the remaining operations.
// If none of these features are enabled then we can skip the remaining operations.
// Enabling either of these features requires to go over resource metrics and do operation on each.
if p.config.Histogram.Disable && p.config.MetricsExpiration == 0 {
if p.config.Histogram.Disable && p.config.MetricsExpiration == 0 && !p.config.Exemplars.Enabled {
return
}

now := time.Now()
p.resourceMetrics.ForEach(func(k resourceKey, m *resourceMetrics) {
// Exemplars are only relevant to this batch of traces, so must be cleared within the lock
if !p.config.Histogram.Disable {
m.histograms.Reset(true)
if p.config.Exemplars.Enabled {
m.sums.ClearExemplars()
m.events.ClearExemplars()
if !p.config.Histogram.Disable {
m.histograms.ClearExemplars()
}
}

// If metrics expiration is configured, remove metrics that haven't been seen for longer than the expiration period.
Expand Down
114 changes: 103 additions & 11 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,18 +1519,89 @@ func TestSpanMetrics_Events(t *testing.T) {
})
}
}
func TestExemplarsForSumMetrics(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, 0, []string{})
require.NoError(t, err)
traces := buildSampleTrace()
func TestExemplarsAreDiscardedAfterFlushing(t *testing.T) {
tests := []struct {
name string
temporality string
histogramConfig func() HistogramConfig
}{
{
name: "cumulative explicit histogram",
temporality: cumulative,
histogramConfig: explicitHistogramsConfig,
},
{
name: "cumulative exponential histogram",
temporality: cumulative,
histogramConfig: exponentialHistogramsConfig,
},
{
name: "delta explicit histogram",
temporality: delta,
histogramConfig: explicitHistogramsConfig,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), tt.histogramConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{})
p.metricsConsumer = &consumertest.MetricsSink{}
require.NoError(t, err)

// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)
traces := ptrace.NewTraces()
trace1ID := [16]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x10}
initServiceSpans(
serviceSpans{
serviceName: "service-b",
spans: []span{
{
name: "/ping",
kind: ptrace.SpanKindServer,
statusCode: ptrace.StatusCodeError,
traceID: trace1ID,
spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18},
},
},
}, traces.ResourceSpans().AppendEmpty())

err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)
metrics := p.buildMetrics()
// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)

// Verify exactly 1 exemplar is added to all data points when flushing
err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)

p.exportMetrics(ctx)
m := p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[0]
assertDataPointsHaveExactlyOneExemplarForTrace(t, m, trace1ID)

// Verify exemplars from previous batch's trace are replaced with exemplars for the new batch's trace
traces = ptrace.NewTraces()
trace2ID := [16]byte{0x00, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x10}
initServiceSpans(
serviceSpans{
serviceName: "service-b",
spans: []span{
{
name: "/ping",
kind: ptrace.SpanKindServer,
statusCode: ptrace.StatusCodeError,
traceID: trace2ID,
spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18},
},
},
}, traces.ResourceSpans().AppendEmpty())

err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)

p.exportMetrics(ctx)
m = p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[1]
assertDataPointsHaveExactlyOneExemplarForTrace(t, m, trace2ID)
})
}
}

func assertDataPointsHaveExactlyOneExemplarForTrace(t *testing.T, metrics pmetric.Metrics, traceID pcommon.TraceID) {
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
ism := rm.ScopeMetrics()
Expand All @@ -1539,12 +1610,33 @@ func TestExemplarsForSumMetrics(t *testing.T) {
m := ism.At(ilmC).Metrics()
for mC := 0; mC < m.Len(); mC++ {
metric := m.At(mC)
if metric.Type() == pmetric.MetricTypeSum {
switch metric.Type() {
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
assert.Greater(t, dps.Len(), 0)
for dpi := 0; dpi < dps.Len(); dpi++ {
dp := dps.At(dpi)
assert.Equal(t, dp.Exemplars().Len(), 1)
assert.Equal(t, dp.Exemplars().At(0).TraceID(), traceID)
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
assert.Greater(t, dps.Len(), 0)
for dpi := 0; dpi < dps.Len(); dpi++ {
dp := dps.At(dpi)
assert.Equal(t, dp.Exemplars().Len(), 1)
assert.Equal(t, dp.Exemplars().At(0).TraceID(), traceID)
}
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
assert.Greater(t, dps.Len(), 0)
for dpi := 0; dpi < dps.Len(); dpi++ {
dp := dps.At(dpi)
assert.Greater(t, dp.Exemplars().Len(), 0)
assert.Equal(t, dp.Exemplars().Len(), 1)
assert.Equal(t, dp.Exemplars().At(0).TraceID(), traceID)
}
default:
t.Fatalf("Unexpected metric type %s", metric.Type())
}
}
}
Expand Down
30 changes: 11 additions & 19 deletions connector/spanmetricsconnector/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Key string
type HistogramMetrics interface {
GetOrCreate(key Key, attributes pcommon.Map) Histogram
BuildMetrics(pmetric.Metric, pcommon.Timestamp, pmetric.AggregationTemporality)
Reset(onlyExemplars bool)
ClearExemplars()
}

type Histogram interface {
Expand Down Expand Up @@ -116,15 +116,10 @@ func (m *explicitHistogramMetrics) BuildMetrics(
}
}

func (m *explicitHistogramMetrics) Reset(onlyExemplars bool) {
if onlyExemplars {
for _, h := range m.metrics {
h.exemplars = pmetric.NewExemplarSlice()
}
return
func (m *explicitHistogramMetrics) ClearExemplars() {
for _, h := range m.metrics {
h.exemplars = pmetric.NewExemplarSlice()
}

m.metrics = make(map[Key]*explicitHistogram)
}

func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
Expand Down Expand Up @@ -202,15 +197,10 @@ func expoHistToExponentialDataPoint(agg *structure.Histogram[float64], dp pmetri
}
}

func (m *exponentialHistogramMetrics) Reset(onlyExemplars bool) {
if onlyExemplars {
for _, m := range m.metrics {
m.exemplars = pmetric.NewExemplarSlice()
}
return
func (m *exponentialHistogramMetrics) ClearExemplars() {
for _, m := range m.metrics {
m.exemplars = pmetric.NewExemplarSlice()
}

m.metrics = make(map[Key]*exponentialHistogram)
}

func (h *explicitHistogram) Observe(value float64) {
Expand Down Expand Up @@ -316,6 +306,8 @@ func (m *SumMetrics) BuildMetrics(
}
}

func (m *SumMetrics) Reset() {
m.metrics = make(map[Key]*Sum)
func (m *SumMetrics) ClearExemplars() {
for _, sum := range m.metrics {
sum.exemplars = pmetric.NewExemplarSlice()
}
}

0 comments on commit 39bdda7

Please sign in to comment.