From 5c1494fa42118d98b9e15e86854771f88c3e6a2d Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Fri, 6 May 2022 09:56:59 -0600 Subject: [PATCH] [processor/transform] Add ability to limit attributes (#9552) * Add truncation function * Add TODO * Updated changelog * Updated changelog * Update name to truncateAll * Fixed README * add limit function * Updated changelog * Updated go.sum * Add check to avoid loop * added checked for negative limit * fix README.md * Fix lint issue * Add link to issue for writing logs Co-authored-by: Bogdan Drutu --- CHANGELOG.md | 1 + processor/transformprocessor/README.md | 12 +- .../internal/common/functions.go | 33 +++++ .../internal/common/functions_test.go | 20 +++ .../internal/traces/functions_test.go | 114 ++++++++++++++++++ 5 files changed, 177 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ed8a1eb52539..220d991865fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - `datadogexporter`: Add `api.fail_on_invalid_key` to fail fast if api key is invalid (#9426) - `transformprocessor`: Add support for functions to validate parameters (#9563) - `googlecloudexporter`: Add GCP cloud logging exporter (#9679) +- `transformprocessor`: Add new `limit` function to allow limiting the number of items in a map, such as the number of attributes in `attributes` or `resource.attributes` (#9552) - `processor/attributes`: Support attributes set by server authenticator (#9420) - `datadogexporter`: Experimental support for Exponential Histograms with delta aggregation temporality (#8350) diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index 19fb402bd09a..4ee779495705 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -22,7 +22,9 @@ it references an unset map value, there will be no action. - `keep_keys(target, string...)` - `target` is a path expression to a map type field. The map will be mutated to only contain the fields specified by the list of strings. e.g., `keep_keys(attributes, "http.method")`, `keep_keys(attributes, "http.method", "http.route")` -- `truncate_all(target, limit)` - `target` is a path expression to a map type field. `limit` is an integer. The map will be mutated such that all string values are truncated to the limit. e.g., `truncate(attributes, 100)` will truncate all string values in `attributes` such that all string values have less than or equal to 100 characters. Non-string values are ignored. +- `truncate_all(target, limit)` - `target` is a path expression to a map type field. `limit` is an integer. The map will be mutated such that all string values are truncated to the limit. e.g., `truncate_all(attributes, 100)` will truncate all string values in `attributes` such that all string values have less than or equal to 100 characters. Non-string values are ignored. + +- `limit(target, limit)` - `target` is a path expression to a map type field. `limit` is an integer. The map will be mutated such that the number of items does not exceed the limit. e.g., `limit(attributes, 100)` will limit `attributes` to no more than 100 items. Which items are dropped is random. Supported where operations: - `==` - matches telemetry where the values are equal to each other @@ -50,6 +52,8 @@ processors: - set(status.code, 1) where attributes["http.path"] == "/health" - keep_keys(resource.attributes, "service.name", "service.namespace", "cloud.region") - set(name, attributes["http.route"]) + - limit(attributes, 100) + - limit(resource.attributes, 100) - truncate_all(attributes, 4096) - truncate_all(resource.attributes, 4096) service: @@ -77,5 +81,7 @@ All spans 1) Set status code to OK for all spans with a path `/health` 2) Keep only `service.name`, `service.namespace`, `cloud.region` resource attributes 3) Set `name` to the `http.route` attribute if it is set -4) Truncate all span attributes such that no string value has more than 4096 characters. -5) Truncate all resource attributes such that no string value has more than 4096 characters. +4) Limit all span attributes such that each span has no more than 100 attributes. +5) Limit all resource attributes such that each resource no more than 100 attributes. +6) Truncate all span attributes such that no string value has more than 4096 characters. +7) Truncate all resource attributes such that no string value has more than 4096 characters. diff --git a/processor/transformprocessor/internal/common/functions.go b/processor/transformprocessor/internal/common/functions.go index 081ffe47cfc1..77f87f1674d1 100644 --- a/processor/transformprocessor/internal/common/functions.go +++ b/processor/transformprocessor/internal/common/functions.go @@ -25,6 +25,7 @@ var registry = map[string]interface{}{ "keep_keys": keepKeys, "set": set, "truncate_all": truncateAll, + "limit": limit, } type PathExpressionParser func(*Path) (GetSetter, error) @@ -96,6 +97,38 @@ func truncateAll(target GetSetter, limit int64) (ExprFunc, error) { }) target.Set(ctx, updated) // TODO: Write log when truncation is performed + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9730 + } + return nil + }, nil +} + +func limit(target GetSetter, limit int64) (ExprFunc, error) { + return func(ctx TransformContext) interface{} { + val := target.Get(ctx) + if val == nil { + return nil + } + + if attrs, ok := val.(pcommon.Map); ok { + if int64(attrs.Len()) <= limit { + return nil + } + + updated := pcommon.NewMap() + updated.EnsureCapacity(attrs.Len()) + count := int64(0) + attrs.Range(func(key string, val pcommon.Value) bool { + if count < limit { + updated.Insert(key, val) + count++ + return true + } + return false + }) + target.Set(ctx, updated) + // TODO: Write log when limiting is performed + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9730 } return nil }, nil diff --git a/processor/transformprocessor/internal/common/functions_test.go b/processor/transformprocessor/internal/common/functions_test.go index b9929939c41e..a916e61770e3 100644 --- a/processor/transformprocessor/internal/common/functions_test.go +++ b/processor/transformprocessor/internal/common/functions_test.go @@ -133,6 +133,26 @@ func Test_newFunctionCall_invalid(t *testing.T) { }, }, }, + { + name: "not int", + inv: Invocation{ + Function: "limit", + Arguments: []Value{ + { + Path: &Path{ + Fields: []Field{ + { + Name: "name", + }, + }, + }, + }, + { + String: strp("not an int"), + }, + }, + }, + }, { name: "function call returns error", inv: Invocation{ diff --git a/processor/transformprocessor/internal/traces/functions_test.go b/processor/transformprocessor/internal/traces/functions_test.go index d4e290d240c7..c2be448d23d0 100644 --- a/processor/transformprocessor/internal/traces/functions_test.go +++ b/processor/transformprocessor/internal/traces/functions_test.go @@ -348,6 +348,120 @@ func Test_newFunctionCall(t *testing.T) { attrs.CopyTo(span.Attributes()) }, }, + { + name: "limit attributes", + inv: common.Invocation{ + Function: "limit", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "attributes", + }, + }, + }, + }, + { + Int: intp(1), + }, + }, + }, + want: func(span ptrace.Span) { + input.CopyTo(span) + span.Attributes().Clear() + attrs := pcommon.NewMap() + attrs.InsertString("test", "hello world") + attrs.CopyTo(span.Attributes()) + }, + }, + { + name: "limit attributes zero", + inv: common.Invocation{ + Function: "limit", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "attributes", + }, + }, + }, + }, + { + Int: intp(0), + }, + }, + }, + want: func(span ptrace.Span) { + input.CopyTo(span) + span.Attributes().Clear() + attrs := pcommon.NewMap() + attrs.CopyTo(span.Attributes()) + }, + }, + { + name: "limit attributes nothing", + inv: common.Invocation{ + Function: "limit", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "attributes", + }, + }, + }, + }, + { + Int: intp(100), + }, + }, + }, + want: func(span ptrace.Span) { + input.CopyTo(span) + span.Attributes().Clear() + attrs := pcommon.NewMap() + attrs.InsertString("test", "hello world") + attrs.InsertInt("test2", 3) + attrs.InsertBool("test3", true) + attrs.CopyTo(span.Attributes()) + }, + }, + { + name: "limit resource attributes", + inv: common.Invocation{ + Function: "limit", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + }, + }, + }, + }, + { + Int: intp(1), + }, + }, + }, + want: func(span ptrace.Span) { + input.CopyTo(span) + span.Attributes().Clear() + attrs := pcommon.NewMap() + attrs.InsertString("test", "hello world") + attrs.InsertInt("test2", 3) + attrs.InsertBool("test3", true) + attrs.CopyTo(span.Attributes()) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {