Skip to content

Commit

Permalink
Make sure we don't send unsampled metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
hush-hush committed Mar 18, 2020
1 parent adf7f92 commit c4733ba
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 24 deletions.
34 changes: 23 additions & 11 deletions statsd/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,36 @@ func (a *aggregator) stop() {
func (a *aggregator) flushMetrics() []metric {
metrics := []metric{}

a.setsM.RLock()
for _, s := range a.sets {
metrics = append(metrics, s.flush()...)
// We reset the values to avoid sending 'zero' values for metrics not
// sampled during this flush insterval

a.setsM.Lock()
sets := a.sets
a.sets = map[string]*setMetric{}
a.setsM.Unlock()

for _, s := range sets {
metrics = append(metrics, s.flushUnsafe()...)
}
a.setsM.RUnlock()

a.gaugesM.RLock()
for _, g := range a.gauges {
metrics = append(metrics, g.flush())
a.gaugesM.Lock()
gauges := a.gauges
a.gauges = map[string]*gaugeMetric{}
a.gaugesM.Unlock()

for _, g := range gauges {
metrics = append(metrics, g.flushUnsafe())
}
a.gaugesM.RUnlock()

a.countsM.RLock()
for _, c := range a.counts {
metrics = append(metrics, c.flush())
}
counts := a.gauges
a.counts = map[string]*countMetric{}
a.countsM.RUnlock()

for _, c := range counts {
metrics = append(metrics, c.flushUnsafe())
}

return metrics
}

Expand Down
19 changes: 6 additions & 13 deletions statsd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func (c *countMetric) sample(v int64) {
atomic.AddInt64(&c.value, v)
}

func (c *countMetric) flush() metric {
func (c *countMetric) flushUnsafe() metric {
return metric{
metricType: count,
name: c.name,
tags: c.tags,
rate: c.rate,
ivalue: atomic.SwapInt64(&c.value, 0),
ivalue: c.value,
}
}

Expand All @@ -65,14 +65,13 @@ func (g *gaugeMetric) sample(v float64) {
atomic.StoreUint64(&g.value, math.Float64bits(v))
}

func (g *gaugeMetric) flush() metric {
value := atomic.SwapUint64(&g.value, math.Float64bits(0))
func (g *gaugeMetric) flushUnsafe() metric {
return metric{
metricType: count,
name: g.name,
tags: g.tags,
rate: g.rate,
fvalue: math.Float64frombits(value),
fvalue: math.Float64frombits(g.value),
}
}

Expand Down Expand Up @@ -105,19 +104,13 @@ func (s *setMetric) sample(v string) {

// Sets are aggregated on the agent side too. We flush the keys so a set from
// multiple application can be correctly aggregated on the agnet side.
func (s *setMetric) flush() []metric {
s.Lock()
func (s *setMetric) flushUnsafe() []metric {
if len(s.data) == 0 {
s.Unlock()
return nil
}

values := s.data
s.data = map[string]struct{}{}
s.Unlock()

metrics := []metric{}
for value := range values {
for value := range s.data {
metrics = append(metrics,
metric{
metricType: set,
Expand Down

0 comments on commit c4733ba

Please sign in to comment.