Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New label processor #20

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions processor/labelsprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Labels Processor

Supported pipeline types: metrics

The labels processor can be used to add data point labels to all metrics that pass through it.
If any specified labels already exist in the metric, the value will be updated.

Please refer to [config.go](./config.go) for the config spec.

Example:

```yaml
processors:
labels_processor:
labels:
- key: label1
value: value1
- key: label2
value: value2
amanbrar1999 marked this conversation as resolved.
Show resolved Hide resolved
```

Refer to [config.yaml](./testdata/config.yaml) for detailed
examples on using the processor.
15 changes: 15 additions & 0 deletions processor/labelsprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package labelsprocessor

import "go.opentelemetry.io/collector/config/configmodels"

// Config defines configuration for Labels processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
Labels []LabelConfig `mapstructure:"labels"`
}

// LabelConfig defines configuration for provided labels
type LabelConfig struct {
Key string `mapstructure:"key"`
Value string `mapstructure:"value"`
}
50 changes: 50 additions & 0 deletions processor/labelsprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package labelsprocessor

import (
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
)

func TestLoadConfig(t *testing.T) {

factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factories.Processors[typeStr] = NewFactory()

os.Setenv("VALUE_1", "first_val")
os.Setenv("VALUE_2", "second_val")

cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
assert.NoError(t, err)
assert.NotNil(t, cfg)

assert.Equal(t, cfg.Processors["labels_processor"], &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "labels_processor",
NameVal: "labels_processor",
},
Labels: []LabelConfig{
{Key: "label1", Value: "value1"},
{Key: "label2", Value: "value2"},
},
})

assert.Equal(t, cfg.Processors["labels_processor/env_vars"], &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "labels_processor",
NameVal: "labels_processor/env_vars",
},
Labels: []LabelConfig{
{Key: "label1", Value: "first_val"},
{Key: "label2", Value: "second_val"},
},
})

}
49 changes: 49 additions & 0 deletions processor/labelsprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package labelsprocessor

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
)

const (
typeStr = "labels_processor"
)

var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true}

// NewFactory returns a new factory for the Labels processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithMetrics(createMetricsProcessor))
}

func createDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
}
}

func createMetricsProcessor(
_ context.Context,
_ component.ProcessorCreateParams,
cfg configmodels.Processor,
nextConsumer consumer.MetricsConsumer) (component.MetricsProcessor, error) {
lp, err := newLabelMetricProcessor(cfg.(*Config))
if err != nil {
return nil, err
}
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
lp,
processorhelper.WithCapabilities(processorCapabilities))
}
51 changes: 51 additions & 0 deletions processor/labelsprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package labelsprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func TestType(t *testing.T) {
factory := NewFactory()
pType := factory.Type()
assert.Equal(t, pType, configmodels.Type("labels_processor"))
}

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.Equal(t, cfg, &Config{
ProcessorSettings: configmodels.ProcessorSettings{
NameVal: typeStr,
TypeVal: typeStr,
},
})
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateProcessor(t *testing.T) {

factory := NewFactory()
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "labels_processor",
NameVal: "labels_processor",
},
Labels: []LabelConfig{
{Key: "label1", Value: "value1"},
},
}

mp, mErr := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, exportertest.NewNopMetricsExporter())
assert.NoError(t, mErr)
assert.NotNil(t, mp)

}
124 changes: 124 additions & 0 deletions processor/labelsprocessor/labels_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package labelsprocessor

import (
"context"
"fmt"

"go.opentelemetry.io/collector/consumer/pdata"
v11 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1"
v1 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
)

type labelMetricProcessor struct {
cfg *Config
}

func newLabelMetricProcessor(cfg *Config) (*labelMetricProcessor, error) {
err := validateConfig(cfg)
if err != nil {
return nil, err
}
return &labelMetricProcessor{cfg: cfg}, nil
}

func validateConfig(cfg *Config) error {
// Ensure no empty keys/values exist
for _, elem := range cfg.Labels {
if elem.Key == "" || elem.Value == "" {
amanbrar1999 marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("Labels Processor configuration contains an empty key or value")
}
}

//Ensure no duplicate keys exist
keys := make(map[string]bool)
for _, elem := range cfg.Labels {
_, value := keys[elem.Key]
if value {
return fmt.Errorf("Labels Processor configuration contains duplicate keys")
}
keys[elem.Key] = true
}

return nil
}

