From 96fbab0590a75cbeba1b3e52bb993e28eccfeb0d Mon Sep 17 00:00:00 2001 From: Alex Boten Date: Tue, 6 Jun 2023 12:55:09 -0700 Subject: [PATCH] [receiver/kafkametrics] Fix metric type (#4327) --- .chloggen/fix-kafkametric-receiver-type.yaml | 20 +++++++++++++++++++ .../broker_scraper_test.go | 2 +- .../kafkametricsreceiver/documentation.md | 6 +++--- .../internal/metadata/generated_metrics.go | 12 ++++++----- .../metadata/generated_metrics_test.go | 8 +++++--- receiver/kafkametricsreceiver/metadata.yaml | 12 ++++++----- .../testdata/integration/expected.yaml | 3 ++- .../windowsperfcounters_scraper_test.go | 11 ++++------ 8 files changed, 49 insertions(+), 25 deletions(-) create mode 100644 .chloggen/fix-kafkametric-receiver-type.yaml diff --git a/.chloggen/fix-kafkametric-receiver-type.yaml b/.chloggen/fix-kafkametric-receiver-type.yaml new file mode 100644 index 000000000000..e5445b617f60 --- /dev/null +++ b/.chloggen/fix-kafkametric-receiver-type.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/kafkametricsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Updates certain metrics in kafkametricsreceiver to function as non-monotonic sums. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [4327] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Update the metric type in KafkaMetricsReceiver from "gauge" to "nonmonotonic sum". \ No newline at end of file diff --git a/receiver/kafkametricsreceiver/broker_scraper_test.go b/receiver/kafkametricsreceiver/broker_scraper_test.go index 5be948ad1340..37079cb16c97 100644 --- a/receiver/kafkametricsreceiver/broker_scraper_test.go +++ b/receiver/kafkametricsreceiver/broker_scraper_test.go @@ -106,7 +106,7 @@ func TestBrokerScraper_scrape(t *testing.T) { assert.NoError(t, err) expectedDp := int64(len(testBrokers)) receivedMetrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) - receivedDp := receivedMetrics.Gauge().DataPoints().At(0).IntValue() + receivedDp := receivedMetrics.Sum().DataPoints().At(0).IntValue() assert.Equal(t, expectedDp, receivedDp) } diff --git a/receiver/kafkametricsreceiver/documentation.md b/receiver/kafkametricsreceiver/documentation.md index 58157c3eb81e..c831edcdfe53 100644 --- a/receiver/kafkametricsreceiver/documentation.md +++ b/receiver/kafkametricsreceiver/documentation.md @@ -16,9 +16,9 @@ metrics: Number of brokers in the cluster. -| Unit | Metric Type | Value Type | -| ---- | ----------- | ---------- | -| {brokers} | Gauge | Int | +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {brokers} | Sum | Int | Delta | false | ### kafka.consumer_group.lag diff --git a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go index cabb3f8548a0..40663a91216f 100644 --- a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go +++ b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go @@ -22,14 +22,16 @@ func (m *metricKafkaBrokers) init() { m.data.SetName("kafka.brokers") m.data.SetDescription("Number of brokers in the cluster.") m.data.SetUnit("{brokers}") - m.data.SetEmptyGauge() + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) } func (m *metricKafkaBrokers) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { if !m.config.Enabled { return } - dp := m.data.Gauge().DataPoints().AppendEmpty() + dp := m.data.Sum().DataPoints().AppendEmpty() dp.SetStartTimestamp(start) dp.SetTimestamp(ts) dp.SetIntValue(val) @@ -37,14 +39,14 @@ func (m *metricKafkaBrokers) recordDataPoint(start pcommon.Timestamp, ts pcommon // updateCapacity saves max length of data point slices that will be used for the slice capacity. func (m *metricKafkaBrokers) updateCapacity() { - if m.data.Gauge().DataPoints().Len() > m.capacity { - m.capacity = m.data.Gauge().DataPoints().Len() + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() } } // emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. func (m *metricKafkaBrokers) emit(metrics pmetric.MetricSlice) { - if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { m.updateCapacity() m.data.MoveTo(metrics.AppendEmpty()) m.init() diff --git a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go index bdbc21b3428d..84f7e3ecff3a 100644 --- a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go @@ -126,11 +126,13 @@ func TestMetricsBuilder(t *testing.T) { case "kafka.brokers": assert.False(t, validatedMetrics["kafka.brokers"], "Found a duplicate in the metrics slice: kafka.brokers") validatedMetrics["kafka.brokers"] = true - assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) - assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) assert.Equal(t, "Number of brokers in the cluster.", ms.At(i).Description()) assert.Equal(t, "{brokers}", ms.At(i).Unit()) - dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, false, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) assert.Equal(t, start, dp.StartTimestamp()) assert.Equal(t, ts, dp.Timestamp()) assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) diff --git a/receiver/kafkametricsreceiver/metadata.yaml b/receiver/kafkametricsreceiver/metadata.yaml index 4f531c8ed8ce..5e998d12f750 100644 --- a/receiver/kafkametricsreceiver/metadata.yaml +++ b/receiver/kafkametricsreceiver/metadata.yaml @@ -18,14 +18,16 @@ attributes: type: string metrics: -# brokers scraper + # brokers scraper kafka.brokers: enabled: true description: Number of brokers in the cluster. unit: "{brokers}" - gauge: + sum: + monotonic: false value_type: int -# topics scraper + aggregation: delta + # topics scraper kafka.topic.partitions: enabled: true description: Number of partitions in topic. @@ -61,7 +63,7 @@ metrics: gauge: value_type: int attributes: [topic, partition] -# consumers scraper + # consumers scraper kafka.consumer_group.members: enabled: true description: Count of members in the consumer group @@ -96,4 +98,4 @@ metrics: unit: 1 gauge: value_type: int - attributes: [group, topic] + attributes: [group, topic] \ No newline at end of file diff --git a/receiver/kafkametricsreceiver/testdata/integration/expected.yaml b/receiver/kafkametricsreceiver/testdata/integration/expected.yaml index baf81546fd9e..af55af7f89a9 100644 --- a/receiver/kafkametricsreceiver/testdata/integration/expected.yaml +++ b/receiver/kafkametricsreceiver/testdata/integration/expected.yaml @@ -3,7 +3,8 @@ resourceMetrics: scopeMetrics: - metrics: - description: Number of brokers in the cluster. - gauge: + sum: + aggregationTemporality: 1 dataPoints: - asInt: "1" startTimeUnixNano: "1685063120199110000" diff --git a/receiver/windowsperfcountersreceiver/windowsperfcounters_scraper_test.go b/receiver/windowsperfcountersreceiver/windowsperfcounters_scraper_test.go index 58406c242270..5f675a85cc4f 100644 --- a/receiver/windowsperfcountersreceiver/windowsperfcounters_scraper_test.go +++ b/receiver/windowsperfcountersreceiver/windowsperfcounters_scraper_test.go @@ -93,7 +93,7 @@ func Test_WindowsPerfCounterScraper(t *testing.T) { {Object: "Processor", Instances: []string{"*"}, Counters: []CounterConfig{{Name: "% Idle Time", MetricRep: MetricRep{Name: "cpu.idle"}}}}, {Object: "Processor", Instances: []string{"1", "2"}, Counters: []CounterConfig{{Name: "% Processor Time", MetricRep: MetricRep{Name: "processor.time"}}}}, }, - ScraperControllerSettings: scraperhelper.ScraperControllerSettings{CollectionInterval: time.Minute}, + ScraperControllerSettings: scraperhelper.ScraperControllerSettings{CollectionInterval: time.Minute, InitialDelay: time.Second}, }, expectedMetricPath: filepath.Join("testdata", "scraper", "standard.yaml"), }, @@ -110,7 +110,7 @@ func Test_WindowsPerfCounterScraper(t *testing.T) { PerfCounters: []ObjectConfig{ {Object: "Memory", Counters: []CounterConfig{{Name: "Committed Bytes", MetricRep: MetricRep{Name: "bytes.committed"}}}}, }, - ScraperControllerSettings: scraperhelper.ScraperControllerSettings{CollectionInterval: time.Minute}, + ScraperControllerSettings: scraperhelper.ScraperControllerSettings{CollectionInterval: time.Minute, InitialDelay: time.Second}, }, expectedMetricPath: filepath.Join("testdata", "scraper", "sum_metric.yaml"), }, @@ -120,7 +120,7 @@ func Test_WindowsPerfCounterScraper(t *testing.T) { PerfCounters: []ObjectConfig{ {Object: "Memory", Counters: []CounterConfig{{Name: "Committed Bytes"}}}, }, - ScraperControllerSettings: scraperhelper.ScraperControllerSettings{CollectionInterval: time.Minute}, + ScraperControllerSettings: scraperhelper.ScraperControllerSettings{CollectionInterval: time.Minute, InitialDelay: time.Second}, }, expectedMetricPath: filepath.Join("testdata", "scraper", "no_metric_def.yaml"), }, @@ -137,10 +137,7 @@ func Test_WindowsPerfCounterScraper(t *testing.T) { Counters: []CounterConfig{{Name: "Invalid Counter", MetricRep: MetricRep{Name: "invalid"}}}, }, }, - ScraperControllerSettings: scraperhelper.ScraperControllerSettings{ - CollectionInterval: time.Minute - InitialDelay: time.Second, - }, + ScraperControllerSettings: scraperhelper.ScraperControllerSettings{CollectionInterval: time.Minute, InitialDelay: time.Second}, }, startMessage: "some performance counters could not be initialized", startErr: "failed to create perf counter with path \\Invalid Object\\Invalid Counter: The specified object was not found on the computer.\r\n",