diff --git a/clients/pkg/logentry/metric/metricvec.go b/clients/pkg/logentry/metric/metricvec.go index f004db760f8f..cffba8499d8c 100644 --- a/clients/pkg/logentry/metric/metricvec.go +++ b/clients/pkg/logentry/metric/metricvec.go @@ -84,6 +84,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool { return ok } +func (c *metricVec) DeleteAll() { + c.mtx.Lock() + defer c.mtx.Unlock() + c.metrics = map[model.Fingerprint]prometheus.Metric{} +} + // prune will remove all metrics which implement the Expirable interface and have expired // it does not take out a lock on the metrics map so whoever calls this function should do so. func (c *metricVec) prune() { diff --git a/clients/pkg/logentry/stages/decolorize.go b/clients/pkg/logentry/stages/decolorize.go index a86e6cdeafb2..0603c09660ef 100644 --- a/clients/pkg/logentry/stages/decolorize.go +++ b/clients/pkg/logentry/stages/decolorize.go @@ -33,3 +33,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry { func (m *decolorizeStage) Name() string { return StageTypeDecolorize } + +// Cleanup implements Stage. +func (*decolorizeStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/drop.go b/clients/pkg/logentry/stages/drop.go index 462d6c34f635..4f4d8572bcac 100644 --- a/clients/pkg/logentry/stages/drop.go +++ b/clients/pkg/logentry/stages/drop.go @@ -266,3 +266,8 @@ func (m *dropStage) shouldDrop(e Entry) bool { func (m *dropStage) Name() string { return StageTypeDrop } + +// Cleanup implements Stage. +func (*dropStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/eventlogmessage.go b/clients/pkg/logentry/stages/eventlogmessage.go index e637c5c92098..bf591a3f5325 100644 --- a/clients/pkg/logentry/stages/eventlogmessage.go +++ b/clients/pkg/logentry/stages/eventlogmessage.go @@ -142,6 +142,11 @@ func (m *eventLogMessageStage) Name() string { return StageTypeEventLogMessage } +// Cleanup implements Stage. +func (*eventLogMessageStage) Cleanup() { + // no-op +} + // Sanitize a input string to convert it into a valid prometheus label // TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName func SanitizeFullLabelName(input string) string { diff --git a/clients/pkg/logentry/stages/extensions.go b/clients/pkg/logentry/stages/extensions.go index 2e49d6bd224b..c7ebdd18fe25 100644 --- a/clients/pkg/logentry/stages/extensions.go +++ b/clients/pkg/logentry/stages/extensions.go @@ -59,6 +59,11 @@ func (c *cri) Name() string { return "cri" } +// Cleanup implements Stage. +func (*cri) Cleanup() { + // no-op +} + // implements Stage interface func (c *cri) Run(entry chan Entry) chan Entry { entry = c.base.Run(entry) diff --git a/clients/pkg/logentry/stages/geoip.go b/clients/pkg/logentry/stages/geoip.go index d127ecf89814..0786f584d96c 100644 --- a/clients/pkg/logentry/stages/geoip.go +++ b/clients/pkg/logentry/stages/geoip.go @@ -123,6 +123,11 @@ func (g *geoIPStage) Name() string { return StageTypeGeoIP } +// Cleanup implements Stage. +func (*geoIPStage) Cleanup() { + // no-op +} + func (g *geoIPStage) process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) { var ip net.IP if g.cfgs.Source != nil { diff --git a/clients/pkg/logentry/stages/json.go b/clients/pkg/logentry/stages/json.go index 36f8f66c0358..de32bedff2bc 100644 --- a/clients/pkg/logentry/stages/json.go +++ b/clients/pkg/logentry/stages/json.go @@ -188,3 +188,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string func (j *jsonStage) Name() string { return StageTypeJSON } + +// Cleanup implements Stage. +func (*jsonStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/limit.go b/clients/pkg/logentry/stages/limit.go index 49d32cbf0402..5d7553992ff8 100644 --- a/clients/pkg/logentry/stages/limit.go +++ b/clients/pkg/logentry/stages/limit.go @@ -138,6 +138,11 @@ func (m *limitStage) Name() string { return StageTypeLimit } +// Cleanup implements Stage. +func (*limitStage) Cleanup() { + // no-op +} + func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec { return util.RegisterCounterVec(registerer, "logentry", "dropped_lines_by_label_total", "A count of all log lines dropped as a result of a pipeline stage", diff --git a/clients/pkg/logentry/stages/match.go b/clients/pkg/logentry/stages/match.go index 4007e45da4ec..176f383c4178 100644 --- a/clients/pkg/logentry/stages/match.go +++ b/clients/pkg/logentry/stages/match.go @@ -206,3 +206,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) { func (m *matcherStage) Name() string { return StageTypeMatch } + +// Cleanup implements Stage. +func (*matcherStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/metrics.go b/clients/pkg/logentry/stages/metrics.go index 827f0cf313a4..2497a694fe23 100644 --- a/clients/pkg/logentry/stages/metrics.go +++ b/clients/pkg/logentry/stages/metrics.go @@ -128,11 +128,11 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R metrics[name] = collector } } - return toStage(&metricStage{ + return &metricStage{ logger: logger, cfg: *cfgs, metrics: metrics, - }), nil + }, nil } // metricStage creates and updates prometheus metrics based on extracted pipeline data @@ -142,6 +142,19 @@ type metricStage struct { metrics map[string]prometheus.Collector } +func (m *metricStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + + for e := range in { + m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line) + out <- e + } + }() + return out +} + // Process implements Stage func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) { for name, collector := range m.metrics { @@ -178,6 +191,20 @@ func (m *metricStage) Name() string { return StageTypeMetric } +// Cleanup implements Stage. +func (m *metricStage) Cleanup() { + for _, collector := range m.metrics { + switch vec := collector.(type) { + case *metric.Counters: + vec.DeleteAll() + case *metric.Gauges: + vec.DeleteAll() + case *metric.Histograms: + vec.DeleteAll() + } + } +} + // recordCounter will update a counter metric // nolint:goconst func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) { diff --git a/clients/pkg/logentry/stages/metrics_test.go b/clients/pkg/logentry/stages/metrics_test.go index f46ea6839919..563ee9eab664 100644 --- a/clients/pkg/logentry/stages/metrics_test.go +++ b/clients/pkg/logentry/stages/metrics_test.go @@ -127,6 +127,13 @@ func TestMetricsPipeline(t *testing.T) { strings.NewReader(expectedMetrics)); err != nil { t.Fatalf("mismatch metrics: %v", err) } + + pl.Cleanup() + + if err := testutil.GatherAndCompare(registry, + strings.NewReader("")); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } } func TestNegativeGauge(t *testing.T) { @@ -435,7 +442,7 @@ func TestDefaultIdleDuration(t *testing.T) { if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } - assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*stageProcessor).Processor.(*metricStage).cfg["total_keys"].maxIdleSec) + assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec) } var ( diff --git a/clients/pkg/logentry/stages/multiline.go b/clients/pkg/logentry/stages/multiline.go index 2f94a2e1822f..f44e4a51b108 100644 --- a/clients/pkg/logentry/stages/multiline.go +++ b/clients/pkg/logentry/stages/multiline.go @@ -229,3 +229,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) { func (m *multilineStage) Name() string { return StageTypeMultiline } + +// Cleanup implements Stage. +func (*multilineStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/pack.go b/clients/pkg/logentry/stages/pack.go index 881650d8c6aa..afeecde6271c 100644 --- a/clients/pkg/logentry/stages/pack.go +++ b/clients/pkg/logentry/stages/pack.go @@ -218,3 +218,8 @@ func (m *packStage) pack(e Entry) Entry { func (m *packStage) Name() string { return StageTypePack } + +// Cleanup implements Stage. +func (*packStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/pipeline.go b/clients/pkg/logentry/stages/pipeline.go index 1c4d2ba8e5ab..288ea5190f48 100644 --- a/clients/pkg/logentry/stages/pipeline.go +++ b/clients/pkg/logentry/stages/pipeline.go @@ -30,6 +30,13 @@ type Pipeline struct { dropCount *prometheus.CounterVec } +// Cleanup implements Stage. +func (p *Pipeline) Cleanup() { + for _, s := range p.stages { + s.Cleanup() + } +} + // NewPipeline creates a new log entry pipeline from a configuration func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) { st := []Stage{} @@ -169,6 +176,7 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler { return api.NewEntryHandler(handlerIn, func() { once.Do(func() { close(handlerIn) }) wg.Wait() + p.Cleanup() }) } diff --git a/clients/pkg/logentry/stages/sampling.go b/clients/pkg/logentry/stages/sampling.go index 73d4b2540296..6340ec6de3cd 100644 --- a/clients/pkg/logentry/stages/sampling.go +++ b/clients/pkg/logentry/stages/sampling.go @@ -111,3 +111,8 @@ func (m *samplingStage) randomNumber() uint64 { func (m *samplingStage) Name() string { return StageTypeSampling } + +// Cleanup implements Stage. +func (*samplingStage) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/stage.go b/clients/pkg/logentry/stages/stage.go index 9de1d4e0a590..82e4e508d823 100644 --- a/clients/pkg/logentry/stages/stage.go +++ b/clients/pkg/logentry/stages/stage.go @@ -62,6 +62,7 @@ type Entry struct { type Stage interface { Name() string Run(chan Entry) chan Entry + Cleanup() } func (entry *Entry) copy() *Entry { @@ -228,3 +229,8 @@ func New(logger log.Logger, jobName *string, stageType string, } return creator(params) } + +// Cleanup implements Stage. +func (*stageProcessor) Cleanup() { + // no-op +} diff --git a/clients/pkg/logentry/stages/structuredmetadata.go b/clients/pkg/logentry/stages/structuredmetadata.go index cdf70c01d4fa..857959b8d381 100644 --- a/clients/pkg/logentry/stages/structuredmetadata.go +++ b/clients/pkg/logentry/stages/structuredmetadata.go @@ -33,6 +33,11 @@ func (s *structuredMetadataStage) Name() string { return StageTypeStructuredMetadata } +// Cleanup implements Stage. +func (*structuredMetadataStage) Cleanup() { + // no-op +} + func (s *structuredMetadataStage) Run(in chan Entry) chan Entry { return RunWith(in, func(e Entry) Entry { processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) { diff --git a/docs/sources/send-data/promtail/configuration.md b/docs/sources/send-data/promtail/configuration.md index ce1e329c7ea0..1f8265065328 100644 --- a/docs/sources/send-data/promtail/configuration.md +++ b/docs/sources/send-data/promtail/configuration.md @@ -681,7 +681,8 @@ The metrics stage allows for defining metrics from the extracted data. Created metrics are not pushed to Loki and are instead exposed via Promtail's `/metrics` endpoint. Prometheus should be configured to scrape Promtail to be -able to retrieve the metrics configured by this stage. +able to retrieve the metrics configured by this stage. +If Promtail's configuration is reloaded, all metrics will be reset. ```yaml diff --git a/docs/sources/send-data/promtail/stages/metrics.md b/docs/sources/send-data/promtail/stages/metrics.md index 055f7e076e7a..b034bd6d6d6a 100644 --- a/docs/sources/send-data/promtail/stages/metrics.md +++ b/docs/sources/send-data/promtail/stages/metrics.md @@ -13,7 +13,8 @@ The `metrics` stage is an action stage that allows for defining and updating metrics based on data from the extracted map. Note that created metrics are not pushed to Loki and are instead exposed via Promtail's `/metrics` endpoint. Prometheus should be configured to scrape Promtail to be able to retrieve the -metrics configured by this stage. +metrics configured by this stage. If Promtail's configuration is reloaded, +all metrics will be reset. ## Schema