Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: Reduce CPU cycles spent in Prometheus metric collection #3414

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 85 additions & 9 deletions metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package metrics

import (
"bytes"
"fmt"
"regexp"
"sort"
"strconv"
"sync"
"time"

"github.com/google/cadvisor/container"
Expand Down Expand Up @@ -100,6 +103,22 @@ type PrometheusCollector struct {
containerLabelsFunc ContainerLabelsFunc
includedMetrics container.MetricSet
opts v2.RequestOptions

labelKeyMtx *sync.RWMutex
labelKey []byte
descs *descriptions
}

// descriptions caches the prometheus.Desc values used when emitting
// container metrics. Constructing a Desc on every scrape is expensive;
// this set is rebuilt only when the collector's label-name set changes
// (guarded by PrometheusCollector.labelKeyMtx / labelKey).
type descriptions struct {
	// Descriptors for the fixed container-spec metrics
	// (container_start_time_seconds, container_spec_*).
	startTimeSeconds *prometheus.Desc
	cpuPeriod *prometheus.Desc
	cpuQuota *prometheus.Desc
	cpuShares *prometheus.Desc
	memoryLimit *prometheus.Desc
	memorySwapLimit *prometheus.Desc
	memoryReservationLimit *prometheus.Desc

	// containerMetricDescs holds one descriptor per entry of
	// PrometheusCollector.containerMetrics, in the same order, so the
	// collect loop can index it by position.
	containerMetricDescs []*prometheus.Desc
}

// NewPrometheusCollector returns a new PrometheusCollector. The passed
Expand Down Expand Up @@ -133,6 +152,7 @@ func NewPrometheusCollector(i infoProvider, f ContainerLabelsFunc, includedMetri
},
includedMetrics: includedMetrics,
opts: opts,
labelKeyMtx: &sync.RWMutex{},
}
if includedMetrics.Has(container.CpuUsageMetrics) {
c.containerMetrics = append(c.containerMetrics, []containerMetric{
Expand Down Expand Up @@ -1832,6 +1852,62 @@ func (c *PrometheusCollector) collectContainersInfo(ch chan<- prometheus.Metric)
}
}

keyLength := 0
labelNames := make([]string, 0, len(rawLabels))
for l := range rawLabels {
keyLength += len(l)
labelNames = append(labelNames, l)
}
sort.Strings(labelNames)

key := make([]byte, 0, keyLength)
for _, l := range labelNames {
key = append(key, l...)
}

c.labelKeyMtx.RLock()
prevKey := c.labelKey
descs := c.descs
c.labelKeyMtx.RUnlock()

if !bytes.Equal(prevKey, key) {
labels := make([]string, 0, len(rawLabels))
for l := range rawLabels {
duplicate := false
sl := sanitizeLabelName(l)
for _, x := range labels {
if sl == x {
duplicate = true
break
}
}
if !duplicate {
labels = append(labels, sl)
}
}
// recompute descs
descs = &descriptions{
startTimeSeconds: prometheus.NewDesc("container_start_time_seconds", "Start time of the container since unix epoch in seconds.", labels, nil),
cpuPeriod: prometheus.NewDesc("container_spec_cpu_period", "CPU period of the container.", labels, nil),
cpuQuota: prometheus.NewDesc("container_spec_cpu_quota", "CPU quota of the container.", labels, nil),
cpuShares: prometheus.NewDesc("container_spec_cpu_shares", "CPU share of the container.", labels, nil),
memoryLimit: prometheus.NewDesc("container_spec_memory_limit_bytes", "Memory limit for the container.", labels, nil),
memorySwapLimit: prometheus.NewDesc("container_spec_memory_swap_limit_bytes", "Memory swap limit for the container.", labels, nil),
memoryReservationLimit: prometheus.NewDesc("container_spec_memory_reservation_limit_bytes", "Memory reservation limit for the container.", labels, nil),
}

descs.containerMetricDescs = make([]*prometheus.Desc, 0, len(c.containerMetrics))
for _, cm := range c.containerMetrics {
desc := cm.desc(labels)
descs.containerMetricDescs = append(descs.containerMetricDescs, desc)
}

c.labelKeyMtx.Lock()
c.labelKey = key
c.descs = descs
c.labelKeyMtx.Unlock()
}

for _, cont := range containers {
values := make([]string, 0, len(rawLabels))
labels := make([]string, 0, len(rawLabels))
Expand All @@ -1852,26 +1928,26 @@ func (c *PrometheusCollector) collectContainersInfo(ch chan<- prometheus.Metric)
}

// Container spec
desc := prometheus.NewDesc("container_start_time_seconds", "Start time of the container since unix epoch in seconds.", labels, nil)
desc := descs.startTimeSeconds
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(cont.Spec.CreationTime.Unix()), values...)

if cont.Spec.HasCpu {
desc = prometheus.NewDesc("container_spec_cpu_period", "CPU period of the container.", labels, nil)
desc = descs.cpuPeriod
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(cont.Spec.Cpu.Period), values...)
if cont.Spec.Cpu.Quota != 0 {
desc = prometheus.NewDesc("container_spec_cpu_quota", "CPU quota of the container.", labels, nil)
desc = descs.cpuQuota
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(cont.Spec.Cpu.Quota), values...)
}
desc := prometheus.NewDesc("container_spec_cpu_shares", "CPU share of the container.", labels, nil)
desc := descs.cpuShares
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(cont.Spec.Cpu.Limit), values...)

}
if cont.Spec.HasMemory {
desc := prometheus.NewDesc("container_spec_memory_limit_bytes", "Memory limit for the container.", labels, nil)
desc := descs.memoryLimit
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, specMemoryValue(cont.Spec.Memory.Limit), values...)
desc = prometheus.NewDesc("container_spec_memory_swap_limit_bytes", "Memory swap limit for the container.", labels, nil)
desc = descs.memorySwapLimit
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, specMemoryValue(cont.Spec.Memory.SwapLimit), values...)
desc = prometheus.NewDesc("container_spec_memory_reservation_limit_bytes", "Memory reservation limit for the container.", labels, nil)
desc = descs.memoryReservationLimit
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, specMemoryValue(cont.Spec.Memory.Reservation), values...)
}

