diff --git a/statsd/aggregator.go b/statsd/aggregator.go index 7b3c10f9d..0bd090b38 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -59,24 +59,36 @@ func (a *aggregator) stop() { func (a *aggregator) flushMetrics() []metric { metrics := []metric{} - a.setsM.RLock() - for _, s := range a.sets { - metrics = append(metrics, s.flush()...) + // We reset the values to avoid sending 'zero' values for metrics not + // sampled during this flush insterval + + a.setsM.Lock() + sets := a.sets + a.sets = map[string]*setMetric{} + a.setsM.Unlock() + + for _, s := range sets { + metrics = append(metrics, s.flushUnsafe()...) } - a.setsM.RUnlock() - a.gaugesM.RLock() - for _, g := range a.gauges { - metrics = append(metrics, g.flush()) + a.gaugesM.Lock() + gauges := a.gauges + a.gauges = map[string]*gaugeMetric{} + a.gaugesM.Unlock() + + for _, g := range gauges { + metrics = append(metrics, g.flushUnsafe()) } - a.gaugesM.RUnlock() a.countsM.RLock() - for _, c := range a.counts { - metrics = append(metrics, c.flush()) - } + counts := a.gauges + a.counts = map[string]*countMetric{} a.countsM.RUnlock() + for _, c := range counts { + metrics = append(metrics, c.flushUnsafe()) + } + return metrics } diff --git a/statsd/metrics.go b/statsd/metrics.go index 40755ab0b..b441df039 100644 --- a/statsd/metrics.go +++ b/statsd/metrics.go @@ -33,13 +33,13 @@ func (c *countMetric) sample(v int64) { atomic.AddInt64(&c.value, v) } -func (c *countMetric) flush() metric { +func (c *countMetric) flushUnsafe() metric { return metric{ metricType: count, name: c.name, tags: c.tags, rate: c.rate, - ivalue: atomic.SwapInt64(&c.value, 0), + ivalue: c.value, } } @@ -65,14 +65,13 @@ func (g *gaugeMetric) sample(v float64) { atomic.StoreUint64(&g.value, math.Float64bits(v)) } -func (g *gaugeMetric) flush() metric { - value := atomic.SwapUint64(&g.value, math.Float64bits(0)) +func (g *gaugeMetric) flushUnsafe() metric { return metric{ metricType: count, name: g.name, tags: g.tags, rate: g.rate, - fvalue: math.Float64frombits(value), + fvalue: math.Float64frombits(g.value), } } @@ -105,19 +104,13 @@ func (s *setMetric) sample(v string) { // Sets are aggregated on the agent side too. We flush the keys so a set from // multiple application can be correctly aggregated on the agnet side. -func (s *setMetric) flush() []metric { - s.Lock() +func (s *setMetric) flushUnsafe() []metric { if len(s.data) == 0 { - s.Unlock() return nil } - values := s.data - s.data = map[string]struct{}{} - s.Unlock() - metrics := []metric{} - for value := range values { + for value := range s.data { metrics = append(metrics, metric{ metricType: set,