Skip to content

Commit

Permalink
Unregister metrics generated from logs after a config reload.
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Feb 26, 2024
1 parent 6578a00 commit a4e8ae0
Show file tree
Hide file tree
Showing 17 changed files with 117 additions and 3 deletions.
6 changes: 6 additions & 0 deletions clients/pkg/logentry/metric/metricvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,8 @@ func (m *dropStage) shouldDrop(e Entry) bool {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (c *cri) Name() string {
return "cri"
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) {
var ip net.IP
if g.cfgs.Source != nil {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return util.RegisterCounterVec(registerer, "logentry", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions clients/pkg/logentry/stages/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R
metrics[name] = collector
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: *cfgs,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
Expand All @@ -142,6 +142,19 @@ type metricStage struct {
metrics map[string]prometheus.Collector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) {
for name, collector := range m.metrics {
Expand Down Expand Up @@ -178,6 +191,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, collector := range m.metrics {
switch vec := collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
// nolint:goconst
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
Expand Down
9 changes: 8 additions & 1 deletion clients/pkg/logentry/stages/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
}

func TestNegativeGauge(t *testing.T) {
Expand Down Expand Up @@ -435,7 +442,7 @@ func TestDefaultIdleDuration(t *testing.T) {
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*stageProcessor).Processor.(*metricStage).cfg["total_keys"].maxIdleSec)
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec)
}

var (
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions clients/pkg/logentry/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ type Pipeline struct {
dropCount *prometheus.CounterVec
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
st := []Stage{}
Expand Down Expand Up @@ -169,6 +176,7 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
return api.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
Expand Down Expand Up @@ -228,3 +229,8 @@ func New(logger log.Logger, jobName *string, stageType string,
}
return creator(params)
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/structuredmetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
Expand Down

0 comments on commit a4e8ae0

Please sign in to comment.