Skip to content

Commit

Permalink
Add support for converting tag or field to measurement in converter p…
Browse files Browse the repository at this point in the history
…rocessor (influxdata#7049)
  • Loading branch information
danielnelson authored and idohalevi committed Sep 23, 2020
1 parent 5eac7eb commit d0c7dc7
Show file tree
Hide file tree
Showing 3 changed files with 335 additions and 271 deletions.
33 changes: 28 additions & 5 deletions plugins/processors/converter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Values that cannot be converted are dropped.
uniquely identifiable. Fields with the same series key (measurement + tags)
will overwrite one another.

### Configuration:
### Configuration
```toml
# Convert values to another metric value type
[[processors.converter]]
Expand All @@ -19,6 +19,7 @@ will overwrite one another.
## select the keys to convert. The array may contain globs.
## <target-type> = [<tag-key>...]
[processors.converter.tags]
measurement = []
string = []
integer = []
unsigned = []
Expand All @@ -31,6 +32,7 @@ will overwrite one another.
## select the keys to convert. The array may contain globs.
## <target-type> = [<field-key>...]
[processors.converter.fields]
measurement = []
tag = []
string = []
integer = []
Expand All @@ -39,19 +41,40 @@ will overwrite one another.
float = []
```

### Examples:
### Example

Convert `port` tag to a string field:
```toml
[[processors.converter]]
[processors.converter.tags]
string = ["port"]
```

```diff
- apache,port=80,server=debian-stretch-apache BusyWorkers=1,BytesPerReq=0
+ apache,server=debian-stretch-apache port="80",BusyWorkers=1,BytesPerReq=0
```

Convert all `scboard_*` fields to an integer:
```toml
[[processors.converter]]
[processors.converter.fields]
integer = ["scboard_*"]
tag = ["ParentServerConfigGeneration"]
```

```diff
- apache,port=80,server=debian-stretch-apache BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerConfigGeneration=3,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0,scboard_dnslookup=0,scboard_finishing=0,scboard_idle_cleanup=0,scboard_keepalive=0,scboard_logging=0,scboard_open=100,scboard_reading=0,scboard_sending=1,scboard_starting=0,scboard_waiting=49 1502489900000000000
+ apache,server=debian-stretch-apache,ParentServerConfigGeneration=3 port="80",BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0i,scboard_dnslookup=0i,scboard_finishing=0i,scboard_idle_cleanup=0i,scboard_keepalive=0i,scboard_logging=0i,scboard_open=100i,scboard_reading=0i,scboard_sending=1i,scboard_starting=0i,scboard_waiting=49i 1502489900000000000
- apache scboard_closing=0,scboard_dnslookup=0,scboard_finishing=0,scboard_idle_cleanup=0,scboard_keepalive=0,scboard_logging=0,scboard_open=100,scboard_reading=0,scboard_sending=1,scboard_starting=0,scboard_waiting=49
+ apache scboard_closing=0i,scboard_dnslookup=0i,scboard_finishing=0i,scboard_idle_cleanup=0i,scboard_keepalive=0i,scboard_logging=0i,scboard_open=100i,scboard_reading=0i,scboard_sending=1i,scboard_starting=0i,scboard_waiting=49i
```

Rename the measurement from a tag value:
```toml
[[processors.converter]]
[processors.converter.tags]
measurement = ["topic"]
```

```diff
- mqtt_consumer,topic=sensor temp=42
+ sensor temp=42
```
114 changes: 61 additions & 53 deletions plugins/processors/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package converter

import (
"fmt"
"log"
"math"
"strconv"

Expand All @@ -18,6 +17,7 @@ var sampleConfig = `
## select the keys to convert. The array may contain globs.
## <target-type> = [<tag-key>...]
[processors.converter.tags]
measurement = []
string = []
integer = []
unsigned = []
Expand All @@ -30,6 +30,7 @@ var sampleConfig = `
## select the keys to convert. The array may contain globs.
## <target-type> = [<field-key>...]
[processors.converter.fields]
measurement = []
tag = []
string = []
integer = []
Expand All @@ -39,30 +40,32 @@ var sampleConfig = `
`

type Conversion struct {
Tag []string `toml:"tag"`
String []string `toml:"string"`
Integer []string `toml:"integer"`
Unsigned []string `toml:"unsigned"`
Boolean []string `toml:"boolean"`
Float []string `toml:"float"`
Measurement []string `toml:"measurement"`
Tag []string `toml:"tag"`
String []string `toml:"string"`
Integer []string `toml:"integer"`
Unsigned []string `toml:"unsigned"`
Boolean []string `toml:"boolean"`
Float []string `toml:"float"`
}

type Converter struct {
Tags *Conversion `toml:"tags"`
Fields *Conversion `toml:"fields"`
Tags *Conversion `toml:"tags"`
Fields *Conversion `toml:"fields"`
Log telegraf.Logger `toml:"-"`

initialized bool
tagConversions *ConversionFilter
fieldConversions *ConversionFilter
}

type ConversionFilter struct {
Tag filter.Filter
String filter.Filter
Integer filter.Filter
Unsigned filter.Filter
Boolean filter.Filter
Float filter.Filter
Measurement filter.Filter
Tag filter.Filter
String filter.Filter
Integer filter.Filter
Unsigned filter.Filter
Boolean filter.Filter
Float filter.Filter
}

func (p *Converter) SampleConfig() string {
Expand All @@ -73,15 +76,11 @@ func (p *Converter) Description() string {
return "Convert values to another metric value type"
}

func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
if !p.initialized {
err := p.compile()
if err != nil {
logPrintf("initialization error: %v\n", err)
return metrics
}
}
func (p *Converter) Init() error {
return p.compile()
}

func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
for _, metric := range metrics {
p.convertTags(metric)
p.convertFields(metric)
Expand All @@ -106,7 +105,6 @@ func (p *Converter) compile() error {

p.tagConversions = tf
p.fieldConversions = ff
p.initialized = true
return nil
}

Expand All @@ -117,6 +115,11 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) {

var err error
cf := &ConversionFilter{}
cf.Measurement, err = filter.Compile(conv.Measurement)
if err != nil {
return nil, err
}

cf.Tag, err = filter.Compile(conv.Tag)
if err != nil {
return nil, err
Expand Down Expand Up @@ -150,13 +153,19 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) {
return cf, nil
}

// convertTags converts tags into fields
// convertTags converts tags into measurements or fields.
func (p *Converter) convertTags(metric telegraf.Metric) {
if p.tagConversions == nil {
return
}

for key, value := range metric.Tags() {
if p.tagConversions.Measurement != nil && p.tagConversions.Measurement.Match(key) {
metric.RemoveTag(key)
metric.SetName(value)
continue
}

if p.tagConversions.String != nil && p.tagConversions.String.Match(key) {
metric.RemoveTag(key)
metric.AddField(key, value)
Expand All @@ -167,7 +176,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
v, ok := toInteger(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to integer [%T]: %v\n", value, value)
p.Log.Errorf("error converting to integer [%T]: %v", value, value)
continue
}

Expand All @@ -179,7 +188,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
v, ok := toUnsigned(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to unsigned [%T]: %v\n", value, value)
p.Log.Errorf("error converting to unsigned [%T]: %v", value, value)
continue
}

Expand All @@ -192,7 +201,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
v, ok := toBool(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to boolean [%T]: %v\n", value, value)
p.Log.Errorf("error converting to boolean [%T]: %v", value, value)
continue
}

Expand All @@ -205,7 +214,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
v, ok := toFloat(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to float [%T]: %v\n", value, value)
p.Log.Errorf("error converting to float [%T]: %v", value, value)
continue
}

Expand All @@ -216,18 +225,31 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
}
}

// convertFields converts fields into tags or other field types
// convertFields converts fields into measurements, tags, or other field types.
func (p *Converter) convertFields(metric telegraf.Metric) {
if p.fieldConversions == nil {
return
}

for key, value := range metric.Fields() {
if p.fieldConversions.Measurement != nil && p.fieldConversions.Measurement.Match(key) {
v, ok := toString(value)
if !ok {
metric.RemoveField(key)
p.Log.Errorf("error converting to measurement [%T]: %v", value, value)
continue
}

metric.RemoveField(key)
metric.SetName(v)
continue
}

if p.fieldConversions.Tag != nil && p.fieldConversions.Tag.Match(key) {
v, ok := toString(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to tag [%T]: %v\n", value, value)
p.Log.Errorf("error converting to tag [%T]: %v", value, value)
continue
}

Expand All @@ -240,7 +262,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toFloat(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to float [%T]: %v\n", value, value)
p.Log.Errorf("error converting to float [%T]: %v", value, value)
continue
}

Expand All @@ -253,7 +275,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toInteger(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to integer [%T]: %v\n", value, value)
p.Log.Errorf("error converting to integer [%T]: %v", value, value)
continue
}

Expand All @@ -266,7 +288,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toUnsigned(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to unsigned [%T]: %v\n", value, value)
p.Log.Errorf("error converting to unsigned [%T]: %v", value, value)
continue
}

Expand All @@ -279,7 +301,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toBool(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to bool [%T]: %v\n", value, value)
p.Log.Errorf("error converting to bool [%T]: %v", value, value)
continue
}

Expand All @@ -292,7 +314,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toString(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to string [%T]: %v\n", value, value)
p.Log.Errorf("Error converting to string [%T]: %v", value, value)
continue
}

Expand Down Expand Up @@ -336,7 +358,7 @@ func toInteger(v interface{}) (int64, bool) {
} else if value > float64(math.MaxInt64) {
return math.MaxInt64, true
} else {
return int64(Round(value)), true
return int64(math.Round(value)), true
}
case bool:
if value {
Expand Down Expand Up @@ -375,7 +397,7 @@ func toUnsigned(v interface{}) (uint64, bool) {
} else if value > float64(math.MaxUint64) {
return math.MaxUint64, true
} else {
return uint64(Round(value)), true
return uint64(math.Round(value)), true
}
case bool:
if value {
Expand Down Expand Up @@ -435,20 +457,6 @@ func toString(v interface{}) (string, bool) {
return "", false
}

// math.Round was not added until Go 1.10, can be removed when support for Go
// 1.9 is dropped.
func Round(x float64) float64 {
t := math.Trunc(x)
if math.Abs(x-t) >= 0.5 {
return t + math.Copysign(1, x)
}
return t
}

func logPrintf(format string, v ...interface{}) {
log.Printf("D! [processors.converter] "+format, v...)
}

func init() {
processors.Add("converter", func() telegraf.Processor {
return &Converter{}
Expand Down
Loading

0 comments on commit d0c7dc7

Please sign in to comment.