Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] [Statsd] Add support for Graphite series 1.1.0+ tags #39619

Merged
merged 14 commits into from
May 30, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
*Metricbeat*

- Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553]
- Added a conditional check to see if the statsd metric contains ',' or ';' and split accordingly. {pull}39619[39619]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Added a conditional check to see if the statsd metric contains ',' or ';' and split accordingly. {pull}39619[39619]
- Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think we should update README. See an example: https://github.com/prometheus/statsd_exporter?tab=readme-ov-file#tagging-extensions

@ritalwar @tehbooom What do you think? Else, one has to go to code to figure it out to see what extensions are supported.

Copy link
Member Author

@tehbooom tehbooom May 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ritalwar @shmsr How do we feel about adding this to the README?

[float]
=== Suported tag extensions

The `statsd` module supports the following tags:

https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol[DogStatsD]

`<metric name>:<value>|<type>|@samplerate|#<k>:<v>,<k>:<v>`

https://github.com/influxdata/telegraf/blob/master/plugins/inputs/statsd/README.md#influx-statsd[InfluxDB]

`<metric name>,<k>=<v>,<k>=<v>:<value>|<type>|@samplerate`

https://graphite.readthedocs.io/en/latest/tags.html#graphite-tag-support[Graphite 1.1.x]

`<metric name>;<k>=<v>;<k>=<v>:<value>|<type>|@samplerate`

Would look like this:

image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps the heading could be like:

  • "DogStatsD-style tags"
  • "InfluxDB-style tags"

and so on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with just listing the names, but we can add something like:

Supported Tag Extensions

Examples of tag styles supported by the statsd module:

  • DogStatsD
    <metric name>:<value>|<type>|@samplerate|#<k>:<v>,<k>:<v>
  • InfluxDB
    <metric name>,<k>=<v>,<k>=<v>:<value>|<type>|@samplerate
  • Graphite 1.1.x
    <metric name>;<k>=<v>;<k>=<v>:<value>|<type>|@samplerate


*Osquerybeat*

Expand Down
33 changes: 26 additions & 7 deletions x-pack/metricbeat/module/statsd/server/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,32 @@ type statsdMetric struct {

func splitTags(rawTags, kvSep []byte) map[string]string {
tags := map[string]string{}
for _, kv := range bytes.Split(rawTags, []byte(",")) {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warn("could not parse tags")
continue
if bytes.Contains(rawTags, []byte(",")) {
for _, kv := range bytes.Split(rawTags, []byte(",")) {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warn("could not parse tags")
continue
}
tags[string(kvSplit[0])] = string(kvSplit[1])
}
} else {
for _, kv := range bytes.Split(rawTags, []byte(";")) {
kvSplit := bytes.SplitN(kv, kvSep, 2)
if len(kvSplit) != 2 {
logger.Warn("could not parse tags")
continue
}
tags[string(kvSplit[0])] = string(kvSplit[1])
tehbooom marked this conversation as resolved.
Show resolved Hide resolved
}
tags[string(kvSplit[0])] = string(kvSplit[1])
}
return tags
}

func parseSingle(b []byte) (statsdMetric, error) {
// format: <metric name>:<value>|<type>[|@samplerate][|#<k>:<v>,<k>:<v>]
// alternative: <metric name>[,<k>=<v>,<k>=<v>]:<value>|<type>[|@samplerate]
// alternative: <metric name>[;<k>=<v>;<k>=<v>]:<value>|<type>[|@samplerate]
s := statsdMetric{}

parts := bytes.SplitN(b, []byte("|"), 4)
Expand All @@ -73,7 +85,14 @@ func parseSingle(b []byte) (statsdMetric, error) {
return s, errInvalidPacket
}

nameTagsSplit := bytes.SplitN(nameSplit[0], []byte(","), 2)
var nameTagsSplit [][]byte

tehbooom marked this conversation as resolved.
Show resolved Hide resolved
if bytes.Contains(nameSplit[0], []byte(",")) {
nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(","), 2)
} else {
nameTagsSplit = bytes.SplitN(nameSplit[0], []byte(";"), 2)
tehbooom marked this conversation as resolved.
Show resolved Hide resolved
}

s.name = string(nameTagsSplit[0])
if len(nameTagsSplit) > 1 {
s.tags = splitTags(nameTagsSplit[1], []byte("="))
Expand Down
41 changes: 38 additions & 3 deletions x-pack/metricbeat/module/statsd/server/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,20 @@ func TestParseMetrics(t *testing.T) {
},
},
},
{ // Graphite 1.1.x Tags
input: "tags3;k1=v1;k2=v2:1|c",
expected: []statsdMetric{
{
name: "tags3",
metricType: "c",
value: "1",
tags: map[string]string{
"k1": "v1",
"k2": "v2",
},
},
},
},
/// errors
{
input: "meter1-1.4|m",
Expand Down Expand Up @@ -1064,6 +1078,17 @@ func TestParseSingle(t *testing.T) {
tags: map[string]string{"k1": "v1", "k2": "v2"},
},
},
"valid packet: counter with Graphite tags": {
input: "tags2;k1=v1;k2=v2:1|c",
err: nil,
want: statsdMetric{
name: "tags2",
metricType: "c",
sampleRate: "",
value: "1",
tags: map[string]string{"k1": "v1", "k2": "v2"},
},
},
"valid packet: gauge": {
input: "gauge1:1.0|g",
err: nil,
Expand Down Expand Up @@ -1124,13 +1149,14 @@ func TestTagsGrouping(t *testing.T) {

"metric3:3|c|@0.1|#k1:v2,k2:v3",
"metric4:4|ms|#k1:v2,k2:v3",
"metric5;k1=v3;k2=v4:5|c",
}

err := process(testData, ms)
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 4)
assert.Len(t, events, 5)

