From 88c52235f3feabcb67cf33e7e08f861d5095e2fd Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Wed, 1 Mar 2023 13:05:20 +0200 Subject: [PATCH 1/4] added gauge for cache size --- pkg/pipeline/encode/encode_prom.go | 12 ++++++++- .../extract/aggregate/aggregate_test.go | 2 +- pkg/pipeline/extract/aggregate/aggregates.go | 2 +- pkg/pipeline/utils/timed_cache.go | 25 +++++++++++++------ 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index c13e008b1..2a9e8c163 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -56,6 +56,7 @@ type EncodeProm struct { aggHistos []histoInfo expiryTime time.Duration mCache *putils.TimedCache + mChacheLen prometheus.Gauge exitChan <-chan struct{} metricsProcessed prometheus.Counter metricsDropped prometheus.Counter @@ -334,13 +335,22 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Debugf("histos = %v", histos) log.Debugf("aggHistos = %v", aggHistos) + mChacheLen := operational.DefineMetric( + "encode_prom_metrics_reported", + "Total number of prometheus metrics reported by this stage", + operational.TypeGauge, + "stage", + ) + mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name) + w := &EncodeProm{ counters: counters, gauges: gauges, histos: histos, aggHistos: aggHistos, expiryTime: expiryTime, - mCache: putils.NewTimedCache(cfg.MaxMetrics), + mCache: putils.NewTimedCache(cfg.MaxMetrics, mChacheLenMetric), + mChacheLen: mChacheLenMetric, exitChan: putils.ExitChannel(), metricsProcessed: opMetrics.NewCounter(&metricsProcessed, params.Name), metricsDropped: opMetrics.NewCounter(&metricsDropped, params.Name), diff --git a/pkg/pipeline/extract/aggregate/aggregate_test.go b/pkg/pipeline/extract/aggregate/aggregate_test.go index 2b80f74f7..c1471d27e 100644 --- a/pkg/pipeline/extract/aggregate/aggregate_test.go +++ b/pkg/pipeline/extract/aggregate/aggregate_test.go @@ -37,7 +37,7 @@ func GetMockAggregate() Aggregate { OperationType: "avg", OperationKey: "value", }, - cache: utils.NewTimedCache(0), + cache: utils.NewTimedCache(0, nil), mutex: &sync.Mutex{}, expiryTime: 30 * time.Second, } diff --git a/pkg/pipeline/extract/aggregate/aggregates.go b/pkg/pipeline/extract/aggregate/aggregates.go index bd13d0bb3..d0c25f209 100644 --- a/pkg/pipeline/extract/aggregate/aggregates.go +++ b/pkg/pipeline/extract/aggregate/aggregates.go @@ -61,7 +61,7 @@ func (aggregates *Aggregates) GetMetrics() []config.GenericMap { func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) []Aggregate { aggregate := Aggregate{ Definition: aggregateDefinition, - cache: utils.NewTimedCache(0), + cache: utils.NewTimedCache(0, nil), mutex: &sync.Mutex{}, expiryTime: aggregates.expiryTime, } diff --git a/pkg/pipeline/utils/timed_cache.go b/pkg/pipeline/utils/timed_cache.go index 1ccd6b5e2..d8e3e9819 100644 --- a/pkg/pipeline/utils/timed_cache.go +++ b/pkg/pipeline/utils/timed_cache.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -45,10 +46,11 @@ type TimedCacheMap map[string]*cacheEntry // If maxEntries is non-zero, this limits the number of entries in the cache to the number specified. // If maxEntries is zero, the cache has no size limit. type TimedCache struct { - mu sync.RWMutex - cacheList *list.List - cacheMap TimedCacheMap - maxEntries int + mu sync.RWMutex + cacheList *list.List + cacheMap TimedCacheMap + maxEntries int + cacheLenMetric prometheus.Gauge } func (tc *TimedCache) GetCacheEntry(key string) (interface{}, bool) { @@ -90,6 +92,9 @@ func (tc *TimedCache) UpdateCacheEntry(key string, entry interface{}) (*cacheEnt // place at end of list cEntry.e = tc.cacheList.PushBack(cEntry) tc.cacheMap[key] = cEntry + if tc.cacheLenMetric != nil { + tc.cacheLenMetric.Inc() + } } return cEntry, true } @@ -140,14 +145,18 @@ func (tc *TimedCache) CleanupExpiredEntries(expiry time.Duration, callback Cache callback(pCacheInfo.SourceEntry) delete(tc.cacheMap, pCacheInfo.key) tc.cacheList.Remove(listEntry) + if tc.cacheLenMetric != nil { + tc.cacheLenMetric.Dec() + } } } -func NewTimedCache(maxEntries int) *TimedCache { +func NewTimedCache(maxEntries int, cacheLenMetric prometheus.Gauge) *TimedCache { l := &TimedCache{ - cacheList: list.New(), - cacheMap: make(TimedCacheMap), - maxEntries: maxEntries, + cacheList: list.New(), + cacheMap: make(TimedCacheMap), + maxEntries: maxEntries, + cacheLenMetric: cacheLenMetric, } return l } From 0db12dbc2fa66b5b201c385b3733c4d23629f6f2 Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Thu, 2 Mar 2023 10:12:26 +0200 Subject: [PATCH 2/4] moved new metric together with other metrics --- docs/operational-metrics.md | 8 ++++++++ pkg/pipeline/encode/encode_prom.go | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index 2320f80c3..5a4cb1a57 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -47,6 +47,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat | **Labels** | error, metric, key | +### encode_prom_metrics_reported +| **Name** | encode_prom_metrics_reported | +|:---|:---| +| **Description** | Total number of prometheus metrics reported by this stage | +| **Type** | gauge | +| **Labels** | stage | + + ### ingest_batch_size_bytes | **Name** | ingest_batch_size_bytes | |:---|:---| diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 2a9e8c163..39c55c1be 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -82,6 +82,12 @@ var ( operational.TypeCounter, "error", "metric", "key", ) + mChacheLen = operational.DefineMetric( + "encode_prom_metrics_reported", + "Total number of prometheus metrics reported by this stage", + operational.TypeGauge, + "stage", + ) ) // Encode encodes a metric before being stored From d5acdf38da63d05fb7246063d4bcb7dceca8aec1 Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Sun, 12 Mar 2023 10:52:35 +0200 Subject: [PATCH 3/4] removed unused metric --- docs/operational-metrics.md | 8 -------- pkg/pipeline/encode/encode_prom.go | 6 ------ 2 files changed, 14 deletions(-) diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index 5a4cb1a57..2320f80c3 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -47,14 +47,6 @@ Each table below provides documentation for an exported flowlogs-pipeline operat | **Labels** | error, metric, key | -### encode_prom_metrics_reported -| **Name** | encode_prom_metrics_reported | -|:---|:---| -| **Description** | Total number of prometheus metrics reported by this stage | -| **Type** | gauge | -| **Labels** | stage | - - ### ingest_batch_size_bytes | **Name** | ingest_batch_size_bytes | |:---|:---| diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 39c55c1be..2a9e8c163 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -82,12 +82,6 @@ var ( operational.TypeCounter, "error", "metric", "key", ) - mChacheLen = operational.DefineMetric( - "encode_prom_metrics_reported", - "Total number of prometheus metrics reported by this stage", - operational.TypeGauge, - "stage", - ) ) // Encode encodes a metric before being stored From b56c584e190a51f1e77a83c06aaa9be45cd564f2 Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Mon, 13 Mar 2023 09:10:13 +0200 Subject: [PATCH 4/4] moved new operational metric together with others --- docs/operational-metrics.md | 8 ++++++++ pkg/pipeline/encode/encode_prom.go | 16 ++++++++-------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index 2320f80c3..5a4cb1a57 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -47,6 +47,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat | **Labels** | error, metric, key | +### encode_prom_metrics_reported +| **Name** | encode_prom_metrics_reported | +|:---|:---| +| **Description** | Total number of prometheus metrics reported by this stage | +| **Type** | gauge | +| **Labels** | stage | + + ### ingest_batch_size_bytes | **Name** | ingest_batch_size_bytes | |:---|:---| diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 2a9e8c163..5f7cb1b20 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -56,7 +56,7 @@ type EncodeProm struct { aggHistos []histoInfo expiryTime time.Duration mCache *putils.TimedCache - mChacheLen prometheus.Gauge + mChacheLenMetric prometheus.Gauge exitChan <-chan struct{} metricsProcessed prometheus.Counter metricsDropped prometheus.Counter @@ -82,6 +82,12 @@ var ( operational.TypeCounter, "error", "metric", "key", ) + mChacheLen = operational.DefineMetric( + "encode_prom_metrics_reported", + "Total number of prometheus metrics reported by this stage", + operational.TypeGauge, + "stage", + ) ) // Encode encodes a metric before being stored @@ -335,12 +341,6 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Debugf("histos = %v", histos) log.Debugf("aggHistos = %v", aggHistos) - mChacheLen := operational.DefineMetric( - "encode_prom_metrics_reported", - "Total number of prometheus metrics reported by this stage", - operational.TypeGauge, - "stage", - ) mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name) w := &EncodeProm{ @@ -350,7 +350,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En aggHistos: aggHistos, expiryTime: expiryTime, mCache: putils.NewTimedCache(cfg.MaxMetrics, mChacheLenMetric), - mChacheLen: mChacheLenMetric, + mChacheLenMetric: mChacheLenMetric, exitChan: putils.ExitChannel(), metricsProcessed: opMetrics.NewCounter(&metricsProcessed, params.Name), metricsDropped: opMetrics.NewCounter(&metricsDropped, params.Name),