Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] Wire up metrics processing #10100

Merged
merged 22 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

### 💡 Enhancements 💡

- `transformprocessor`: Add transformation of metrics (#10100)
- `kubeletstatsreceiver`: Update receiver to use new Metrics Builder. All emitted metrics remain the same. (#9744)

### 🧰 Bug fixes 🧰
Expand Down
51 changes: 33 additions & 18 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
# Transform Processor

| Status | |
| ------------------------ | ---------------- |
| Stability | [In development] |
| Supported pipeline types | traces, logs |
| Distributions | none |
| Status | |
| ------------------------ | --------------------- |
| Stability | [In development] |
| Supported pipeline types | traces, metrics, logs |
| Distributions | none |

The transform processor modifies telemetry based on configuration using the Telemetry Query Language.
The transform processor modifies telemetry based on configuration using the [Telemetry Query Language](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/processing.md#telemetry-query-language).
It takes a list of queries which are performed in the order specified in the config.

Queries are composed of the following parts
- Path expressions: Fields within the incoming data can be referenced using expressions composed of the names as defined
in the OTLP protobuf definition. e.g., `status.code`, `attributes["http.method"]`. If the path expression begins with
`resource.` or `instrumentation_library.`, it will reference those values.
`resource.` or `instrumentation_library.`, it will reference those values. For metrics, `name`, `description`, `unit`, `type`, `is_monotonic`, and `aggregation_temporality` are accessed via `metric.`
- The name `instrumentation_library` within OpenTelemetry is currently under discussion and may be changed in the future.
- Metric data types are `None`, `Gauge`, `Sum`, `Histogram`, `ExponentialHistogram`, and `Summary`
- Literals: Strings, ints, and floats can be referenced as literal values
- Function invocations: Functions can be invoked with arguments matching the function's expected arguments
- Where clause: Telemetry to modify can be filtered by appending `where a <op> b`, with `a` and `b` being any of the above.
Expand Down Expand Up @@ -46,11 +47,6 @@ exporters:

processors:
transform:
logs:
queries:
- set(severity_text, "FAIL") where body == "request failed"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(body, attributes["http.route"])
traces:
queries:
- set(status.code, 1) where attributes["http.path"] == "/health"
Expand All @@ -60,6 +56,18 @@ processors:
- limit(resource.attributes, 100)
- truncate_all(attributes, 4096)
- truncate_all(resource.attributes, 4096)
metrics:
queries:
- set(metric.description, "Sum") where metric.type == "Sum"
- keep_keys(resource.attributes, "host.name")
- limit(attributes, 100)
- truncate_all(attributes, 4096)
- truncate_all(resource.attributes, 4096)
logs:
queries:
- set(severity_text, "FAIL") where body == "request failed"
- keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region")
- set(body, attributes["http.route"])
service:
pipelines:
logs:
Expand All @@ -74,11 +82,6 @@ service:

This processor will perform the operations in order for

All logs

1) Set severity text to FAIL if the body contains a string text "request failed"
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
3) Set `body` to the `http.route` attribute if it is set

All spans

Expand All @@ -90,4 +93,16 @@ All spans
6) Truncate all span attributes such that no string value has more than 4096 characters.
7) Truncate all resource attributes such that no string value has more than 4096 characters.

[In development]: https://github.com/open-telemetry/opentelemetry-collector#in-development
All metrics and their data points

1) Set metric description to "Sum" if the metric type is "Sum"
2) Keep only the `host.name` resource attributes
4) Limit all data point attributes such that each data point has no more than 100 attributes.
6) Truncate all data point attributes such that no string value has more than 4096 characters.
7) Truncate all resource attributes such that no string value has more than 4096 characters.

All logs