actualTags := []mapstr.M{}
for _, e := range events {
Expand Down Expand Up @@ -1162,6 +1188,12 @@ func TestTagsGrouping(t *testing.T) {
"k2": "v3",
},
},
{
"labels": mapstr.M{
"k1": "v3",
"k2": "v4",
},
},
}

assert.ElementsMatch(t, expectedTags, actualTags)
Expand All @@ -1173,14 +1205,15 @@ func TestTagsCleanup(t *testing.T) {
"metric1:1|g|#k1:v1,k2:v2",

"metric2:3|ms|#k1:v2,k2:v3",
"metric3;k1=v3;k2=v4:5|c",
}
err := process(testData, ms)
require.NoError(t, err)

time.Sleep(1000 * time.Millisecond)

// they will be reported at least once
assert.Len(t, ms.getEvents(), 2)
assert.Len(t, ms.getEvents(), 3)

testData = []string{
"metric1:+2|g|#k1:v1,k2:v2",
Expand Down Expand Up @@ -1229,12 +1262,13 @@ func TestData(t *testing.T) {
"metric08:seven|s|#k1:v1,k2:v2",
"metric09,k1=v1,k2=v2:8|h",
"metric10.with.dots,k1=v1,k2=v2:9|h",
"metric11;k1=v1;k2=v2:10|c",
}
err := process(testData, ms)
require.NoError(t, err)

events := ms.getEvents()
assert.Len(t, events, 10)
assert.Len(t, events, 11)

mbevent := mbtest.StandardizeEvent(ms, *events[0])
mbtest.WriteEventToDataJSON(t, mbevent, "")
Expand Down Expand Up @@ -1379,6 +1413,7 @@ func BenchmarkIngest(b *testing.B) {
"metric08:seven|s|#k1:v1,k2:v2",
"metric09,k1=v1,k2=v2:8|h",
"metric10.with.dots,k1=v1,k2=v2:9|h",
"metric11;k1=v1;k2=v2:10|c",
}

events := make([]*testUDPEvent, len(tests))
Expand Down
Loading