diff --git a/CHANGELOG.md b/CHANGELOG.md index f20eb1bd105..04b1d584de0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Fixed + +- Clarify the documentation about equivalence guarantees for the `Set` and `Distinct` types in `go.opentelemetry.io/otel/attribute`. (#5027) + ### Removed - Drop support for [Go 1.20]. (#4967) diff --git a/attribute/set.go b/attribute/set.go index 108a4afd1d8..6c335a08570 100644 --- a/attribute/set.go +++ b/attribute/set.go @@ -15,15 +15,24 @@ type ( // immutable set of attributes, with an internal cache for storing // attribute encodings. // - // This type supports the Equivalent method of comparison using values of - // type Distinct. + // This type will remain comparable for backwards compatibility. The + // equivalence of Sets across versions is not guaranteed to be stable. + // Prior versions may find two Sets to be equal or not when compared + // directly (i.e. ==), but subsequent versions may not. Users should use + // the Equals method to ensure stable equivalence checking. + // + // Users should also use the Distinct returned from Equivalent as a map key + // instead of a Set directly. In addition to that type providing guarantees + // on stable equivalence, it may also provide performance improvements. Set struct { equivalent Distinct } - // Distinct wraps a variable-size array of KeyValue, constructed with keys - // in sorted order. This can be used as a map key or for equality checking - // between Sets. + // Distinct is a unique identifier of a Set. + // + // Distinct is designed to be ensures equivalence stability: comparisons + // will return the save value across versions. For this reason, Distinct + // should always be used as a map key instead of a Set. Distinct struct { iface interface{} } diff --git a/attribute/value_test.go b/attribute/value_test.go index afb98cfae8b..4e17396208e 100644 --- a/attribute/value_test.go +++ b/attribute/value_test.go @@ -95,7 +95,7 @@ func TestValue(t *testing.T) { } } -func TestSetComparability(t *testing.T) { +func TestEquivalence(t *testing.T) { pairs := [][2]attribute.KeyValue{ { attribute.Bool("Bool", true), @@ -139,12 +139,39 @@ func TestSetComparability(t *testing.T) { }, } - for _, p := range pairs { - s0, s1 := attribute.NewSet(p[0]), attribute.NewSet(p[1]) - m := map[attribute.Set]struct{}{s0: {}} - _, ok := m[s1] - assert.Truef(t, ok, "%s not comparable", p[0].Value.Type()) - } + t.Run("Distinct", func(t *testing.T) { + for _, p := range pairs { + s0, s1 := attribute.NewSet(p[0]), attribute.NewSet(p[1]) + m := map[attribute.Distinct]struct{}{s0.Equivalent(): {}} + _, ok := m[s1.Equivalent()] + assert.Truef(t, ok, "Distinct comparison of %s type: not equivalent", p[0].Value.Type()) + assert.Truef( + t, + ok, + "Distinct comparison of %s type: not equivalent: %s != %s", + p[0].Value.Type(), + s0.Encoded(attribute.DefaultEncoder()), + s1.Encoded(attribute.DefaultEncoder()), + ) + } + }) + + t.Run("Set", func(t *testing.T) { + // Maintain backwards compatibility. + for _, p := range pairs { + s0, s1 := attribute.NewSet(p[0]), attribute.NewSet(p[1]) + m := map[attribute.Set]struct{}{s0: {}} + _, ok := m[s1] + assert.Truef( + t, + ok, + "Set comparison of %s type: not equivalent: %s != %s", + p[0].Value.Type(), + s0.Encoded(attribute.DefaultEncoder()), + s1.Encoded(attribute.DefaultEncoder()), + ) + } + }) } func TestAsSlice(t *testing.T) { diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index 77358886388..a6629ee3125 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -30,7 +30,8 @@ const ( // expoHistogramDataPoint is a single data point in an exponential histogram. type expoHistogramDataPoint[N int64 | float64] struct { - res exemplar.Reservoir[N] + attrs attribute.Set + res exemplar.Reservoir[N] count uint64 min N @@ -48,7 +49,7 @@ type expoHistogramDataPoint[N int64 | float64] struct { zeroCount uint64 } -func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] { +func newExpoHistogramDataPoint[N int64 | float64](attrs attribute.Set, maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] { f := math.MaxFloat64 max := N(f) // if N is int64, max will overflow to -9223372036854775808 min := N(-f) @@ -57,6 +58,7 @@ func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMa min = N(minInt64) } return &expoHistogramDataPoint[N]{ + attrs: attrs, min: max, max: min, maxSize: maxSize, @@ -289,7 +291,7 @@ func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMa newRes: r, limit: newLimiter[*expoHistogramDataPoint[N]](limit), - values: make(map[attribute.Set]*expoHistogramDataPoint[N]), + values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]), start: now(), } @@ -305,7 +307,7 @@ type expoHistogram[N int64 | float64] struct { newRes func() exemplar.Reservoir[N] limit limiter[*expoHistogramDataPoint[N]] - values map[attribute.Set]*expoHistogramDataPoint[N] + values map[attribute.Distinct]*expoHistogramDataPoint[N] valuesMu sync.Mutex start time.Time @@ -323,12 +325,12 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib defer e.valuesMu.Unlock() attr := e.limit.Attributes(fltrAttr, e.values) - v, ok := e.values[attr] + v, ok := e.values[attr.Equivalent()] if !ok { - v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum) + v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) v.res = e.newRes() - e.values[attr] = v + e.values[attr.Equivalent()] = v } v.record(value) v.res.Offer(ctx, t, value, droppedAttr) @@ -349,32 +351,32 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { hDPts := reset(h.DataPoints, n, n) var i int - for a, b := range e.values { - hDPts[i].Attributes = a + for _, val := range e.values { + hDPts[i].Attributes = val.attrs hDPts[i].StartTime = e.start hDPts[i].Time = t - hDPts[i].Count = b.count - hDPts[i].Scale = int32(b.scale) - hDPts[i].ZeroCount = b.zeroCount + hDPts[i].Count = val.count + hDPts[i].Scale = int32(val.scale) + hDPts[i].ZeroCount = val.zeroCount hDPts[i].ZeroThreshold = 0.0 - hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin) - hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts)) - copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts) + hDPts[i].PositiveBucket.Offset = int32(val.posBuckets.startBin) + hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts)) + copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts) - hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin) - hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts)) - copy(hDPts[i].NegativeBucket.Counts, b.negBuckets.counts) + hDPts[i].NegativeBucket.Offset = int32(val.negBuckets.startBin) + hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts)) + copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) if !e.noSum { - hDPts[i].Sum = b.sum + hDPts[i].Sum = val.sum } if !e.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(b.min) - hDPts[i].Max = metricdata.NewExtrema(b.max) + hDPts[i].Min = metricdata.NewExtrema(val.min) + hDPts[i].Max = metricdata.NewExtrema(val.max) } - b.res.Collect(&hDPts[i].Exemplars) + val.res.Collect(&hDPts[i].Exemplars) i++ } @@ -402,32 +404,32 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int { hDPts := reset(h.DataPoints, n, n) var i int - for a, b := range e.values { - hDPts[i].Attributes = a + for _, val := range e.values { + hDPts[i].Attributes = val.attrs hDPts[i].StartTime = e.start hDPts[i].Time = t - hDPts[i].Count = b.count - hDPts[i].Scale = int32(b.scale) - hDPts[i].ZeroCount = b.zeroCount + hDPts[i].Count = val.count + hDPts[i].Scale = int32(val.scale) + hDPts[i].ZeroCount = val.zeroCount hDPts[i].ZeroThreshold = 0.0 - hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin) - hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts)) - copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts) + hDPts[i].PositiveBucket.Offset = int32(val.posBuckets.startBin) + hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts)) + copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts) - hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin) - hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts)) - copy(hDPts[i].NegativeBucket.Counts, b.negBuckets.counts) + hDPts[i].NegativeBucket.Offset = int32(val.negBuckets.startBin) + hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts)) + copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) if !e.noSum { - hDPts[i].Sum = b.sum + hDPts[i].Sum = val.sum } if !e.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(b.min) - hDPts[i].Max = metricdata.NewExtrema(b.max) + hDPts[i].Min = metricdata.NewExtrema(val.min) + hDPts[i].Max = metricdata.NewExtrema(val.max) } - b.res.Collect(&hDPts[i].Exemplars) + val.res.Collect(&hDPts[i].Exemplars) i++ // TODO (#3006): This will use an unbounded amount of memory if there diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index df57fad1801..bea3f771615 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -130,7 +130,7 @@ func testExpoHistogramDataPointRecord[N int64 | float64](t *testing.T) { restore := withHandler(t) defer restore() - dp := newExpoHistogramDataPoint[N](tt.maxSize, 20, false, false) + dp := newExpoHistogramDataPoint[N](alice, tt.maxSize, 20, false, false) for _, v := range tt.values { dp.record(v) dp.record(-v) @@ -176,7 +176,7 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) { for _, v := range tt.values { h.measure(context.Background(), v, alice, nil) } - dp := h.values[alice] + dp := h.values[alice.Equivalent()] assert.Equal(t, tt.expected.max, dp.max) assert.Equal(t, tt.expected.min, dp.min) @@ -218,7 +218,7 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) { for _, v := range tt.values { h.measure(context.Background(), v, alice, nil) } - dp := h.values[alice] + dp := h.values[alice.Equivalent()] assert.Equal(t, tt.expected.max, dp.max) assert.Equal(t, tt.expected.min, dp.min) @@ -305,7 +305,7 @@ func testExpoHistogramDataPointRecordFloat64(t *testing.T) { restore := withHandler(t) defer restore() - dp := newExpoHistogramDataPoint[float64](tt.maxSize, 20, false, false) + dp := newExpoHistogramDataPoint[float64](alice, tt.maxSize, 20, false, false) for _, v := range tt.values { dp.record(v) dp.record(-v) @@ -322,21 +322,21 @@ func TestExponentialHistogramDataPointRecordLimits(t *testing.T) { // These bins are calculated from the following formula: // floor( log2( value) * 2^20 ) using an arbitrary precision calculator. - fdp := newExpoHistogramDataPoint[float64](4, 20, false, false) + fdp := newExpoHistogramDataPoint[float64](alice, 4, 20, false, false) fdp.record(math.MaxFloat64) if fdp.posBuckets.startBin != 1073741823 { t.Errorf("Expected startBin to be 1073741823, got %d", fdp.posBuckets.startBin) } - fdp = newExpoHistogramDataPoint[float64](4, 20, false, false) + fdp = newExpoHistogramDataPoint[float64](alice, 4, 20, false, false) fdp.record(math.SmallestNonzeroFloat64) if fdp.posBuckets.startBin != -1126170625 { t.Errorf("Expected startBin to be -1126170625, got %d", fdp.posBuckets.startBin) } - idp := newExpoHistogramDataPoint[int64](4, 20, false, false) + idp := newExpoHistogramDataPoint[int64](alice, 4, 20, false, false) idp.record(math.MaxInt64) if idp.posBuckets.startBin != 66060287 { @@ -641,7 +641,7 @@ func TestScaleChange(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := newExpoHistogramDataPoint[float64](tt.args.maxSize, 20, false, false) + p := newExpoHistogramDataPoint[float64](alice, tt.args.maxSize, 20, false, false) got := p.scaleChange(tt.args.bin, tt.args.startBin, tt.args.length) if got != tt.want { t.Errorf("scaleChange() = %v, want %v", got, tt.want) @@ -652,7 +652,7 @@ func TestScaleChange(t *testing.T) { func BenchmarkPrepend(b *testing.B) { for i := 0; i < b.N; i++ { - agg := newExpoHistogramDataPoint[float64](1024, 20, false, false) + agg := newExpoHistogramDataPoint[float64](alice, 1024, 20, false, false) n := math.MaxFloat64 for j := 0; j < 1024; j++ { agg.record(n) @@ -663,7 +663,7 @@ func BenchmarkPrepend(b *testing.B) { func BenchmarkAppend(b *testing.B) { for i := 0; i < b.N; i++ { - agg := newExpoHistogramDataPoint[float64](1024, 20, false, false) + agg := newExpoHistogramDataPoint[float64](alice, 1024, 20, false, false) n := smallestNonZeroNormalFloat64 for j := 0; j < 1024; j++ { agg.record(n) @@ -704,6 +704,7 @@ func BenchmarkExponentialHistogram(b *testing.B) { func TestSubNormal(t *testing.T) { want := &expoHistogramDataPoint[float64]{ + attrs: alice, maxSize: 4, count: 3, min: math.SmallestNonzeroFloat64, @@ -717,7 +718,7 @@ func TestSubNormal(t *testing.T) { }, } - ehdp := newExpoHistogramDataPoint[float64](4, 20, false, false) + ehdp := newExpoHistogramDataPoint[float64](alice, 4, 20, false, false) ehdp.record(math.SmallestNonzeroFloat64) ehdp.record(math.SmallestNonzeroFloat64) ehdp.record(math.SmallestNonzeroFloat64) @@ -1060,7 +1061,7 @@ func FuzzGetBin(f *testing.F) { t.Skip("skipping test for zero") } - p := newExpoHistogramDataPoint[float64](4, 20, false, false) + p := newExpoHistogramDataPoint[float64](alice, 4, 20, false, false) // scale range is -10 to 20. p.scale = (scale%31+31)%31 - 10 got := p.getBin(v) diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 57557477f30..911d7c18691 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -16,7 +16,8 @@ import ( ) type buckets[N int64 | float64] struct { - res exemplar.Reservoir[N] + attrs attribute.Set + res exemplar.Reservoir[N] counts []uint64 count uint64 @@ -25,8 +26,8 @@ type buckets[N int64 | float64] struct { } // newBuckets returns buckets with n bins. -func newBuckets[N int64 | float64](n int) *buckets[N] { - return &buckets[N]{counts: make([]uint64, n)} +func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] { + return &buckets[N]{attrs: attrs, counts: make([]uint64, n)} } func (b *buckets[N]) sum(value N) { b.total += value } @@ -49,7 +50,7 @@ type histValues[N int64 | float64] struct { newRes func() exemplar.Reservoir[N] limit limiter[*buckets[N]] - values map[attribute.Set]*buckets[N] + values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } @@ -65,7 +66,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r bounds: b, newRes: r, limit: newLimiter[*buckets[N]](limit), - values: make(map[attribute.Set]*buckets[N]), + values: make(map[attribute.Distinct]*buckets[N]), } } @@ -85,7 +86,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute defer s.valuesMu.Unlock() attr := s.limit.Attributes(fltrAttr, s.values) - b, ok := s.values[attr] + b, ok := s.values[attr.Equivalent()] if !ok { // N+1 buckets. For example: // @@ -94,12 +95,12 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // Then, // // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) - b = newBuckets[N](len(s.bounds) + 1) + b = newBuckets[N](attr, len(s.bounds)+1) b.res = s.newRes() // Ensure min and max are recorded values (not zero), for new buckets. b.min, b.max = value, value - s.values[attr] = b + s.values[attr.Equivalent()] = b } b.bin(idx, value) if !s.noSum { @@ -145,24 +146,24 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int { hDPts := reset(h.DataPoints, n, n) var i int - for a, b := range s.values { - hDPts[i].Attributes = a + for _, val := range s.values { + hDPts[i].Attributes = val.attrs hDPts[i].StartTime = s.start hDPts[i].Time = t - hDPts[i].Count = b.count + hDPts[i].Count = val.count hDPts[i].Bounds = bounds - hDPts[i].BucketCounts = b.counts + hDPts[i].BucketCounts = val.counts if !s.noSum { - hDPts[i].Sum = b.total + hDPts[i].Sum = val.total } if !s.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(b.min) - hDPts[i].Max = metricdata.NewExtrema(b.max) + hDPts[i].Min = metricdata.NewExtrema(val.min) + hDPts[i].Max = metricdata.NewExtrema(val.max) } - b.res.Collect(&hDPts[i].Exemplars) + val.res.Collect(&hDPts[i].Exemplars) i++ } @@ -195,11 +196,11 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int { hDPts := reset(h.DataPoints, n, n) var i int - for a, b := range s.values { - hDPts[i].Attributes = a + for _, val := range s.values { + hDPts[i].Attributes = val.attrs hDPts[i].StartTime = s.start hDPts[i].Time = t - hDPts[i].Count = b.count + hDPts[i].Count = val.count hDPts[i].Bounds = bounds // The HistogramDataPoint field values returned need to be copies of @@ -207,18 +208,18 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int { // // TODO (#3047): Making copies for bounds and counts incurs a large // memory allocation footprint. Alternatives should be explored. - hDPts[i].BucketCounts = slices.Clone(b.counts) + hDPts[i].BucketCounts = slices.Clone(val.counts) if !s.noSum { - hDPts[i].Sum = b.total + hDPts[i].Sum = val.total } if !s.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(b.min) - hDPts[i].Max = metricdata.NewExtrema(b.max) + hDPts[i].Min = metricdata.NewExtrema(val.min) + hDPts[i].Max = metricdata.NewExtrema(val.max) } - b.res.Collect(&hDPts[i].Exemplars) + val.res.Collect(&hDPts[i].Exemplars) i++ // TODO (#3006): This will use an unbounded amount of memory if there diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 6c1751c80c5..aeedc55d91b 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -258,7 +258,7 @@ func TestBucketsBin(t *testing.T) { func testBucketsBin[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { - b := newBuckets[N](3) + b := newBuckets[N](alice, 3) assertB := func(counts []uint64, count uint64, min, max N) { t.Helper() assert.Equal(t, counts, b.counts) @@ -282,7 +282,7 @@ func TestBucketsSum(t *testing.T) { func testBucketsSum[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { - b := newBuckets[N](3) + b := newBuckets[N](alice, 3) var want N assert.Equal(t, want, b.total) @@ -325,12 +325,12 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) { h.cumulative(&data) hdp := data.(metricdata.Histogram[int64]).DataPoints[0] - require.Equal(t, hdp.BucketCounts, h.values[alice].counts) + require.Equal(t, hdp.BucketCounts, h.values[alice.Equivalent()].counts) cpCounts := make([]uint64, len(hdp.BucketCounts)) copy(cpCounts, hdp.BucketCounts) hdp.BucketCounts[0] = 10 - assert.Equal(t, cpCounts, h.values[alice].counts, "modifying the Aggregator bucket counts should not change the Aggregator") + assert.Equal(t, cpCounts, h.values[alice.Equivalent()].counts, "modifying the Aggregator bucket counts should not change the Aggregator") } func TestDeltaHistogramReset(t *testing.T) { diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 2b725511dda..73cf98c7599 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -15,6 +15,7 @@ import ( // datapoint is timestamped measurement data. type datapoint[N int64 | float64] struct { + attrs attribute.Set timestamp time.Time value N res exemplar.Reservoir[N] @@ -24,7 +25,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), - values: make(map[attribute.Set]datapoint[N]), + values: make(map[attribute.Distinct]datapoint[N]), } } @@ -34,7 +35,7 @@ type lastValue[N int64 | float64] struct { newRes func() exemplar.Reservoir[N] limit limiter[datapoint[N]] - values map[attribute.Set]datapoint[N] + values map[attribute.Distinct]datapoint[N] } func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { @@ -44,16 +45,17 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. defer s.Unlock() attr := s.limit.Attributes(fltrAttr, s.values) - d, ok := s.values[attr] + d, ok := s.values[attr.Equivalent()] if !ok { d.res = s.newRes() } + d.attrs = attr d.timestamp = t d.value = value d.res.Offer(ctx, t, value, droppedAttr) - s.values[attr] = d + s.values[attr.Equivalent()] = d } func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) { @@ -64,8 +66,8 @@ func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) { *dest = reset(*dest, n, n) var i int - for a, v := range s.values { - (*dest)[i].Attributes = a + for _, v := range s.values { + (*dest)[i].Attributes = v.attrs // The event time is the only meaningful timestamp, StartTime is // ignored. (*dest)[i].Time = v.timestamp diff --git a/sdk/metric/internal/aggregate/limit.go b/sdk/metric/internal/aggregate/limit.go index 92f71f72dfe..9ea0251edd7 100644 --- a/sdk/metric/internal/aggregate/limit.go +++ b/sdk/metric/internal/aggregate/limit.go @@ -30,9 +30,9 @@ func newLimiter[V any](aggregation int) limiter[V] { // aggregation cardinality limit for the existing measurements. If it will, // overflowSet is returned. Otherwise, if it will not exceed the limit, or the // limit is not set (limit <= 0), attr is returned. -func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set { +func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set { if l.aggLimit > 0 { - _, exists := measurements[attrs] + _, exists := measurements[attrs.Equivalent()] if !exists && len(measurements) >= l.aggLimit-1 { return overflowSet } diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go index 5abd7f194a4..c61bae0e24f 100644 --- a/sdk/metric/internal/aggregate/limit_test.go +++ b/sdk/metric/internal/aggregate/limit_test.go @@ -12,7 +12,7 @@ import ( ) func TestLimiterAttributes(t *testing.T) { - m := map[attribute.Set]struct{}{alice: {}} + m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}} t.Run("NoLimit", func(t *testing.T) { l := newLimiter[struct{}](0) assert.Equal(t, alice, l.Attributes(alice, m)) @@ -43,7 +43,7 @@ func TestLimiterAttributes(t *testing.T) { var limitedAttr attribute.Set func BenchmarkLimiterAttributes(b *testing.B) { - m := map[attribute.Set]struct{}{alice: {}} + m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}} l := newLimiter[struct{}](2) b.ReportAllocs() diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index a3cac336539..7514b95e698 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -14,8 +14,9 @@ import ( ) type sumValue[N int64 | float64] struct { - n N - res exemplar.Reservoir[N] + n N + res exemplar.Reservoir[N] + attrs attribute.Set } // valueMap is the storage for sums. @@ -23,14 +24,14 @@ type valueMap[N int64 | float64] struct { sync.Mutex newRes func() exemplar.Reservoir[N] limit limiter[sumValue[N]] - values map[attribute.Set]sumValue[N] + values map[attribute.Distinct]sumValue[N] } func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), - values: make(map[attribute.Set]sumValue[N]), + values: make(map[attribute.Distinct]sumValue[N]), } } @@ -41,15 +42,16 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S defer s.Unlock() attr := s.limit.Attributes(fltrAttr, s.values) - v, ok := s.values[attr] + v, ok := s.values[attr.Equivalent()] if !ok { v.res = s.newRes() } + v.attrs = attr v.n += value v.res.Offer(ctx, t, value, droppedAttr) - s.values[attr] = v + s.values[attr.Equivalent()] = v } // newSum returns an aggregator that summarizes a set of measurements as their @@ -87,8 +89,8 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, val := range s.values { - dPts[i].Attributes = attr + for _, val := range s.values { + dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = val.n @@ -122,8 +124,8 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, value := range s.values { - dPts[i].Attributes = attr + for _, value := range s.values { + dPts[i].Attributes = value.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = value.n @@ -159,12 +161,12 @@ type precomputedSum[N int64 | float64] struct { monotonic bool start time.Time - reported map[attribute.Set]N + reported map[attribute.Distinct]N } func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { t := now() - newReported := make(map[attribute.Set]N) + newReported := make(map[attribute.Distinct]N) // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, // use the zero-value sData and hope for better alignment next cycle. @@ -179,16 +181,16 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, value := range s.values { - delta := value.n - s.reported[attr] + for key, value := range s.values { + delta := value.n - s.reported[key] - dPts[i].Attributes = attr + dPts[i].Attributes = value.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = delta value.res.Collect(&dPts[i].Exemplars) - newReported[attr] = value.n + newReported[key] = value.n i++ } // Unused attribute sets do not report. @@ -219,8 +221,8 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, val := range s.values { - dPts[i].Attributes = attr + for _, val := range s.values { + dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = val.n diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index dfe2d6f8366..da614e62428 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -1626,7 +1626,12 @@ func TestObservableExample(t *testing.T) { process1001 = attribute.NewSet(processID1001) ) - setup := func(t *testing.T, temp metricdata.Temporality) (map[attribute.Set]int64, func(*testing.T), *metricdata.ScopeMetrics, *int64, *int64, *int64) { + type observation struct { + attrs attribute.Set + value int64 + } + + setup := func(t *testing.T, temp metricdata.Temporality) (map[attribute.Distinct]observation, func(*testing.T), *metricdata.ScopeMetrics, *int64, *int64, *int64) { t.Helper() const ( @@ -1648,11 +1653,11 @@ func TestObservableExample(t *testing.T) { mp := NewMeterProvider(WithReader(reader1), WithReader(reader2), WithView(noFiltered, filtered)) meter := mp.Meter(scopeName) - observations := make(map[attribute.Set]int64) + observations := make(map[attribute.Distinct]observation) _, err := meter.Int64ObservableCounter(instName, metric.WithInt64Callback( func(_ context.Context, o metric.Int64Observer) error { - for attrSet, val := range observations { - o.Observe(val, metric.WithAttributeSet(attrSet)) + for _, val := range observations { + o.Observe(val.value, metric.WithAttributeSet(val.attrs)) } return nil }, @@ -1714,8 +1719,8 @@ func TestObservableExample(t *testing.T) { // During the time range (T0, T1]: // pid = 1001, tid = 1, #PF = 50 // pid = 1001, tid = 2, #PF = 30 - observations[thread1] = 50 - observations[thread2] = 30 + observations[thread1.Equivalent()] = observation{attrs: thread1, value: 50} + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 30} *wantFiltered = 80 *wantThread1 = 50 @@ -1726,8 +1731,8 @@ func TestObservableExample(t *testing.T) { // During the time range (T1, T2]: // pid = 1001, tid = 1, #PF = 53 // pid = 1001, tid = 2, #PF = 38 - observations[thread1] = 53 - observations[thread2] = 38 + observations[thread1.Equivalent()] = observation{attrs: thread1, value: 53} + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 38} *wantFiltered = 91 *wantThread1 = 53 @@ -1738,8 +1743,8 @@ func TestObservableExample(t *testing.T) { // During the time range (T2, T3] // pid = 1001, tid = 1, #PF = 56 // pid = 1001, tid = 2, #PF = 42 - observations[thread1] = 56 - observations[thread2] = 42 + observations[thread1.Equivalent()] = observation{attrs: thread1, value: 56} + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 42} *wantFiltered = 98 *wantThread1 = 56 @@ -1750,8 +1755,8 @@ func TestObservableExample(t *testing.T) { // During the time range (T3, T4]: // pid = 1001, tid = 1, #PF = 60 // pid = 1001, tid = 2, #PF = 47 - observations[thread1] = 60 - observations[thread2] = 47 + observations[thread1.Equivalent()] = observation{attrs: thread1, value: 60} + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 47} *wantFiltered = 107 *wantThread1 = 60 @@ -1763,9 +1768,9 @@ func TestObservableExample(t *testing.T) { // thread 1 died, thread 3 started // pid = 1001, tid = 2, #PF = 53 // pid = 1001, tid = 3, #PF = 5 - delete(observations, thread1) - observations[thread2] = 53 - observations[thread3] = 5 + delete(observations, thread1.Equivalent()) + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 53} + observations[thread3.Equivalent()] = observation{attrs: thread3, value: 5} *wantFiltered = 58 want.Metrics[1].Data = metricdata.Sum[int64]{ @@ -1788,8 +1793,8 @@ func TestObservableExample(t *testing.T) { // During the time range (T0, T1]: // pid = 1001, tid = 1, #PF = 50 // pid = 1001, tid = 2, #PF = 30 - observations[thread1] = 50 - observations[thread2] = 30 + observations[thread1.Equivalent()] = observation{attrs: thread1, value: 50} + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 30} *wantFiltered = 80 *wantThread1 = 50 @@ -1800,8 +1805,8 @@ func TestObservableExample(t *testing.T) { // During the time range (T1, T2]: // pid = 1001, tid = 1, #PF = 53 // pid = 1001, tid = 2, #PF = 38 - observations[thread1] = 53 - observations[thread2] = 38 + observations[thread1.Equivalent()] = observation{attrs: thread1, value: 53} + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 38} *wantFiltered = 11 *wantThread1 = 3 @@ -1812,8 +1817,8 @@ func TestObservableExample(t *testing.T) { // During the time range (T2, T3] // pid = 1001, tid = 1, #PF = 56 // pid = 1001, tid = 2, #PF = 42 - observations[thread1] = 56 - observations[thread2] = 42 + observations[thread1.Equivalent()] = observation{attrs: thread1, value: 56} + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 42} *wantFiltered = 7 *wantThread1 = 3 @@ -1824,8 +1829,8 @@ func TestObservableExample(t *testing.T) { // During the time range (T3, T4]: // pid = 1001, tid = 1, #PF = 60 // pid = 1001, tid = 2, #PF = 47 - observations[thread1] = 60 - observations[thread2] = 47 + observations[thread1.Equivalent()] = observation{attrs: thread1, value: 60} + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 47} *wantFiltered = 9 *wantThread1 = 4 @@ -1837,9 +1842,9 @@ func TestObservableExample(t *testing.T) { // thread 1 died, thread 3 started // pid = 1001, tid = 2, #PF = 53 // pid = 1001, tid = 3, #PF = 5 - delete(observations, thread1) - observations[thread2] = 53 - observations[thread3] = 5 + delete(observations, thread1.Equivalent()) + observations[thread2.Equivalent()] = observation{attrs: thread2, value: 53} + observations[thread3.Equivalent()] = observation{attrs: thread3, value: 5} *wantFiltered = -49 want.Metrics[1].Data = metricdata.Sum[int64]{