From 0b3fda4c79a63897f321905e476289d91191386b Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Mon, 4 Sep 2017 15:33:20 +0200 Subject: [PATCH 1/4] Add Tags Processor Metrics created by different input plugins can have tags attached to provide additional information. Those tags may either be configured in the global_tags section in the telegraf.conf file or may be provided by the respective plugin. Telegraf provides a filtering mechanism with the *taginclude* property to only include certain tags. However, this mechanism will also drop all tags, that are not explicitly listed. This applies to the plugin specific tags as well as the global tags. In order to always provide global tags, this processor adds all configured tags after the tag filtering was applied. Therefore all those tags will always be present. This is useful in the case, were only some tags provided by an input plugin are needed, but it is not exactly known, which other tags may be attached by the input plugin. Using this processor provides the ability to use *taginclude* for the wanted tags from the input plugin while still retaining global tags. It eases the configuration as the global tags do not need to be repeated in every *taginclude*. This implementation will add the configured tags to all metrics, no filtering is supported. When adding the tags, values of tags with the same key will be overwritten by the processor. Signed-off-by: Karsten Schnitter --- plugins/processors/all/all.go | 1 + plugins/processors/tags/README.md | 19 ++++++++++ plugins/processors/tags/tags.go | 42 +++++++++++++++++++++ plugins/processors/tags/tags_test.go | 56 ++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+) create mode 100644 plugins/processors/tags/README.md create mode 100644 plugins/processors/tags/tags.go create mode 100644 plugins/processors/tags/tags_test.go diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index 462298f6bbbd1..ae7af3846ebc3 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -2,4 +2,5 @@ package all import ( _ "github.com/influxdata/telegraf/plugins/processors/printer" + _ "github.com/influxdata/telegraf/plugins/processors/tags" ) diff --git a/plugins/processors/tags/README.md b/plugins/processors/tags/README.md new file mode 100644 index 0000000000000..f812e9b4d3e81 --- /dev/null +++ b/plugins/processors/tags/README.md @@ -0,0 +1,19 @@ +# Add Global Tags Processor Plugin + +The tags processor plugin adds all configured tags to every metric passing +through it. Values of already present tags with conflicting keys will be +overwritten. + +While global tags can be configured in the respective section of the +configuration file those tags may be ignored when using the `taginclude` +property. +This plugin provides global tags, that are unaffected by this filtering. + +### Configuration: + +```toml +# Add a global tag to all metrics +[[processors.tags]] +[processors.tags.add] + additional_tag = "tag_value" +``` diff --git a/plugins/processors/tags/tags.go b/plugins/processors/tags/tags.go new file mode 100644 index 0000000000000..4f7957c3a0390 --- /dev/null +++ b/plugins/processors/tags/tags.go @@ -0,0 +1,42 @@ +package tags + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +var sampleConfig = ` +## NOTE This processor will overwrite values of tags, that are already +## present in the metric passed through this filter. + +## Tags to be added (all values must be strings) +# [processors.tags.add] +# additional_tag = "tag_value" +` + +type TagAdder struct { + Add map[string]string +} + +func (p *TagAdder) SampleConfig() string { + return sampleConfig +} + +func (p *TagAdder) Description() string { + return "Add all configured tags to all metrics that pass through this filter." +} + +func (a *TagAdder) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + for key, value := range a.Add { + metric.AddTag(key, value) + } + } + return in +} + +func init() { + processors.Add("tags", func() telegraf.Processor { + return &TagAdder{} + }) +} diff --git a/plugins/processors/tags/tags_test.go b/plugins/processors/tags/tags_test.go new file mode 100644 index 0000000000000..fb5b8abb48cdb --- /dev/null +++ b/plugins/processors/tags/tags_test.go @@ -0,0 +1,56 @@ +package tags + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/assert" +) + +func createTestMetric() telegraf.Metric { + metric, _ := metric.New("m1", + map[string]string{"metric_tag": "from_metric"}, + map[string]interface{}{"value": int64(1)}, + time.Now(), + ) + return metric +} + +func calculateProcessedTags(adder TagAdder, metric telegraf.Metric) map[string]string { + processed := adder.Apply(metric) + return processed[0].Tags() +} + +func TestRetainsTags(t *testing.T) { + adder := TagAdder{} + + tags := calculateProcessedTags(adder, createTestMetric()) + + value, present := tags["metric_tag"] + assert.True(t, present, "Tag of metric was not present") + assert.Equal(t, "from_metric", value, "Value of Tag was changed") +} + +func TestAddTags(t *testing.T) { + adder := TagAdder{Add: map[string]string{"added_tag": "from_config", "another_tag": ""}} + + tags := calculateProcessedTags(adder, createTestMetric()) + + value, present := tags["added_tag"] + assert.True(t, present, "Additional Tag of metric was not present") + assert.Equal(t, "from_config", value, "Value of Tag was changed") + assert.Equal(t, 3, len(tags), "Should have one previous and two added tags.") +} + +func TestOverwritesPresentTagValues(t *testing.T) { + adder := TagAdder{Add: map[string]string{"metric_tag": "from_config"}} + + tags := calculateProcessedTags(adder, createTestMetric()) + + value, present := tags["metric_tag"] + assert.True(t, present, "Tag of metric was not present") + assert.Equal(t, 1, len(tags), "Should only have one tag.") + assert.Equal(t, "from_config", value, "Value of Tag was not changed") +} From cd1af2188aab83205a1032e577a4899971d0da9b Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Mon, 19 Feb 2018 14:30:29 +0100 Subject: [PATCH 2/4] Add name modifications Provide general modification of measurement names as available on inputs and aggregators: - name_override - name_prefix - name_suffix --- plugins/processors/tags/tags.go | 14 +++++++++++++- plugins/processors/tags/tags_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/plugins/processors/tags/tags.go b/plugins/processors/tags/tags.go index 4f7957c3a0390..21637b223c463 100644 --- a/plugins/processors/tags/tags.go +++ b/plugins/processors/tags/tags.go @@ -15,7 +15,10 @@ var sampleConfig = ` ` type TagAdder struct { - Add map[string]string + NameOverride string + NamePrefix string + NameSuffix string + Add map[string]string } func (p *TagAdder) SampleConfig() string { @@ -28,6 +31,15 @@ func (p *TagAdder) Description() string { func (a *TagAdder) Apply(in ...telegraf.Metric) []telegraf.Metric { for _, metric := range in { + if len(a.NameOverride) > 0 { + metric.SetName(a.NameOverride) + } + if len(a.NamePrefix) > 0 { + metric.SetPrefix(a.NamePrefix) + } + if len(a.NameSuffix) > 0 { + metric.SetSuffix(a.NameSuffix) + } for key, value := range a.Add { metric.AddTag(key, value) } diff --git a/plugins/processors/tags/tags_test.go b/plugins/processors/tags/tags_test.go index fb5b8abb48cdb..62c3252aace31 100644 --- a/plugins/processors/tags/tags_test.go +++ b/plugins/processors/tags/tags_test.go @@ -54,3 +54,27 @@ func TestOverwritesPresentTagValues(t *testing.T) { assert.Equal(t, 1, len(tags), "Should only have one tag.") assert.Equal(t, "from_config", value, "Value of Tag was not changed") } + +func TestOverridesName(t *testing.T) { + adder := TagAdder{NameOverride: "overridden"} + + processed := adder.Apply(createTestMetric()) + + assert.Equal(t, "overridden", processed[0].Name(), "Name was not overridden") +} + +func TestNamePrefix(t *testing.T) { + adder := TagAdder{NamePrefix: "Pre-"} + + processed := adder.Apply(createTestMetric()) + + assert.Equal(t, "Pre-m1", processed[0].Name(), "Prefix was not applied") +} + +func TestNameSuffix(t *testing.T) { + adder := TagAdder{NameSuffix: "-suff"} + + processed := adder.Apply(createTestMetric()) + + assert.Equal(t, "m1-suff", processed[0].Name(), "Suffix was not applied") +} From 7df07b61323533976757b2d63e791399dc59bb26 Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Mon, 19 Feb 2018 14:41:34 +0100 Subject: [PATCH 3/4] Rename Configuration Since the processor should provide the common interface of inputs and aggregators the tags map is renamed accordingly. --- plugins/processors/tags/tags.go | 6 +++--- plugins/processors/tags/tags_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/processors/tags/tags.go b/plugins/processors/tags/tags.go index 21637b223c463..9065cab8fc7f6 100644 --- a/plugins/processors/tags/tags.go +++ b/plugins/processors/tags/tags.go @@ -10,7 +10,7 @@ var sampleConfig = ` ## present in the metric passed through this filter. ## Tags to be added (all values must be strings) -# [processors.tags.add] +# [processors.tags.tags] # additional_tag = "tag_value" ` @@ -18,7 +18,7 @@ type TagAdder struct { NameOverride string NamePrefix string NameSuffix string - Add map[string]string + Tags map[string]string } func (p *TagAdder) SampleConfig() string { @@ -40,7 +40,7 @@ func (a *TagAdder) Apply(in ...telegraf.Metric) []telegraf.Metric { if len(a.NameSuffix) > 0 { metric.SetSuffix(a.NameSuffix) } - for key, value := range a.Add { + for key, value := range a.Tags { metric.AddTag(key, value) } } diff --git a/plugins/processors/tags/tags_test.go b/plugins/processors/tags/tags_test.go index 62c3252aace31..bf7b955ce4c80 100644 --- a/plugins/processors/tags/tags_test.go +++ b/plugins/processors/tags/tags_test.go @@ -34,7 +34,7 @@ func TestRetainsTags(t *testing.T) { } func TestAddTags(t *testing.T) { - adder := TagAdder{Add: map[string]string{"added_tag": "from_config", "another_tag": ""}} + adder := TagAdder{Tags: map[string]string{"added_tag": "from_config", "another_tag": ""}} tags := calculateProcessedTags(adder, createTestMetric()) @@ -45,7 +45,7 @@ func TestAddTags(t *testing.T) { } func TestOverwritesPresentTagValues(t *testing.T) { - adder := TagAdder{Add: map[string]string{"metric_tag": "from_config"}} + adder := TagAdder{Tags: map[string]string{"metric_tag": "from_config"}} tags := calculateProcessedTags(adder, createTestMetric()) From 225e84e38f66a1ab3afe95638e8b250b3c8d6d2c Mon Sep 17 00:00:00 2001 From: Karsten Schnitter Date: Thu, 1 Mar 2018 16:09:02 +0100 Subject: [PATCH 4/4] Rename Plugin --- plugins/processors/all/all.go | 2 +- plugins/processors/override/README.md | 30 ++++++++++ plugins/processors/override/override.go | 60 +++++++++++++++++++ .../override_test.go} | 30 +++++----- plugins/processors/tags/README.md | 19 ------ plugins/processors/tags/tags.go | 54 ----------------- 6 files changed, 106 insertions(+), 89 deletions(-) create mode 100644 plugins/processors/override/README.md create mode 100644 plugins/processors/override/override.go rename plugins/processors/{tags/tags_test.go => override/override_test.go} (64%) delete mode 100644 plugins/processors/tags/README.md delete mode 100644 plugins/processors/tags/tags.go diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index ae7af3846ebc3..1eecbfa7e96d9 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -1,6 +1,6 @@ package all import ( + _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/printer" - _ "github.com/influxdata/telegraf/plugins/processors/tags" ) diff --git a/plugins/processors/override/README.md b/plugins/processors/override/README.md new file mode 100644 index 0000000000000..3fb1b7c0f1609 --- /dev/null +++ b/plugins/processors/override/README.md @@ -0,0 +1,30 @@ +# Override Processor Plugin + +The override processor plugin allows overriding all modifications that are +supported by input plugins and aggregators: + +* name_override +* name_prefix +* name_suffix +* tags + +All metrics passing through this processor will be modified accordingly. Values +of *name_override*, *name_prefix*, *name_suffix* and already present *tags* with +conflicting keys will be overwritten. Absent *tags* will be created. + +Use-case of this plugin encompass ensuring certain tags or naming conventions +are adhered to irrespective of input plugin configurations, e.g. by +`taginclude`. + +### Configuration: + +```toml +# Add a global tag to all metrics +[[processors.override]] + name_override = "new name_override" + name_prefix = "new name_prefix" + name_suffix = ":new name_suffix" + [processors.tags.add] + additional_tag = "tag_value" + existing_tag = "new tag_value" +``` diff --git a/plugins/processors/override/override.go b/plugins/processors/override/override.go new file mode 100644 index 0000000000000..347b640332224 --- /dev/null +++ b/plugins/processors/override/override.go @@ -0,0 +1,60 @@ +package override + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" +) + +var sampleConfig = ` +## NOTE This processor will override names, name prefixes, name suffixes and +## values of tags, that are already present in the metric passed through this +## filter. + +## All modifications on inputs and aggregators can be overridden: +# name_override = "new name" +# name_prefix = "new name_prefix" +# name_suffix = "new name_suffix" + +## Tags to be added (all values must be strings) +# [processors.overide.tags] +# additional_tag = "tag_value" +` + +type Override struct { + NameOverride string + NamePrefix string + NameSuffix string + Tags map[string]string +} + +func (p *Override) SampleConfig() string { + return sampleConfig +} + +func (p *Override) Description() string { + return "Add all configured tags to all metrics that pass through this filter." +} + +func (p *Override) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + if len(p.NameOverride) > 0 { + metric.SetName(p.NameOverride) + } + if len(p.NamePrefix) > 0 { + metric.SetPrefix(p.NamePrefix) + } + if len(p.NameSuffix) > 0 { + metric.SetSuffix(p.NameSuffix) + } + for key, value := range p.Tags { + metric.AddTag(key, value) + } + } + return in +} + +func init() { + processors.Add("override", func() telegraf.Processor { + return &Override{} + }) +} diff --git a/plugins/processors/tags/tags_test.go b/plugins/processors/override/override_test.go similarity index 64% rename from plugins/processors/tags/tags_test.go rename to plugins/processors/override/override_test.go index bf7b955ce4c80..433751af96255 100644 --- a/plugins/processors/tags/tags_test.go +++ b/plugins/processors/override/override_test.go @@ -1,4 +1,4 @@ -package tags +package override import ( "testing" @@ -18,15 +18,15 @@ func createTestMetric() telegraf.Metric { return metric } -func calculateProcessedTags(adder TagAdder, metric telegraf.Metric) map[string]string { - processed := adder.Apply(metric) +func calculateProcessedTags(processor Override, metric telegraf.Metric) map[string]string { + processed := processor.Apply(metric) return processed[0].Tags() } func TestRetainsTags(t *testing.T) { - adder := TagAdder{} + processor := Override{} - tags := calculateProcessedTags(adder, createTestMetric()) + tags := calculateProcessedTags(processor, createTestMetric()) value, present := tags["metric_tag"] assert.True(t, present, "Tag of metric was not present") @@ -34,9 +34,9 @@ func TestRetainsTags(t *testing.T) { } func TestAddTags(t *testing.T) { - adder := TagAdder{Tags: map[string]string{"added_tag": "from_config", "another_tag": ""}} + processor := Override{Tags: map[string]string{"added_tag": "from_config", "another_tag": ""}} - tags := calculateProcessedTags(adder, createTestMetric()) + tags := calculateProcessedTags(processor, createTestMetric()) value, present := tags["added_tag"] assert.True(t, present, "Additional Tag of metric was not present") @@ -45,9 +45,9 @@ func TestAddTags(t *testing.T) { } func TestOverwritesPresentTagValues(t *testing.T) { - adder := TagAdder{Tags: map[string]string{"metric_tag": "from_config"}} + processor := Override{Tags: map[string]string{"metric_tag": "from_config"}} - tags := calculateProcessedTags(adder, createTestMetric()) + tags := calculateProcessedTags(processor, createTestMetric()) value, present := tags["metric_tag"] assert.True(t, present, "Tag of metric was not present") @@ -56,25 +56,25 @@ func TestOverwritesPresentTagValues(t *testing.T) { } func TestOverridesName(t *testing.T) { - adder := TagAdder{NameOverride: "overridden"} + processor := Override{NameOverride: "overridden"} - processed := adder.Apply(createTestMetric()) + processed := processor.Apply(createTestMetric()) assert.Equal(t, "overridden", processed[0].Name(), "Name was not overridden") } func TestNamePrefix(t *testing.T) { - adder := TagAdder{NamePrefix: "Pre-"} + processor := Override{NamePrefix: "Pre-"} - processed := adder.Apply(createTestMetric()) + processed := processor.Apply(createTestMetric()) assert.Equal(t, "Pre-m1", processed[0].Name(), "Prefix was not applied") } func TestNameSuffix(t *testing.T) { - adder := TagAdder{NameSuffix: "-suff"} + processor := Override{NameSuffix: "-suff"} - processed := adder.Apply(createTestMetric()) + processed := processor.Apply(createTestMetric()) assert.Equal(t, "m1-suff", processed[0].Name(), "Suffix was not applied") } diff --git a/plugins/processors/tags/README.md b/plugins/processors/tags/README.md deleted file mode 100644 index f812e9b4d3e81..0000000000000 --- a/plugins/processors/tags/README.md +++ /dev/null @@ -1,19 +0,0 @@ -# Add Global Tags Processor Plugin - -The tags processor plugin adds all configured tags to every metric passing -through it. Values of already present tags with conflicting keys will be -overwritten. - -While global tags can be configured in the respective section of the -configuration file those tags may be ignored when using the `taginclude` -property. -This plugin provides global tags, that are unaffected by this filtering. - -### Configuration: - -```toml -# Add a global tag to all metrics -[[processors.tags]] -[processors.tags.add] - additional_tag = "tag_value" -``` diff --git a/plugins/processors/tags/tags.go b/plugins/processors/tags/tags.go deleted file mode 100644 index 9065cab8fc7f6..0000000000000 --- a/plugins/processors/tags/tags.go +++ /dev/null @@ -1,54 +0,0 @@ -package tags - -import ( - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/processors" -) - -var sampleConfig = ` -## NOTE This processor will overwrite values of tags, that are already -## present in the metric passed through this filter. - -## Tags to be added (all values must be strings) -# [processors.tags.tags] -# additional_tag = "tag_value" -` - -type TagAdder struct { - NameOverride string - NamePrefix string - NameSuffix string - Tags map[string]string -} - -func (p *TagAdder) SampleConfig() string { - return sampleConfig -} - -func (p *TagAdder) Description() string { - return "Add all configured tags to all metrics that pass through this filter." -} - -func (a *TagAdder) Apply(in ...telegraf.Metric) []telegraf.Metric { - for _, metric := range in { - if len(a.NameOverride) > 0 { - metric.SetName(a.NameOverride) - } - if len(a.NamePrefix) > 0 { - metric.SetPrefix(a.NamePrefix) - } - if len(a.NameSuffix) > 0 { - metric.SetSuffix(a.NameSuffix) - } - for key, value := range a.Tags { - metric.AddTag(key, value) - } - } - return in -} - -func init() { - processors.Add("tags", func() telegraf.Processor { - return &TagAdder{} - }) -}