From ac83b746da5e1f01ab1f395f85d258d2a918e19b Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 19 Sep 2024 12:13:43 -0400 Subject: [PATCH] Make the collector exporter mutate data, and remove unnecessary CopyTo (#892) * make the collector exporter mutate data, and remove unnecessary CopyTo * update comments * rebase on benchmark --- .../internal/normalization/benchmark_test.go | 16 +- .../normalization/disabled_normalizer.go | 52 ++--- .../normalization/standard_normalizer.go | 179 ++++++++---------- .../collector/internal/normalization/types.go | 8 +- exporter/collector/logs.go | 6 +- exporter/collector/metrics.go | 19 +- 6 files changed, 114 insertions(+), 166 deletions(-) diff --git a/exporter/collector/internal/normalization/benchmark_test.go b/exporter/collector/internal/normalization/benchmark_test.go index 5c7c3f2d..86b3fc25 100644 --- a/exporter/collector/internal/normalization/benchmark_test.go +++ b/exporter/collector/internal/normalization/benchmark_test.go @@ -40,7 +40,7 @@ func BenchmarkNormalizeNumberDataPoint(b *testing.B) { b.StopTimer() newPoint := testNumberDataPoint() b.StartTimer() - _, ok = normalizer.NormalizeNumberDataPoint(newPoint, id) + ok = normalizer.NormalizeNumberDataPoint(newPoint, id) assert.True(b, ok) } } @@ -61,7 +61,7 @@ func BenchmarkNormalizeHistogramDataPoint(b *testing.B) { b.StopTimer() newPoint := testHistogramDataPoint() b.StartTimer() - _, ok = normalizer.NormalizeHistogramDataPoint(newPoint, id) + ok = normalizer.NormalizeHistogramDataPoint(newPoint, id) assert.True(b, ok) } } @@ -82,7 +82,7 @@ func BenchmarkNormalizeExopnentialHistogramDataPoint(b *testing.B) { b.StopTimer() newPoint := testExponentialHistogramDataPoint() b.StartTimer() - _, ok = normalizer.NormalizeExponentialHistogramDataPoint(newPoint, id) + ok = normalizer.NormalizeExponentialHistogramDataPoint(newPoint, id) assert.True(b, ok) } } @@ -103,7 +103,7 @@ func BenchmarkNormalizeSummaryDataPoint(b *testing.B) { b.StopTimer() newPoint := testSummaryDataPoint() b.StartTimer() - _, ok = normalizer.NormalizeSummaryDataPoint(newPoint, id) + ok = normalizer.NormalizeSummaryDataPoint(newPoint, id) assert.True(b, ok) } } @@ -127,7 +127,7 @@ func BenchmarkResetNormalizeNumberDataPoint(b *testing.B) { newPoint := testNumberDataPoint() newPoint.SetIntValue(int64(b.N - i)) b.StartTimer() - _, ok = normalizer.NormalizeNumberDataPoint(newPoint, id) + ok = normalizer.NormalizeNumberDataPoint(newPoint, id) assert.True(b, ok) } } @@ -151,7 +151,7 @@ func BenchmarkResetNormalizeHistogramDataPoint(b *testing.B) { newPoint := testHistogramDataPoint() newPoint.SetSum(float64(b.N - i)) b.StartTimer() - _, ok = normalizer.NormalizeHistogramDataPoint(newPoint, id) + ok = normalizer.NormalizeHistogramDataPoint(newPoint, id) assert.True(b, ok) } } @@ -175,7 +175,7 @@ func BenchmarkResetNormalizeExponentialHistogramDataPoint(b *testing.B) { newPoint := testExponentialHistogramDataPoint() newPoint.SetSum(float64(b.N - i)) b.StartTimer() - _, ok = normalizer.NormalizeExponentialHistogramDataPoint(newPoint, id) + ok = normalizer.NormalizeExponentialHistogramDataPoint(newPoint, id) assert.True(b, ok) } } @@ -199,7 +199,7 @@ func BenchmarkResetNormalizeSummaryDataPoint(b *testing.B) { newPoint := testSummaryDataPoint() newPoint.SetSum(float64(b.N - i)) b.StartTimer() - _, ok = normalizer.NormalizeSummaryDataPoint(newPoint, id) + ok = normalizer.NormalizeSummaryDataPoint(newPoint, id) assert.True(b, ok) } } diff --git a/exporter/collector/internal/normalization/disabled_normalizer.go b/exporter/collector/internal/normalization/disabled_normalizer.go index 310fe30f..3589105f 100644 --- a/exporter/collector/internal/normalization/disabled_normalizer.go +++ b/exporter/collector/internal/normalization/disabled_normalizer.go @@ -33,58 +33,46 @@ func NewDisabledNormalizer() Normalizer { type disabledNormalizer struct{} -// NormalizeExponentialHistogramDataPoint returns the point without normalizing. -func (d *disabledNormalizer) NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, _ uint64) (pmetric.ExponentialHistogramDataPoint, bool) { +// NormalizeExponentialHistogramDataPoint ensures the start time is before the +// end time, but does not normalize points. +func (d *disabledNormalizer) NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, _ uint64) bool { if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { // Handle explicit reset points. - // Make a copy so we don't mutate underlying data. - newPoint := pmetric.NewExponentialHistogramDataPoint() - point.CopyTo(newPoint) // StartTime = Timestamp - 1 ms - newPoint.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) - return newPoint, true + point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) } - return point, true + return true } -// NormalizeHistogramDataPoint returns the point without normalizing. -func (d *disabledNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, _ uint64) (pmetric.HistogramDataPoint, bool) { +// NormalizeHistogramDataPoint ensures the start time is before the +// end time, but does not normalize points. +func (d *disabledNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, _ uint64) bool { if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { // Handle explicit reset points. - // Make a copy so we don't mutate underlying data. - newPoint := pmetric.NewHistogramDataPoint() - point.CopyTo(newPoint) // StartTime = Timestamp - 1 ms - newPoint.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) - return newPoint, true + point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) } - return point, true + return true } -// NormalizeNumberDataPoint returns the point without normalizing. -func (d *disabledNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPoint, _ uint64) (pmetric.NumberDataPoint, bool) { +// NormalizeNumberDataPoint ensures the start time is before the +// end time, but does not normalize points. +func (d *disabledNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPoint, _ uint64) bool { if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { // Handle explicit reset points. - // Make a copy so we don't mutate underlying data. - newPoint := pmetric.NewNumberDataPoint() - point.CopyTo(newPoint) // StartTime = Timestamp - 1 ms - newPoint.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) - return newPoint, true + point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) } - return point, true + return true } -// NormalizeSummaryDataPoint returns the point without normalizing. -func (d *disabledNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, _ uint64) (pmetric.SummaryDataPoint, bool) { +// NormalizeSummaryDataPoint ensures the start time is before the +// end time, but does not normalize points. +func (d *disabledNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, _ uint64) bool { if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { // Handle explicit reset points. - // Make a copy so we don't mutate underlying data. - newPoint := pmetric.NewSummaryDataPoint() - point.CopyTo(newPoint) // StartTime = Timestamp - 1 ms - newPoint.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) - return newPoint, true + point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) } - return point, true + return true } diff --git a/exporter/collector/internal/normalization/standard_normalizer.go b/exporter/collector/internal/normalization/standard_normalizer.go index 155ca194..e249aa9c 100644 --- a/exporter/collector/internal/normalization/standard_normalizer.go +++ b/exporter/collector/internal/normalization/standard_normalizer.go @@ -47,7 +47,7 @@ type standardNormalizer struct { log *zap.Logger } -func (s *standardNormalizer) NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, identifier uint64) (pmetric.ExponentialHistogramDataPoint, bool) { +func (s *standardNormalizer) NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, identifier uint64) bool { start, hasStart := s.startCache.GetExponentialHistogramDataPoint(identifier) if !hasStart { if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { @@ -57,10 +57,10 @@ func (s *standardNormalizer) NormalizeExponentialHistogramDataPoint(point pmetri // Record it in history and drop the point. s.startCache.SetExponentialHistogramDataPoint(identifier, point) s.previousCache.SetExponentialHistogramDataPoint(identifier, point) - return pmetric.ExponentialHistogramDataPoint{}, false + return false } // No normalization required, since we haven't cached anything, and the start TS is non-zero. - return point, true + return true } // TODO(#366): It is possible, but difficult to compare exponential @@ -70,7 +70,7 @@ func (s *standardNormalizer) NormalizeExponentialHistogramDataPoint(point pmetri if point.Scale() != start.Scale() { s.startCache.SetExponentialHistogramDataPoint(identifier, point) s.previousCache.SetExponentialHistogramDataPoint(identifier, point) - return pmetric.ExponentialHistogramDataPoint{}, false + return false } previous, hasPrevious := s.previousCache.GetExponentialHistogramDataPoint(identifier) @@ -81,22 +81,19 @@ func (s *standardNormalizer) NormalizeExponentialHistogramDataPoint(point pmetri } if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) || (point.StartTimestamp() == 0 && lessThanExponentialHistogramDataPoint(point, previous)) { - // Make a copy so we don't mutate underlying data - newPoint := pmetric.NewExponentialHistogramDataPoint() // This is a reset point, but we have seen this timeseries before, so we know the reset happened in the time period since the last point. // Assume the reset occurred at T - 1 ms, and leave the value untouched. - point.CopyTo(newPoint) - newPoint.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) - s.previousCache.SetExponentialHistogramDataPoint(identifier, newPoint) + point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) + s.previousCache.SetExponentialHistogramDataPoint(identifier, point) // For subsequent points, we don't want to modify the value, but we do // want to make the start timestamp match the point we write here. // Store a point with the same timestamps, but zero value to achieve // that behavior. zeroPoint := pmetric.NewExponentialHistogramDataPoint() - zeroPoint.SetTimestamp(newPoint.StartTimestamp()) - zeroPoint.SetScale(newPoint.Scale()) + zeroPoint.SetTimestamp(point.StartTimestamp()) + zeroPoint.SetScale(point.Scale()) s.startCache.SetExponentialHistogramDataPoint(identifier, zeroPoint) - return newPoint, true + return true } if !start.Timestamp().AsTime().Before(point.Timestamp().AsTime()) { // We found a cached start timestamp that wouldn't produce a valid point. @@ -106,12 +103,12 @@ func (s *standardNormalizer) NormalizeExponentialHistogramDataPoint(point pmetri zap.String("lastRecordedReset", start.Timestamp().String()), zap.String("dataPoint", point.Timestamp().String()), ) - return pmetric.ExponentialHistogramDataPoint{}, false + return false } // There was no reset, so normalize the point against the start point - newPoint := subtractExponentialHistogramDataPoint(point, start) - s.previousCache.SetExponentialHistogramDataPoint(identifier, newPoint) - return newPoint, true + subtractExponentialHistogramDataPoint(point, start) + s.previousCache.SetExponentialHistogramDataPoint(identifier, point) + return true } // lessThanExponentialHistogramDataPoint returns a < b. @@ -119,21 +116,17 @@ func lessThanExponentialHistogramDataPoint(a, b pmetric.ExponentialHistogramData return a.Count() < b.Count() || a.Sum() < b.Sum() } -// subtractExponentialHistogramDataPoint returns a - b. -func subtractExponentialHistogramDataPoint(a, b pmetric.ExponentialHistogramDataPoint) pmetric.ExponentialHistogramDataPoint { - // Make a copy so we don't mutate underlying data - newPoint := pmetric.NewExponentialHistogramDataPoint() - a.CopyTo(newPoint) +// subtractExponentialHistogramDataPoint subtracts b from a. +func subtractExponentialHistogramDataPoint(a, b pmetric.ExponentialHistogramDataPoint) { // Use the timestamp from the normalization point - newPoint.SetStartTimestamp(b.Timestamp()) + a.SetStartTimestamp(b.Timestamp()) // Adjust the value based on the start point's value - newPoint.SetCount(a.Count() - b.Count()) + a.SetCount(a.Count() - b.Count()) // We drop points without a sum, so no need to check here. - newPoint.SetSum(a.Sum() - b.Sum()) - newPoint.SetZeroCount(a.ZeroCount() - b.ZeroCount()) - newPoint.Positive().BucketCounts().FromRaw(subtractExponentialBuckets(a.Positive(), b.Positive())) - newPoint.Negative().BucketCounts().FromRaw(subtractExponentialBuckets(a.Negative(), b.Negative())) - return newPoint + a.SetSum(a.Sum() - b.Sum()) + a.SetZeroCount(a.ZeroCount() - b.ZeroCount()) + a.Positive().BucketCounts().FromRaw(subtractExponentialBuckets(a.Positive(), b.Positive())) + a.Negative().BucketCounts().FromRaw(subtractExponentialBuckets(a.Negative(), b.Negative())) } // subtractExponentialBuckets returns a - b. @@ -152,7 +145,7 @@ func subtractExponentialBuckets(a, b pmetric.ExponentialHistogramDataPointBucket return newBuckets } -func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier uint64) (pmetric.HistogramDataPoint, bool) { +func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier uint64) bool { start, hasStart := s.startCache.GetHistogramDataPoint(identifier) if !hasStart { if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { @@ -162,10 +155,10 @@ func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.Histogram // Record it in history and drop the point. s.startCache.SetHistogramDataPoint(identifier, point) s.previousCache.SetHistogramDataPoint(identifier, point) - return pmetric.HistogramDataPoint{}, false + return false } // No normalization required, since we haven't cached anything, and the start TS is non-zero. - return point, true + return true } // The number of buckets changed, so we can't normalize points anymore. @@ -173,7 +166,7 @@ func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.Histogram if !bucketBoundariesEqual(point.ExplicitBounds(), start.ExplicitBounds()) { s.startCache.SetHistogramDataPoint(identifier, point) s.previousCache.SetHistogramDataPoint(identifier, point) - return pmetric.HistogramDataPoint{}, false + return false } previous, hasPrevious := s.previousCache.GetHistogramDataPoint(identifier) @@ -184,23 +177,20 @@ func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.Histogram } if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) || (point.StartTimestamp() == 0 && lessThanHistogramDataPoint(point, previous)) { - // Make a copy so we don't mutate underlying data - newPoint := pmetric.NewHistogramDataPoint() // This is a reset point, but we have seen this timeseries before, so we know the reset happened in the time period since the last point. // Assume the reset occurred at T - 1 ms, and leave the value untouched. - point.CopyTo(newPoint) - newPoint.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) - s.previousCache.SetHistogramDataPoint(identifier, newPoint) + point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) + s.previousCache.SetHistogramDataPoint(identifier, point) // For subsequent points, we don't want to modify the value, but we do // want to make the start timestamp match the point we write here. // Store a point with the same timestamps, but zero value to achieve // that behavior. zeroPoint := pmetric.NewHistogramDataPoint() - zeroPoint.SetTimestamp(newPoint.StartTimestamp()) - newPoint.ExplicitBounds().CopyTo(zeroPoint.ExplicitBounds()) - zeroPoint.BucketCounts().FromRaw(make([]uint64, newPoint.BucketCounts().Len())) + zeroPoint.SetTimestamp(point.StartTimestamp()) + point.ExplicitBounds().CopyTo(zeroPoint.ExplicitBounds()) + zeroPoint.BucketCounts().FromRaw(make([]uint64, point.BucketCounts().Len())) s.startCache.SetHistogramDataPoint(identifier, zeroPoint) - return newPoint, true + return true } if !start.Timestamp().AsTime().Before(point.Timestamp().AsTime()) { // We found a cached start timestamp that wouldn't produce a valid point. @@ -210,12 +200,12 @@ func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.Histogram zap.String("lastRecordedReset", start.Timestamp().String()), zap.String("dataPoint", point.Timestamp().String()), ) - return pmetric.HistogramDataPoint{}, false + return false } // There was no reset, so normalize the point against the start point - newPoint := subtractHistogramDataPoint(point, start) - s.previousCache.SetHistogramDataPoint(identifier, newPoint) - return newPoint, true + subtractHistogramDataPoint(point, start) + s.previousCache.SetHistogramDataPoint(identifier, point) + return true } // lessThanHistogramDataPoint returns a < b. @@ -224,24 +214,20 @@ func lessThanHistogramDataPoint(a, b pmetric.HistogramDataPoint) bool { } // subtractHistogramDataPoint returns a - b. -func subtractHistogramDataPoint(a, b pmetric.HistogramDataPoint) pmetric.HistogramDataPoint { - // Make a copy so we don't mutate underlying data - newPoint := pmetric.NewHistogramDataPoint() - a.CopyTo(newPoint) +func subtractHistogramDataPoint(a, b pmetric.HistogramDataPoint) { // Use the timestamp from the normalization point - newPoint.SetStartTimestamp(b.Timestamp()) + a.SetStartTimestamp(b.Timestamp()) // Adjust the value based on the start point's value - newPoint.SetCount(a.Count() - b.Count()) + a.SetCount(a.Count() - b.Count()) // We drop points without a sum, so no need to check here. - newPoint.SetSum(a.Sum() - b.Sum()) + a.SetSum(a.Sum() - b.Sum()) aBuckets := a.BucketCounts() bBuckets := b.BucketCounts() newBuckets := make([]uint64, aBuckets.Len()) for i := 0; i < aBuckets.Len(); i++ { newBuckets[i] = aBuckets.At(i) - bBuckets.At(i) } - newPoint.BucketCounts().FromRaw(newBuckets) - return newPoint + a.BucketCounts().FromRaw(newBuckets) } func bucketBoundariesEqual(a, b pcommon.Float64Slice) bool { @@ -258,7 +244,7 @@ func bucketBoundariesEqual(a, b pcommon.Float64Slice) bool { // NormalizeNumberDataPoint normalizes a cumulative, monotonic sum. // It returns the normalized point, and true if the point should be kept. -func (s *standardNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPoint, identifier uint64) (pmetric.NumberDataPoint, bool) { +func (s *standardNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPoint, identifier uint64) bool { start, hasStart := s.startCache.GetNumberDataPoint(identifier) if !hasStart { if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { @@ -268,10 +254,10 @@ func (s *standardNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPo // Record it in history and drop the point. s.startCache.SetNumberDataPoint(identifier, point) s.previousCache.SetNumberDataPoint(identifier, point) - return pmetric.NumberDataPoint{}, false + return false } // No normalization required, since we haven't cached anything, and the start TS is non-zer0. - return point, true + return true } previous, hasPrevious := s.previousCache.GetNumberDataPoint(identifier) @@ -281,21 +267,18 @@ func (s *standardNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPo previous = start } if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) || (point.StartTimestamp() == 0 && lessThanNumberDataPoint(point, previous)) { - // Make a copy so we don't mutate underlying data - newPoint := pmetric.NewNumberDataPoint() // This is a reset point, but we have seen this timeseries before, so we know the reset happened in the time period since the last point. // Assume the reset occurred at T - 1 ms, and leave the value untouched. - point.CopyTo(newPoint) - newPoint.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) - s.previousCache.SetNumberDataPoint(identifier, newPoint) + point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) + s.previousCache.SetNumberDataPoint(identifier, point) // For subsequent points, we don't want to modify the value, but we do // want to make the start timestamp match the point we write here. // Store a point with the same timestamps, but zero value to achieve // that behavior. zeroPoint := pmetric.NewNumberDataPoint() - zeroPoint.SetTimestamp(newPoint.StartTimestamp()) + zeroPoint.SetTimestamp(point.StartTimestamp()) s.startCache.SetNumberDataPoint(identifier, zeroPoint) - return newPoint, true + return true } if !start.Timestamp().AsTime().Before(point.Timestamp().AsTime()) { // We found a cached start timestamp that wouldn't produce a valid point. @@ -305,12 +288,12 @@ func (s *standardNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPo zap.String("lastRecordedReset", start.Timestamp().String()), zap.String("dataPoint", point.Timestamp().String()), ) - return pmetric.NumberDataPoint{}, false + return false } // There was no reset, so normalize the point against the start point - newPoint := subtractNumberDataPoint(point, start) - s.previousCache.SetNumberDataPoint(identifier, newPoint) - return newPoint, true + subtractNumberDataPoint(point, start) + s.previousCache.SetNumberDataPoint(identifier, point) + return true } // lessThanNumberDataPoint returns a < b. @@ -324,24 +307,20 @@ func lessThanNumberDataPoint(a, b pmetric.NumberDataPoint) bool { return false } -// subtractNumberDataPoint returns a - b. -func subtractNumberDataPoint(a, b pmetric.NumberDataPoint) pmetric.NumberDataPoint { - // Make a copy so we don't mutate underlying data - newPoint := pmetric.NewNumberDataPoint() - a.CopyTo(newPoint) +// subtractNumberDataPoint subtracts b from a. +func subtractNumberDataPoint(a, b pmetric.NumberDataPoint) { // Use the timestamp from the normalization point - newPoint.SetStartTimestamp(b.Timestamp()) + a.SetStartTimestamp(b.Timestamp()) // Adjust the value based on the start point's value - switch newPoint.ValueType() { + switch a.ValueType() { case pmetric.NumberDataPointValueTypeInt: - newPoint.SetIntValue(a.IntValue() - b.IntValue()) + a.SetIntValue(a.IntValue() - b.IntValue()) case pmetric.NumberDataPointValueTypeDouble: - newPoint.SetDoubleValue(a.DoubleValue() - b.DoubleValue()) + a.SetDoubleValue(a.DoubleValue() - b.DoubleValue()) } - return newPoint } -func (s *standardNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, identifier uint64) (pmetric.SummaryDataPoint, bool) { +func (s *standardNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, identifier uint64) bool { start, hasStart := s.startCache.GetSummaryDataPoint(identifier) if !hasStart { if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { @@ -351,10 +330,10 @@ func (s *standardNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryData // Record it in history and drop the point. s.startCache.SetSummaryDataPoint(identifier, point) s.previousCache.SetSummaryDataPoint(identifier, point) - return pmetric.SummaryDataPoint{}, false + return false } // No normalization required, since we haven't cached anything, and the start TS is non-zer0. - return point, true + return true } previous, hasPrevious := s.previousCache.GetSummaryDataPoint(identifier) @@ -364,21 +343,18 @@ func (s *standardNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryData previous = start } if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) || (point.StartTimestamp() == 0 && lessThanSummaryDataPoint(point, previous)) { - // Make a copy so we don't mutate underlying data - newPoint := pmetric.NewSummaryDataPoint() // This is a reset point, but we have seen this timeseries before, so we know the reset happened in the time period since the last point. // Assume the reset occurred at T - 1 ms, and leave the value untouched. - point.CopyTo(newPoint) - newPoint.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) - s.previousCache.SetSummaryDataPoint(identifier, newPoint) + point.SetStartTimestamp(pcommon.Timestamp(uint64(point.Timestamp()) - uint64(time.Millisecond))) + s.previousCache.SetSummaryDataPoint(identifier, point) // For subsequent points, we don't want to modify the value, but we do // want to make the start timestamp match the point we write here. // Store a point with the same timestamps, but zero value to achieve // that behavior. zeroPoint := pmetric.NewSummaryDataPoint() - zeroPoint.SetTimestamp(newPoint.StartTimestamp()) + zeroPoint.SetTimestamp(point.StartTimestamp()) s.startCache.SetSummaryDataPoint(identifier, zeroPoint) - return newPoint, true + return true } if !start.Timestamp().AsTime().Before(point.Timestamp().AsTime()) { // We found a cached start timestamp that wouldn't produce a valid point. @@ -388,12 +364,12 @@ func (s *standardNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryData zap.String("lastRecordedReset", start.Timestamp().String()), zap.String("dataPoint", point.Timestamp().String()), ) - return pmetric.SummaryDataPoint{}, false + return false } // There was no reset, so normalize the point against the start point - newPoint := subtractSummaryDataPoint(point, start) - s.previousCache.SetSummaryDataPoint(identifier, newPoint) - return newPoint, true + subtractSummaryDataPoint(point, start) + s.previousCache.SetSummaryDataPoint(identifier, point) + return true } // lessThanSummaryDataPoint returns a < b. @@ -401,19 +377,14 @@ func lessThanSummaryDataPoint(a, b pmetric.SummaryDataPoint) bool { return a.Count() < b.Count() || a.Sum() < b.Sum() } -// subtractSummaryDataPoint returns a - b. -func subtractSummaryDataPoint(a, b pmetric.SummaryDataPoint) pmetric.SummaryDataPoint { - // Make a copy so we don't mutate underlying data. - newPoint := pmetric.NewSummaryDataPoint() - // Quantile values are copied, and are not modified. Quantiles are - // computed over the same time period as sum and count, but it isn't - // possible to normalize them. - a.CopyTo(newPoint) +// subtractSummaryDataPoint subtracts b from a. +func subtractSummaryDataPoint(a, b pmetric.SummaryDataPoint) { + // Quantile values are not modified. Quantiles are computed over the same + // time period as sum and count, but it isn't possible to normalize them. // Use the timestamp from the normalization point - newPoint.SetStartTimestamp(b.Timestamp()) + a.SetStartTimestamp(b.Timestamp()) // Adjust the value based on the start point's value - newPoint.SetCount(a.Count() - b.Count()) + a.SetCount(a.Count() - b.Count()) // We drop points without a sum, so no need to check here. - newPoint.SetSum(a.Sum() - b.Sum()) - return newPoint + a.SetSum(a.Sum() - b.Sum()) } diff --git a/exporter/collector/internal/normalization/types.go b/exporter/collector/internal/normalization/types.go index 5b1a2268..7b2c0cdd 100644 --- a/exporter/collector/internal/normalization/types.go +++ b/exporter/collector/internal/normalization/types.go @@ -22,14 +22,14 @@ import ( type Normalizer interface { // NormalizeExponentialHistogramDataPoint normalizes an exponential histogram. // It returns the normalized point, and true if the point should be kept. - NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, identifier uint64) (pmetric.ExponentialHistogramDataPoint, bool) + NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, identifier uint64) bool // NormalizeHistogramDataPoint normalizes a cumulative histogram. // It returns the normalized point, and true if the point should be kept. - NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier uint64) (pmetric.HistogramDataPoint, bool) + NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier uint64) bool // NormalizeNumberDataPoint normalizes a cumulative, monotonic sum. // It returns the normalized point, and true if the point should be kept. - NormalizeNumberDataPoint(point pmetric.NumberDataPoint, identifier uint64) (pmetric.NumberDataPoint, bool) + NormalizeNumberDataPoint(point pmetric.NumberDataPoint, identifier uint64) bool // NormalizeSummaryDataPoint normalizes a summary. // It returns the normalized point, and true if the point should be kept. - NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, identifier uint64) (pmetric.SummaryDataPoint, bool) + NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, identifier uint64) bool } diff --git a/exporter/collector/logs.go b/exporter/collector/logs.go index 79763b4d..9feb2d34 100644 --- a/exporter/collector/logs.go +++ b/exporter/collector/logs.go @@ -395,17 +395,13 @@ func (l logMapper) getLogName(log plog.LogRecord) (string, error) { } func (l logMapper) logToSplitEntries( - log plog.LogRecord, + logRecord plog.LogRecord, mr *monitoredrespb.MonitoredResource, logLabels map[string]string, processTime time.Time, logName string, projectID string, ) ([]*logpb.LogEntry, error) { - // make a copy in case we mutate the record - logRecord := plog.NewLogRecord() - log.CopyTo(logRecord) - ts := logRecord.Timestamp().AsTime() if logRecord.Timestamp() == 0 || ts.IsZero() { // if timestamp is unset, fall back to observed_time_unix_nano as recommended diff --git a/exporter/collector/metrics.go b/exporter/collector/metrics.go index efbe718d..0c2895c1 100644 --- a/exporter/collector/metrics.go +++ b/exporter/collector/metrics.go @@ -337,12 +337,9 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) e // separate request later. pendingTimeSeries := map[string][]*monitoringpb.TimeSeries{} - // add extra metrics from the ExtraMetrics() extension point, combine into a new copy + // add extra metrics from the ExtraMetrics() extension point if me.cfg.MetricConfig.ExtraMetrics != nil { - metricsCopy := pmetric.NewMetrics() - m.ResourceMetrics().CopyTo(metricsCopy.ResourceMetrics()) - me.cfg.MetricConfig.ExtraMetrics(metricsCopy) - m = metricsCopy + me.cfg.MetricConfig.ExtraMetrics(m) } rms := m.ResourceMetrics() @@ -876,11 +873,10 @@ func (m *metricMapper) summaryPointToTimeSeries( } // Normalize the summary point. metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) - normalizedPoint, keep := m.normalizer.NormalizeSummaryDataPoint(point, metricIdentifier) + keep := m.normalizer.NormalizeSummaryDataPoint(point, metricIdentifier) if !keep { return nil } - point = normalizedPoint sumType, countType, quantileType, err := m.summaryMetricTypes(metric) if err != nil { m.obs.log.Debug("Failed to get metric type (i.e. name) for summary metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric)) @@ -1149,11 +1145,10 @@ func (m *metricMapper) histogramToTimeSeries( if hist.AggregationTemporality() == pmetric.AggregationTemporalityCumulative { // Normalize cumulative histogram points. metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) - normalizedPoint, keep := m.normalizer.NormalizeHistogramDataPoint(point, metricIdentifier) + keep := m.normalizer.NormalizeHistogramDataPoint(point, metricIdentifier) if !keep { return nil } - point = normalizedPoint } // We treat deltas as cumulatives w/ resets. @@ -1203,11 +1198,10 @@ func (m *metricMapper) exponentialHistogramToTimeSeries( if exponentialHist.AggregationTemporality() == pmetric.AggregationTemporalityCumulative { // Normalize the histogram point. metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) - normalizedPoint, keep := m.normalizer.NormalizeExponentialHistogramDataPoint(point, metricIdentifier) + keep := m.normalizer.NormalizeExponentialHistogramDataPoint(point, metricIdentifier) if !keep { return nil } - point = normalizedPoint } // We treat deltas as cumulatives w/ resets. metricKind := metricpb.MetricDescriptor_CUMULATIVE @@ -1258,11 +1252,10 @@ func (m *metricMapper) sumPointToTimeSeries( if sum.IsMonotonic() { if sum.AggregationTemporality() == pmetric.AggregationTemporalityCumulative { metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) - normalizedPoint, keep := m.normalizer.NormalizeNumberDataPoint(point, metricIdentifier) + keep := m.normalizer.NormalizeNumberDataPoint(point, metricIdentifier) if !keep { return nil } - point = normalizedPoint } startTime = timestamppb.New(point.StartTimestamp().AsTime()) } else {