From a4e8ae06ad534d3d68e455d63964ae8225d824ed Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Tue, 6 Feb 2024 18:53:54 +0000 Subject: [PATCH] Unregister metrics generated from logs after a config reload. --- clients/pkg/logentry/metric/metricvec.go | 6 ++++ clients/pkg/logentry/stages/decolorize.go | 5 +++ clients/pkg/logentry/stages/drop.go | 5 +++ .../pkg/logentry/stages/eventlogmessage.go | 5 +++ clients/pkg/logentry/stages/extensions.go | 5 +++ clients/pkg/logentry/stages/geoip.go | 5 +++ clients/pkg/logentry/stages/json.go | 5 +++ clients/pkg/logentry/stages/limit.go | 5 +++ clients/pkg/logentry/stages/match.go | 5 +++ clients/pkg/logentry/stages/metrics.go | 31 +++++++++++++++++-- clients/pkg/logentry/stages/metrics_test.go | 9 +++++- clients/pkg/logentry/stages/multiline.go | 5 +++ clients/pkg/logentry/stages/pack.go | 5 +++ clients/pkg/logentry/stages/pipeline.go | 8 +++++ clients/pkg/logentry/stages/sampling.go | 5 +++ clients/pkg/logentry/stages/stage.go | 6 ++++ .../pkg/logentry/stages/structuredmetadata.go | 5 +++ 17 files changed, 117 insertions(+), 3 deletions(-) diff --git a/clients/pkg/logentry/metric/metricvec.go b/clients/pkg/logentry/metric/metricvec.go index 07f73c20873d..666e3712eb7a 100644 --- a/clients/pkg/logentry/metric/metricvec.go +++ b/clients/pkg/logentry/metric/metricvec.go @@ -84,6 +84,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool { return ok } +func (c *metricVec) DeleteAll() { + c.mtx.Lock() + defer c.mtx.Unlock() + c.metrics = map[model.Fingerprint]prometheus.Metric{} +} + // prune will remove all metrics which implement the Expirable interface and have expired // it does not take out a lock on the metrics map so whoever calls this function should do so. func (c *metricVec) prune() { diff --git a/clients/pkg/logentry/stages/decolorize.go b/clients/pkg/logentry/stages/decolorize.go index bac7274b6bad..26e4f8b53a0d 100644 --- a/clients/pkg/logentry/stages/decolorize.go +++ b/clients/pkg/logentry/stages/decolorize.go @@ -33,3 +33,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry { func (m *decolorizeStage) Name() string { return StageTypeDecolorize } + +// Cleanup implements Stage. +func (*decolorizeStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/drop.go b/clients/pkg/logentry/stages/drop.go index 19a2e6c37807..1a19ca294b65 100644 --- a/clients/pkg/logentry/stages/drop.go +++ b/clients/pkg/logentry/stages/drop.go @@ -266,3 +266,8 @@ func (m *dropStage) shouldDrop(e Entry) bool { func (m *dropStage) Name() string { return StageTypeDrop } + +// Cleanup implements Stage. +func (*dropStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/eventlogmessage.go b/clients/pkg/logentry/stages/eventlogmessage.go index e637c5c92098..bf591a3f5325 100644 --- a/clients/pkg/logentry/stages/eventlogmessage.go +++ b/clients/pkg/logentry/stages/eventlogmessage.go @@ -142,6 +142,11 @@ func (m *eventLogMessageStage) Name() string { return StageTypeEventLogMessage } +// Cleanup implements Stage. +func (*eventLogMessageStage) Cleanup() { + // no-op +} + // Sanitize a input string to convert it into a valid prometheus label // TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName func SanitizeFullLabelName(input string) string { diff --git a/clients/pkg/logentry/stages/extensions.go b/clients/pkg/logentry/stages/extensions.go index f25ffe02e840..236cae722529 100644 --- a/clients/pkg/logentry/stages/extensions.go +++ b/clients/pkg/logentry/stages/extensions.go @@ -59,6 +59,11 @@ func (c *cri) Name() string { return "cri" } +// Cleanup implements Stage. +func (*cri) Cleanup() { + // no-op +} + // implements Stage interface func (c *cri) Run(entry chan Entry) chan Entry { entry = c.base.Run(entry) diff --git a/clients/pkg/logentry/stages/geoip.go b/clients/pkg/logentry/stages/geoip.go index d127ecf89814..0786f584d96c 100644 --- a/clients/pkg/logentry/stages/geoip.go +++ b/clients/pkg/logentry/stages/geoip.go @@ -123,6 +123,11 @@ func (g *geoIPStage) Name() string { return StageTypeGeoIP } +// Cleanup implements Stage. +func (*geoIPStage) Cleanup() { + // no-op +} + func (g *geoIPStage) process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) { var ip net.IP if g.cfgs.Source != nil { diff --git a/clients/pkg/logentry/stages/json.go b/clients/pkg/logentry/stages/json.go index 36f8f66c0358..de32bedff2bc 100644 --- a/clients/pkg/logentry/stages/json.go +++ b/clients/pkg/logentry/stages/json.go @@ -188,3 +188,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string func (j *jsonStage) Name() string { return StageTypeJSON } + +// Cleanup implements Stage. +func (*jsonStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/limit.go b/clients/pkg/logentry/stages/limit.go index d5489221e6ac..7538bd316c50 100644 --- a/clients/pkg/logentry/stages/limit.go +++ b/clients/pkg/logentry/stages/limit.go @@ -138,6 +138,11 @@ func (m *limitStage) Name() string { return StageTypeLimit } +// Cleanup implements Stage. +func (*limitStage) Cleanup() { + // no-op +} + func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec { return util.RegisterCounterVec(registerer, "logentry", "dropped_lines_by_label_total", "A count of all log lines dropped as a result of a pipeline stage", diff --git a/clients/pkg/logentry/stages/match.go b/clients/pkg/logentry/stages/match.go index 3b4addbb0de1..2e547fdfcd7c 100644 --- a/clients/pkg/logentry/stages/match.go +++ b/clients/pkg/logentry/stages/match.go @@ -206,3 +206,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) { func (m *matcherStage) Name() string { return StageTypeMatch } + +// Cleanup implements Stage. +func (*matcherStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/metrics.go b/clients/pkg/logentry/stages/metrics.go index 14386e3b43a4..3aee4cb68a95 100644 --- a/clients/pkg/logentry/stages/metrics.go +++ b/clients/pkg/logentry/stages/metrics.go @@ -128,11 +128,11 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R metrics[name] = collector } } - return toStage(&metricStage{ + return &metricStage{ logger: logger, cfg: *cfgs, metrics: metrics, - }), nil + }, nil } // metricStage creates and updates prometheus metrics based on extracted pipeline data @@ -142,6 +142,19 @@ type metricStage struct { metrics map[string]prometheus.Collector } +func (m *metricStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + + for e := range in { + m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line) + out <- e + } + }() + return out +} + // Process implements Stage func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) { for name, collector := range m.metrics { @@ -178,6 +191,20 @@ func (m *metricStage) Name() string { return StageTypeMetric } +// Cleanup implements Stage. +func (m *metricStage) Cleanup() { + for _, collector := range m.metrics { + switch vec := collector.(type) { + case *metric.Counters: + vec.DeleteAll() + case *metric.Gauges: + vec.DeleteAll() + case *metric.Histograms: + vec.DeleteAll() + } + } +} + // recordCounter will update a counter metric // nolint:goconst func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) { diff --git a/clients/pkg/logentry/stages/metrics_test.go b/clients/pkg/logentry/stages/metrics_test.go index 6a14e6c80c1e..4f5fec89e7f3 100644 --- a/clients/pkg/logentry/stages/metrics_test.go +++ b/clients/pkg/logentry/stages/metrics_test.go @@ -127,6 +127,13 @@ func TestMetricsPipeline(t *testing.T) { strings.NewReader(expectedMetrics)); err != nil { t.Fatalf("mismatch metrics: %v", err) } + + pl.Cleanup() + + if err := testutil.GatherAndCompare(registry, + strings.NewReader("")); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } } func TestNegativeGauge(t *testing.T) { @@ -435,7 +442,7 @@ func TestDefaultIdleDuration(t *testing.T) { if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } - assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*stageProcessor).Processor.(*metricStage).cfg["total_keys"].maxIdleSec) + assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec) } var ( diff --git a/clients/pkg/logentry/stages/multiline.go b/clients/pkg/logentry/stages/multiline.go index 199ff438a939..9b6ff9bb790a 100644 --- a/clients/pkg/logentry/stages/multiline.go +++ b/clients/pkg/logentry/stages/multiline.go @@ -229,3 +229,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) { func (m *multilineStage) Name() string { return StageTypeMultiline } + +// Cleanup implements Stage. +func (*multilineStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/pack.go b/clients/pkg/logentry/stages/pack.go index 737fa8d36b79..e06e86dd7fc3 100644 --- a/clients/pkg/logentry/stages/pack.go +++ b/clients/pkg/logentry/stages/pack.go @@ -218,3 +218,8 @@ func (m *packStage) pack(e Entry) Entry { func (m *packStage) Name() string { return StageTypePack } + +// Cleanup implements Stage. +func (*packStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/pipeline.go b/clients/pkg/logentry/stages/pipeline.go index c20a7784c511..fbc5b653d725 100644 --- a/clients/pkg/logentry/stages/pipeline.go +++ b/clients/pkg/logentry/stages/pipeline.go @@ -30,6 +30,13 @@ type Pipeline struct { dropCount *prometheus.CounterVec } +// Cleanup implements Stage. +func (p *Pipeline) Cleanup() { + for _, s := range p.stages { + s.Cleanup() + } +} + // NewPipeline creates a new log entry pipeline from a configuration func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) { st := []Stage{} @@ -169,6 +176,7 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler { return api.NewEntryHandler(handlerIn, func() { once.Do(func() { close(handlerIn) }) wg.Wait() + p.Cleanup() }) } diff --git a/clients/pkg/logentry/stages/sampling.go b/clients/pkg/logentry/stages/sampling.go index 73d4b2540296..6340ec6de3cd 100644 --- a/clients/pkg/logentry/stages/sampling.go +++ b/clients/pkg/logentry/stages/sampling.go @@ -111,3 +111,8 @@ func (m *samplingStage) randomNumber() uint64 { func (m *samplingStage) Name() string { return StageTypeSampling } + +// Cleanup implements Stage. +func (*samplingStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/stage.go b/clients/pkg/logentry/stages/stage.go index 1c19face4044..86415738469f 100644 --- a/clients/pkg/logentry/stages/stage.go +++ b/clients/pkg/logentry/stages/stage.go @@ -62,6 +62,7 @@ type Entry struct { type Stage interface { Name() string Run(chan Entry) chan Entry + Cleanup() } func (entry *Entry) copy() *Entry { @@ -228,3 +229,8 @@ func New(logger log.Logger, jobName *string, stageType string, } return creator(params) } + +// Cleanup implements Stage. +func (*stageProcessor) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/structuredmetadata.go b/clients/pkg/logentry/stages/structuredmetadata.go index cdab88a956c7..e8222c1f49e9 100644 --- a/clients/pkg/logentry/stages/structuredmetadata.go +++ b/clients/pkg/logentry/stages/structuredmetadata.go @@ -33,6 +33,11 @@ func (s *structuredMetadataStage) Name() string { return StageTypeStructuredMetadata } +// Cleanup implements Stage. +func (*structuredMetadataStage) Cleanup() { + // no-op +} + func (s *structuredMetadataStage) Run(in chan Entry) chan Entry { return RunWith(in, func(e Entry) Entry { processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {