Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StatsD plugin: Support multiple fields for timers #737

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 43 additions & 28 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
)

const UDP_PACKET_SIZE int = 1500
const (
UDP_PACKET_SIZE int = 1500

defaultFieldName = "value"
)

var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"
Expand Down Expand Up @@ -113,9 +117,9 @@ type cachedcounter struct {
}

type cachedtimings struct {
name string
stats RunningStats
tags map[string]string
name string
fields map[string]RunningStats
tags map[string]string
}

func (_ *Statsd) Description() string {
Expand Down Expand Up @@ -169,16 +173,26 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
now := time.Now()

for _, metric := range s.timings {
// Defining a template to parse field names for timers allows us to split
// out multiple fields per timer. In this case we prefix each stat with the
// field name and store these all in a single measurement.
fields := make(map[string]interface{})
fields["mean"] = metric.stats.Mean()
fields["stddev"] = metric.stats.Stddev()
fields["upper"] = metric.stats.Upper()
fields["lower"] = metric.stats.Lower()
fields["count"] = metric.stats.Count()
for _, percentile := range s.Percentiles {
name := fmt.Sprintf("%v_percentile", percentile)
fields[name] = metric.stats.Percentile(percentile)
for fieldName, stats := range metric.fields {
var prefix string
if fieldName != defaultFieldName {
prefix = fieldName + "_"
}
fields[prefix+"mean"] = stats.Mean()
fields[prefix+"stddev"] = stats.Stddev()
fields[prefix+"upper"] = stats.Upper()
fields[prefix+"lower"] = stats.Lower()
fields[prefix+"count"] = stats.Count()
for _, percentile := range s.Percentiles {
name := fmt.Sprintf("%s%v_percentile", prefix, percentile)
fields[name] = stats.Percentile(percentile)
}
}

acc.AddFields(metric.name, fields, metric.tags, now)
}
if s.DeleteTimings {
Expand Down Expand Up @@ -370,11 +384,6 @@ func (s *Statsd) parseStatsdLine(line string) error {

// Parse the name & tags from bucket
m.name, m.field, m.tags = s.parseName(m.bucket)
// fields are not supported for timings, so if specified combine into
// the name
if (m.mtype == "ms" || m.mtype == "h") && m.field != "value" {
m.name += "_" + m.field
}
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
Expand Down Expand Up @@ -433,7 +442,7 @@ func (s *Statsd) parseName(bucket string) (string, string, map[string]string) {
name = strings.Replace(name, "-", "__", -1)
}
if field == "" {
field = "value"
field = defaultFieldName
}

return name, field, tags
Expand Down Expand Up @@ -461,26 +470,32 @@ func parseKeyValue(keyvalue string) (string, string) {
func (s *Statsd) aggregate(m metric) {
switch m.mtype {
case "ms", "h":
// Check if the measurement exists
cached, ok := s.timings[m.hash]
if !ok {
cached = cachedtimings{
name: m.name,
tags: m.tags,
stats: RunningStats{
PercLimit: s.PercentileLimit,
},
name: m.name,
fields: make(map[string]RunningStats),
tags: m.tags,
}
}
// Check if the field exists. If we've not enabled multiple fields per timer
// this will be the default field name, eg. "value"
field, ok := cached.fields[m.field]
if !ok {
field = RunningStats{
PercLimit: s.PercentileLimit,
}
}

if m.samplerate > 0 {
for i := 0; i < int(1.0/m.samplerate); i++ {
cached.stats.AddValue(m.floatvalue)
field.AddValue(m.floatvalue)
}
s.timings[m.hash] = cached
} else {
cached.stats.AddValue(m.floatvalue)
s.timings[m.hash] = cached
field.AddValue(m.floatvalue)
}
cached.fields[m.field] = field
s.timings[m.hash] = cached
case "c":
// check if the measurement exists
_, ok := s.counters[m.hash]
Expand Down
106 changes: 102 additions & 4 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,12 +561,12 @@ func TestParse_MeasurementsWithMultipleValues(t *testing.T) {
// A 0 with invalid samplerate will add a single 0,
// plus the last bit of value 1
// which adds up to 12 individual datapoints to be cached
if cachedtiming.stats.n != 12 {
t.Errorf("Expected 11 additions, got %d", cachedtiming.stats.n)
if cachedtiming.fields[defaultFieldName].n != 12 {
t.Errorf("Expected 11 additions, got %d", cachedtiming.fields[defaultFieldName].n)
}

if cachedtiming.stats.upper != 1 {
t.Errorf("Expected max input to be 1, got %f", cachedtiming.stats.upper)
if cachedtiming.fields[defaultFieldName].upper != 1 {
t.Errorf("Expected max input to be 1, got %f", cachedtiming.fields[defaultFieldName].upper)
}
}

Expand Down Expand Up @@ -842,7 +842,105 @@ func TestParse_Timings(t *testing.T) {
}

acc.AssertContainsFields(t, "test_timing", valid)
}

// Tests low-level functionality of timings when multiple fields is enabled
// and a measurement template has been defined which can parse field names
func TestParse_Timings_MultipleFieldsWithTemplate(t *testing.T) {
s := NewStatsd()
s.Templates = []string{"measurement.field"}
s.Percentiles = []int{90}
acc := &testutil.Accumulator{}

validLines := []string{
"test_timing.success:1|ms",
"test_timing.success:11|ms",
"test_timing.success:1|ms",
"test_timing.success:1|ms",
"test_timing.success:1|ms",
"test_timing.error:2|ms",
"test_timing.error:22|ms",
"test_timing.error:2|ms",
"test_timing.error:2|ms",
"test_timing.error:2|ms",
}

for _, line := range validLines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
s.Gather(acc)

valid := map[string]interface{}{
"success_90_percentile": float64(11),
"success_count": int64(5),
"success_lower": float64(1),
"success_mean": float64(3),
"success_stddev": float64(4),
"success_upper": float64(11),

"error_90_percentile": float64(22),
"error_count": int64(5),
"error_lower": float64(2),
"error_mean": float64(6),
"error_stddev": float64(8),
"error_upper": float64(22),
}

acc.AssertContainsFields(t, "test_timing", valid)
}

// Tests low-level functionality of timings when multiple fields is enabled
// but a measurement template hasn't been defined so we can't parse field names
// In this case the behaviour should be the same as normal behaviour
func TestParse_Timings_MultipleFieldsWithoutTemplate(t *testing.T) {
s := NewStatsd()
s.Templates = []string{}
s.Percentiles = []int{90}
acc := &testutil.Accumulator{}

validLines := []string{
"test_timing.success:1|ms",
"test_timing.success:11|ms",
"test_timing.success:1|ms",
"test_timing.success:1|ms",
"test_timing.success:1|ms",
"test_timing.error:2|ms",
"test_timing.error:22|ms",
"test_timing.error:2|ms",
"test_timing.error:2|ms",
"test_timing.error:2|ms",
}

for _, line := range validLines {
err := s.parseStatsdLine(line)
if err != nil {
t.Errorf("Parsing line %s should not have resulted in an error\n", line)
}
}
s.Gather(acc)

expectedSuccess := map[string]interface{}{
"90_percentile": float64(11),
"count": int64(5),
"lower": float64(1),
"mean": float64(3),
"stddev": float64(4),
"upper": float64(11),
}
expectedError := map[string]interface{}{
"90_percentile": float64(22),
"count": int64(5),
"lower": float64(2),
"mean": float64(6),
"stddev": float64(8),
"upper": float64(22),
}

acc.AssertContainsFields(t, "test_timing_success", expectedSuccess)
acc.AssertContainsFields(t, "test_timing_error", expectedError)
}

func TestParse_Timings_Delete(t *testing.T) {
Expand Down