func (lp *labelMetricProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {

otlpMetrics := pdata.MetricsToOtlp(md)

for _, otlpMetric := range otlpMetrics {
for _, instrMetric := range otlpMetric.GetInstrumentationLibraryMetrics() {
for _, metric := range instrMetric.GetMetrics() {

// Multiple types of Data Points exists, and each of them must be handled differently
if metric.GetIntSum() != nil {
intDataPoints := metric.GetIntSum().GetDataPoints()
handleIntDataPoints(intDataPoints, lp)
} else if metric.GetIntGauge() != nil {
intDataPoints := metric.GetIntGauge().GetDataPoints()
handleIntDataPoints(intDataPoints, lp)
} else if metric.GetDoubleGauge() != nil {
doubleDataPoints := metric.GetDoubleGauge().GetDataPoints()
handleDoubleDataPoints(doubleDataPoints, lp)
} else if metric.GetDoubleSum() != nil {
doubleDataPoints := metric.GetDoubleSum().GetDataPoints()
handleDoubleDataPoints(doubleDataPoints, lp)
} else if metric.GetIntHistogram() != nil {
intHistogramDataPoints := metric.GetIntHistogram().GetDataPoints()
handleIntHistogramDataPoints(intHistogramDataPoints, lp)
} else if metric.GetDoubleHistogram() != nil {
doubleHistogramDataPoints := metric.GetDoubleHistogram().GetDataPoints()
handleDoubleHistogramDataPoints(doubleHistogramDataPoints, lp)
}

}
}
}

return md, nil
}

func handleIntDataPoints(intDataPoints []*v1.IntDataPoint, lp *labelMetricProcessor) {
for _, label := range lp.cfg.Labels {
for _, dataPoint := range intDataPoints {
deDuplicateAndAppend(&dataPoint.Labels, label.Key, label.Value)
}
}
}

func handleDoubleDataPoints(doubleDataPoints []*v1.DoubleDataPoint, lp *labelMetricProcessor) {
for _, label := range lp.cfg.Labels {
for _, dataPoint := range doubleDataPoints {
deDuplicateAndAppend(&dataPoint.Labels, label.Key, label.Value)
}
}
}

func handleIntHistogramDataPoints(intHistogramDataPoints []*v1.IntHistogramDataPoint, lp *labelMetricProcessor) {
for _, label := range lp.cfg.Labels {
for _, dataPoint := range intHistogramDataPoints {
deDuplicateAndAppend(&dataPoint.Labels, label.Key, label.Value)
}
}
}

func handleDoubleHistogramDataPoints(doubleHistogramDataPoints []*v1.DoubleHistogramDataPoint, lp *labelMetricProcessor) {
for _, label := range lp.cfg.Labels {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can save some repetition if you have this outer loop build into deDuplicateAndAppend function. So like:

for _, dataPoint := range doubleHistogramDataPoints {
	deDuplicateAndAppend(&dataPoint.Labels, lp.cfg.Labels)
}

So basically have the deDuplicateAndAppend to deal with looping through configured labels. If you do this, the above 3 functions would have less duplicate code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great idea, will do

for _, dataPoint := range doubleHistogramDataPoints {
deDuplicateAndAppend(&dataPoint.Labels, label.Key, label.Value)
}
}
}
alvinlin123 marked this conversation as resolved.
Show resolved Hide resolved

// This processor will always by default update existing label values. Also assumes duplicate labels do not already exist in the metric
Copy link

@alvinlin123 alvinlin123 Oct 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: while this comment is good, it explains what the function is doing not "how", but consider if you can use better name for the function such that you don't need to have a comment for it.

DeDuplicateAndAppend is a bit misleading here because this function is not really doing "deduplicate"; it is either overwriting or append. The code is actually overwriting the label value regardless if the label value is "duplicated" or not. So maybe a term like "Upsert" or "UpdateOrInsert" would be a better term to use.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now that I think about it, I agree the function name is not very representative of what it is actually doing, I will change it to "upsert" as that is a term used in other parts of the collector as well

func deDuplicateAndAppend(labels *[]*v11.StringKeyValue, key string, value string) {
// If the key already exists, overwrite it

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Usually comment like this is not useful as we can see what you are doing from the code quite easily. Also comment like this may start to be a lie as time goes; when other started to modify code but not the comment.

If you really think that comment like this is needed, then consider restructuring your code like the following to use method name as comment (but I don't think it's needed in your case, so following is just example).

if label, ok := findLabelByName(labels, key); ok {
   overrideExistingLabel(labels, value);
} else {
  appendNewLabel(labels, key, valye)
}

This applies to the // If it does not exist, append it comment too.

Copy link
Author

@amanbrar1999 amanbrar1999 Oct 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me, comments will be removed

for _, elem := range *labels {
if elem.Key == key {
elem.Value = value
return
}
}
// If it does not exist, append it
*labels = append(*labels, &v11.StringKeyValue{Key: key, Value: value})
}
Loading