Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose cache size as an operational metric #398

Merged
merged 4 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/operational-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Labels** | error, metric, key |


### encode_prom_metrics_reported
| **Name** | encode_prom_metrics_reported |
|:---|:---|
| **Description** | Total number of prometheus metrics reported by this stage |
| **Type** | gauge |
| **Labels** | stage |


### ingest_batch_size_bytes
| **Name** | ingest_batch_size_bytes |
|:---|:---|
Expand Down
12 changes: 11 additions & 1 deletion pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type EncodeProm struct {
aggHistos []histoInfo
expiryTime time.Duration
mCache *putils.TimedCache
mChacheLenMetric prometheus.Gauge
exitChan <-chan struct{}
metricsProcessed prometheus.Counter
metricsDropped prometheus.Counter
Expand All @@ -81,6 +82,12 @@ var (
operational.TypeCounter,
"error", "metric", "key",
)
mChacheLen = operational.DefineMetric(
"encode_prom_metrics_reported",
"Total number of prometheus metrics reported by this stage",
operational.TypeGauge,
"stage",
)
)

// Encode encodes a metric before being stored
Expand Down Expand Up @@ -334,13 +341,16 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
log.Debugf("histos = %v", histos)
log.Debugf("aggHistos = %v", aggHistos)

mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name)

w := &EncodeProm{
counters: counters,
gauges: gauges,
histos: histos,
aggHistos: aggHistos,
expiryTime: expiryTime,
mCache: putils.NewTimedCache(cfg.MaxMetrics),
mCache: putils.NewTimedCache(cfg.MaxMetrics, mChacheLenMetric),
mChacheLenMetric: mChacheLenMetric,
exitChan: putils.ExitChannel(),
metricsProcessed: opMetrics.NewCounter(&metricsProcessed, params.Name),
metricsDropped: opMetrics.NewCounter(&metricsDropped, params.Name),
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func GetMockAggregate() Aggregate {
OperationType: "avg",
OperationKey: "value",
},
cache: utils.NewTimedCache(0),
cache: utils.NewTimedCache(0, nil),
mutex: &sync.Mutex{},
expiryTime: 30 * time.Second,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (aggregates *Aggregates) GetMetrics() []config.GenericMap {
func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) []Aggregate {
aggregate := Aggregate{
Definition: aggregateDefinition,
cache: utils.NewTimedCache(0),
cache: utils.NewTimedCache(0, nil),
mutex: &sync.Mutex{},
expiryTime: aggregates.expiryTime,
}
Expand Down
25 changes: 17 additions & 8 deletions pkg/pipeline/utils/timed_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

Expand All @@ -45,10 +46,11 @@ type TimedCacheMap map[string]*cacheEntry
// If maxEntries is non-zero, this limits the number of entries in the cache to the number specified.
// If maxEntries is zero, the cache has no size limit.
type TimedCache struct {
	mu             sync.RWMutex     // guards cacheList and cacheMap
	cacheList      *list.List       // entries ordered by last update time (most recently updated pushed to back)
	cacheMap       TimedCacheMap    // key -> *cacheEntry for O(1) lookup
	maxEntries     int              // maximum number of entries; 0 means no limit
	cacheLenMetric prometheus.Gauge // optional gauge tracking the current entry count (Inc on insert, Dec on expiry); may be nil
}

func (tc *TimedCache) GetCacheEntry(key string) (interface{}, bool) {
Expand Down Expand Up @@ -90,6 +92,9 @@ func (tc *TimedCache) UpdateCacheEntry(key string, entry interface{}) (*cacheEnt
// place at end of list
cEntry.e = tc.cacheList.PushBack(cEntry)
tc.cacheMap[key] = cEntry
if tc.cacheLenMetric != nil {
tc.cacheLenMetric.Inc()
}
}
return cEntry, true
}
Expand Down Expand Up @@ -140,14 +145,18 @@ func (tc *TimedCache) CleanupExpiredEntries(expiry time.Duration, callback Cache
callback(pCacheInfo.SourceEntry)
delete(tc.cacheMap, pCacheInfo.key)
tc.cacheList.Remove(listEntry)
if tc.cacheLenMetric != nil {
tc.cacheLenMetric.Dec()
}
}
}

func NewTimedCache(maxEntries int) *TimedCache {
func NewTimedCache(maxEntries int, cacheLenMetric prometheus.Gauge) *TimedCache {
l := &TimedCache{
cacheList: list.New(),
cacheMap: make(TimedCacheMap),
maxEntries: maxEntries,
cacheList: list.New(),
cacheMap: make(TimedCacheMap),
maxEntries: maxEntries,
cacheLenMetric: cacheLenMetric,
}
return l
}
Expand Down