1) Set severity text to FAIL if the body contains a string text "request failed"
2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes
3) Set `body` to the `http.route` attribute if it is set
26 changes: 13 additions & 13 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,41 @@ import (
"go.opentelemetry.io/collector/config"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

type LogsConfig struct {
type SignalConfig struct {
Queries []string `mapstructure:"queries"`

// The functions that have been registered in the extension for logs processing.
functions map[string]interface{} `mapstructure:"-"`
}

type TracesConfig struct {
Queries []string `mapstructure:"queries"`

// The functions that have been registered in the extension for traces processing.
// The functions that have been registered in the extension for processing.
functions map[string]interface{} `mapstructure:"-"`
}

type Config struct {
config.ProcessorSettings `mapstructure:",squash"`

Logs LogsConfig `mapstructure:"logs"`
Traces TracesConfig `mapstructure:"traces"`
Logs SignalConfig `mapstructure:"logs"`
Traces SignalConfig `mapstructure:"traces"`
Metrics SignalConfig `mapstructure:"metrics"`
}

var _ config.Processor = (*Config)(nil)

func (c *Config) Validate() error {
var errors error
_, err := common.ParseQueries(c.Logs.Queries, c.Logs.functions, logs.ParsePath)
_, err := common.ParseQueries(c.Traces.Queries, c.Traces.functions, traces.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
_, err = common.ParseQueries(c.Metrics.Queries, c.Metrics.functions, metrics.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
_, err = common.ParseQueries(c.Traces.Queries, c.Traces.functions, traces.ParsePath)
_, err = common.ParseQueries(c.Logs.Queries, c.Logs.functions, logs.ParsePath)
if err != nil {
errors = multierr.Append(errors, err)
}
Expand Down
37 changes: 27 additions & 10 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
)

func TestLoadingConfig(t *testing.T) {
Expand All @@ -41,22 +42,30 @@ func TestLoadingConfig(t *testing.T) {
p0 := cfg.Processors[config.NewComponentID(typeStr)]
assert.Equal(t, p0, &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Traces: SignalConfig{
Queries: []string{
`set(body, "bear") where attributes["http.path"] == "/animal"`,
`set(name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: logs.DefaultFunctions(),
functions: traces.DefaultFunctions(),
},
Traces: TracesConfig{
Metrics: SignalConfig{
Queries: []string{
`set(name, "bear") where attributes["http.path"] == "/animal"`,
`set(metric.name, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: traces.DefaultFunctions(),
},
Logs: SignalConfig{
Queries: []string{
`set(body, "bear") where attributes["http.path"] == "/animal"`,
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: logs.DefaultFunctions(),
},
})
}

Expand All @@ -67,19 +76,27 @@ func TestLoadInvalidConfig(t *testing.T) {
factory := NewFactory()
factories.Processors[typeStr] = factory

cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_log.yaml"), factories)
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_trace.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_log.yaml"), factories)
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_trace.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_trace.yaml"), factories)
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_metric.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_trace.yaml"), factories)
cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_metric.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax_log.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)

cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function_log.yaml"), factories)
assert.Error(t, err)
assert.NotNil(t, cfg)
}
31 changes: 29 additions & 2 deletions processor/transformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)
Expand All @@ -39,22 +41,28 @@ func NewFactory() component.ProcessorFactory {
createDefaultConfig,
component.WithLogsProcessor(createLogsProcessor),
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
)
}

func createDefaultConfig() config.Processor {
return &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Logs: LogsConfig{
Logs: SignalConfig{
Queries: []string{},

functions: logs.DefaultFunctions(),
},
Traces: TracesConfig{
Traces: SignalConfig{
Queries: []string{},

functions: traces.DefaultFunctions(),
},
Metrics: SignalConfig{
Queries: []string{},

functions: metrics.DefaultFunctions(),
},
}
}

Expand Down Expand Up @@ -95,3 +103,22 @@ func createTracesProcessor(
proc.ProcessTraces,
processorhelper.WithCapabilities(processorCapabilities))
}

func createMetricsProcessor(
_ context.Context,
settings component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)

proc, err := metrics.NewProcessor(oCfg.Metrics.Queries, oCfg.Metrics.functions, settings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
proc.ProcessMetrics,
processorhelper.WithCapabilities(processorCapabilities))
}
Loading