Expand All @@ -1880,11 +1956,11 @@ func (c *PrometheusCollector) collectContainersInfo(ch chan<- prometheus.Metric)
continue
}
stats := cont.Stats[0]
for _, cm := range c.containerMetrics {
for i, cm := range c.containerMetrics {
if cm.condition != nil && !cm.condition(cont.Spec) {
continue
}
desc := cm.desc(labels)
desc := descs.containerMetricDescs[i]
for _, metricValue := range cm.getValues(stats) {
ch <- prometheus.NewMetricWithTimestamp(
metricValue.timestamp,
Expand Down
87 changes: 83 additions & 4 deletions metrics/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package metrics

import (
"context"
"errors"
"os"
"testing"
Expand Down Expand Up @@ -123,7 +124,9 @@ func TestNewPrometheusCollectorWithPerf(t *testing.T) {
}

func TestNewPrometheusCollectorWithRequestOptions(t *testing.T) {
p := mockInfoProvider{}
p := mockInfoProvider{
containerInfo: map[string]*info.ContainerInfo{},
}
opts := v2.RequestOptions{
IdType: "docker",
}
Expand All @@ -134,16 +137,17 @@ func TestNewPrometheusCollectorWithRequestOptions(t *testing.T) {
}

type mockInfoProvider struct {
options v2.RequestOptions
containerInfo map[string]*info.ContainerInfo
options v2.RequestOptions
}

func (m *mockInfoProvider) GetRequestedContainersInfo(containerName string, options v2.RequestOptions) (map[string]*info.ContainerInfo, error) {
m.options = options
return map[string]*info.ContainerInfo{}, nil
return m.containerInfo, nil
}

func (m *mockInfoProvider) GetVersionInfo() (*info.VersionInfo, error) {
return nil, errors.New("not supported")
return &info.VersionInfo{}, nil
}

func (m *mockInfoProvider) GetMachineInfo() (*info.MachineInfo, error) {
Expand Down Expand Up @@ -335,3 +339,78 @@ func TestGetMinCoreScalingRatio(t *testing.T) {
assert.Contains(t, values, 0.5)
assert.Contains(t, values, 0.3)
}

// BenchmarkPrometheusCollector measures a full Collect pass over a single
// container with all metric sets enabled. Because the label set returned by
// labelFunc is identical on every iteration, repeated Collect calls exercise
// the cached-descriptor fast path (descs reuse) rather than rebuilding every
// prometheus.Desc per scrape.
func BenchmarkPrometheusCollector(b *testing.B) {
	p := mockInfoProvider{
		containerInfo: map[string]*info.ContainerInfo{
			"8cb3438991e90a4b415fd7920bd8b67f59e90c62ae82b89c081ac320f31cf427": {
				ContainerReference: info.ContainerReference{
					Id:        "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3d0df1aa_a48c_4cff_88be_9222491bdcb7.slice/cri-containerd-8cb3438991e90a4b415fd7920bd8b67f59e90c62ae82b89c081ac320f31cf427.scope",
					Name:      "alertmanager",
					Namespace: "observability",
				},
				Spec: info.ContainerSpec{
					Image: "quay.io/prometheus/alertmanager:v0.26.0",
				},
				Stats: []*info.ContainerStats{{
					Timestamp: time.Now(),
					Cpu: info.CpuStats{
						Usage: info.CpuUsage{
							Total:  1000000000,
							User:   100000000,
							System: 900000000,
							PerCpu: []uint64{100000000, 900000000},
						},
						CFS: info.CpuCFS{
							Periods:          100,
							ThrottledPeriods: 10,
							ThrottledTime:    100000000,
						},
						Schedstat: info.CpuSchedstat{
							RunTime:      100000000,
							RunqueueTime: 100000000,
							RunPeriods:   100,
						},
						LoadAverage: 5,
					},
				}},
			},
		},
	}
	opts := v2.RequestOptions{
		IdType: "docker",
	}

	// A constant label map: the keys never change across iterations, so the
	// collector should compute its descriptor cache exactly once.
	labels := map[string]string{
		"container": "alertmanager",
		"id":        "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3d0df1aa_a48c_4cff_88be_9222491bdcb7.slice/cri-containerd-8cb3438991e90a4b415fd7920bd8b67f59e90c62ae82b89c081ac320f31cf427.scope",
		"image":     "quay.io/prometheus/alertmanager:v0.26.0",
		"name":      "8cb3438991e90a4b415fd7920bd8b67f59e90c62ae82b89c081ac320f31cf427",
		"namespace": "observability",
		"pod":       "alertmanager-main-1",
	}

	labelFunc := func(*info.ContainerInfo) map[string]string {
		return labels
	}

	c := NewPrometheusCollector(&p, labelFunc, container.AllMetrics, now, opts)
	ch := make(chan prometheus.Metric)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Drain the unbuffered metrics channel so Collect never blocks. The
	// goroutine is stopped via ctx when the benchmark function returns;
	// cancel fires only after the final Collect has completed.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-ch:
				// Discard the metric; only collection cost is measured.
			}
		}
	}()

	// This benchmark exists to quantify an allocation/CPU reduction, so
	// always report allocation statistics alongside ns/op.
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		c.Collect(ch)
	}
}
Loading