Skip to content

Commit

Permalink
Refactoring gauges to support floats, unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Oct 6, 2015
1 parent 1d7cde5 commit e68b6dd
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 123 deletions.
14 changes: 7 additions & 7 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// BatchPoints is used to send a batch of data in a single write from telegraf
// to influx
type BatchPoints struct {
mu sync.Mutex
sync.Mutex

client.BatchPoints

Expand All @@ -30,8 +30,8 @@ func (bp *BatchPoints) Add(
val interface{},
tags map[string]string,
) {
bp.mu.Lock()
defer bp.mu.Unlock()
bp.Lock()
defer bp.Unlock()

measurement = bp.Prefix + measurement

Expand Down Expand Up @@ -72,8 +72,8 @@ func (bp *BatchPoints) AddFieldsWithTime(
// TODO this function should add the fields with the timestamp, but that will
// need to wait for the InfluxDB point precision/unit to be fixed
bp.AddFields(measurement, fields, tags)
// bp.mu.Lock()
// defer bp.mu.Unlock()
// bp.Lock()
// defer bp.Unlock()

// measurement = bp.Prefix + measurement

Expand Down Expand Up @@ -117,8 +117,8 @@ func (bp *BatchPoints) AddFields(
fields map[string]interface{},
tags map[string]string,
) {
bp.mu.Lock()
defer bp.mu.Unlock()
bp.Lock()
defer bp.Unlock()

measurement = bp.Prefix + measurement

Expand Down
94 changes: 49 additions & 45 deletions plugins/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ type Statsd struct {
done chan struct{}

// Cache gauges, counters & sets so they can be aggregated as they arrive
gauges map[string]cachedmetric
counters map[string]cachedmetric
sets map[string]cachedmetric
gauges map[string]cachedgauge
counters map[string]cachedcounter
sets map[string]cachedset

Mappings []struct {
Match string
Expand All @@ -52,9 +52,9 @@ func NewStatsd() *Statsd {
s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages)
s.inmetrics = make(chan metric, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedmetric)
s.counters = make(map[string]cachedmetric)
s.sets = make(map[string]cachedmetric)
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)

return &s
}
Expand All @@ -63,19 +63,31 @@ func NewStatsd() *Statsd {
type metric struct {
name string
bucket string
value int64
value float64
mtype string
additive bool
samplerate float64
tags map[string]string
}

// cachedmetric is a subset of metric used specifically for storing cached
// gauges and counters, ready for sending to InfluxDB.
type cachedmetric struct {
type cachedset struct {
set map[int64]bool
tags map[string]string
}

type cachedgauge struct {
value float64
tags map[string]string
}

type cachedcounter struct {
value int64
tags map[string]string
set map[int64]bool
}

type cachedtiming struct {
timings []float64
tags map[string]string
}

func (_ *Statsd) Description() string {
Expand Down Expand Up @@ -105,7 +117,6 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
s.Lock()
defer s.Unlock()

values := make(map[string]int64)
items := len(s.inmetrics)
for i := 0; i < items; i++ {

Expand All @@ -123,26 +134,23 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
acc.Add(name, cmetric.value, cmetric.tags)
}
if s.DeleteGauges {
s.gauges = make(map[string]cachedmetric)
s.gauges = make(map[string]cachedgauge)
}

for name, cmetric := range s.counters {
acc.Add(name, cmetric.value, cmetric.tags)
}
if s.DeleteCounters {
s.counters = make(map[string]cachedmetric)
s.counters = make(map[string]cachedcounter)
}

for name, cmetric := range s.sets {
acc.Add(name, cmetric.value, cmetric.tags)
acc.Add(name, int64(len(cmetric.set)), cmetric.tags)
}
if s.DeleteSets {
s.sets = make(map[string]cachedmetric)
s.sets = make(map[string]cachedset)
}

for name, value := range values {
acc.Add(name, value, nil)
}
return nil
}

Expand All @@ -153,9 +161,9 @@ func (s *Statsd) Start() error {
s.done = make(chan struct{})
s.in = make(chan string, s.AllowedPendingMessages)
s.inmetrics = make(chan metric, s.AllowedPendingMessages)
s.gauges = make(map[string]cachedmetric)
s.counters = make(map[string]cachedmetric)
s.sets = make(map[string]cachedmetric)
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)

// Start the UDP listener
go s.udpListen()
Expand Down Expand Up @@ -267,14 +275,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
}
m.additive = true
}
v, err := strconv.ParseInt(parts2[1], 10, 64)
v, err := strconv.ParseFloat(parts2[1], 64)
if err != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
log.Printf("Error: parsing value to float64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
// If a sample rate is given with a counter, divide value by the rate
if m.samplerate != 0 && m.mtype == "c" {
v = int64(float64(v) / m.samplerate)
v = v / m.samplerate
}
m.value = v

Expand All @@ -301,7 +309,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
// map of tags.
// Return values are (<name>, <tags>)
func (s *Statsd) parseName(m metric) (string, map[string]string) {
var tags map[string]string
tags := make(map[string]string)
name := strings.Replace(m.bucket, ".", "_", -1)
name = strings.Replace(name, "-", "__", -1)

Expand All @@ -325,13 +333,13 @@ func (s *Statsd) parseName(m metric) (string, map[string]string) {

switch m.mtype {
case "c":
name = name + "_counter"
tags["metric_type"] = "counter"
case "g":
name = name + "_gauge"
tags["metric_type"] = "gauge"
case "s":
name = name + "_set"
tags["metric_type"] = "set"
case "ms", "h":
name = name + "_timer"
tags["metric_type"] = "timer"
}

return name, tags
Expand Down Expand Up @@ -361,21 +369,22 @@ func bucketglob(pattern, bucket string) bool {
func (s *Statsd) aggregate(m metric) {
switch m.mtype {
case "c":
v := int64(m.value)
cached, ok := s.counters[m.name]
if !ok {
s.counters[m.name] = cachedmetric{
value: m.value,
s.counters[m.name] = cachedcounter{
value: v,
tags: m.tags,
}
} else {
cached.value += m.value
cached.value += v
cached.tags = m.tags
s.counters[m.name] = cached
}
case "g":
cached, ok := s.gauges[m.name]
if !ok {
s.gauges[m.name] = cachedmetric{
s.gauges[m.name] = cachedgauge{
value: m.value,
tags: m.tags,
}
Expand All @@ -389,22 +398,17 @@ func (s *Statsd) aggregate(m metric) {
s.gauges[m.name] = cached
}
case "s":
v := int64(m.value)
cached, ok := s.sets[m.name]
if !ok {
// Completely new metric (initialize with count of 1)
s.sets[m.name] = cachedmetric{
value: 1,
tags: m.tags,
set: map[int64]bool{m.value: true},
s.sets[m.name] = cachedset{
tags: m.tags,
set: map[int64]bool{v: true},
}
} else {
_, ok := s.sets[m.name].set[m.value]
if !ok {
// Metric exists, but value has not been counted
cached.value += 1
cached.set[m.value] = true
s.sets[m.name] = cached
}
cached.set[v] = true
s.sets[m.name] = cached
}
}
}
Expand Down
Loading

0 comments on commit e68b6dd

Please sign in to comment.