From 200d9504a6b152e49f510b0dadc55e37480bee32 Mon Sep 17 00:00:00 2001 From: Ben Keith Date: Tue, 22 Sep 2020 18:13:16 +0000 Subject: [PATCH 1/4] Log Support for attribute Processor This complements the trace span support already present. It shares the same config struct to maintain backwards-compatibility with spans in the cleanest way possible. --- .../attributesprocessor/attributes_log.go | 97 ++++ .../attributes_log_test.go | 454 ++++++++++++++++++ .../{attributes.go => attributes_trace.go} | 8 +- ...butes_test.go => attributes_trace_test.go} | 0 processor/attributesprocessor/factory.go | 36 +- processor/attributesprocessor/factory_test.go | 48 ++ processor/spanprocessor/span.go | 1 + 7 files changed, 638 insertions(+), 6 deletions(-) create mode 100644 processor/attributesprocessor/attributes_log.go create mode 100644 processor/attributesprocessor/attributes_log_test.go rename processor/attributesprocessor/{attributes.go => attributes_trace.go} (86%) rename processor/attributesprocessor/{attributes_test.go => attributes_trace_test.go} (100%) diff --git a/processor/attributesprocessor/attributes_log.go b/processor/attributesprocessor/attributes_log.go new file mode 100644 index 00000000000..5a2e2e41581 --- /dev/null +++ b/processor/attributesprocessor/attributes_log.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package attributesprocessor + +import ( + "context" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/processor/filterlog" + "go.opentelemetry.io/collector/processor/processorhelper" +) + +type logAttributesProcessor struct { + attrProc *processorhelper.AttrProc + include filterlog.Matcher + exclude filterlog.Matcher +} + +// newLogAttributesProcessor returns a processor that modifies attributes of a +// log record. To construct the attributes processors, the use of the factory +// methods are required in order to validate the inputs. +func newLogAttributesProcessor(attrProc *processorhelper.AttrProc, include, exclude filterlog.Matcher) *logAttributesProcessor { + return &logAttributesProcessor{ + attrProc: attrProc, + include: include, + exclude: exclude, + } +} + +// ProcessLogs implements the LogsProcessor +func (a *logAttributesProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) { + rls := ld.ResourceLogs() + for i := 0; i < rls.Len(); i++ { + rs := rls.At(i) + if rs.IsNil() { + continue + } + ilss := rls.At(i).InstrumentationLibraryLogs() + for j := 0; j < ilss.Len(); j++ { + ils := ilss.At(j) + if ils.IsNil() { + continue + } + logs := ils.Logs() + for k := 0; k < logs.Len(); k++ { + lr := logs.At(k) + if lr.IsNil() { + // Do not create empty log records just to add attributes + continue + } + + if a.skipLog(lr) { + continue + } + + a.attrProc.Process(lr.Attributes()) + } + } + } + return ld, nil +} + +// skipLog determines if a log should be processed. +// True is returned when a log should be skipped. +// False is returned when a log should not be skipped. +// The logic determining if a log should be processed is set +// in the attribute configuration with the include and exclude settings. +// Include properties are checked before exclude settings are checked. +func (a *logAttributesProcessor) skipLog(lr pdata.LogRecord) bool { + if a.include != nil { + // A false returned in this case means the log should not be processed. + if include := a.include.MatchLogRecord(lr); !include { + return true + } + } + + if a.exclude != nil { + // A true returned in this case means the log should not be processed. + if exclude := a.exclude.MatchLogRecord(lr); exclude { + return true + } + } + + return false +} diff --git a/processor/attributesprocessor/attributes_log_test.go b/processor/attributesprocessor/attributes_log_test.go new file mode 100644 index 00000000000..f045d405520 --- /dev/null +++ b/processor/attributesprocessor/attributes_log_test.go @@ -0,0 +1,454 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package attributesprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/internal/data/testdata" + "go.opentelemetry.io/collector/internal/processor/filterconfig" + "go.opentelemetry.io/collector/internal/processor/filterset" + "go.opentelemetry.io/collector/processor/processorhelper" +) + +// Common structure for all the Tests +type logTestCase struct { + name string + inputAttributes map[string]pdata.AttributeValue + expectedAttributes map[string]pdata.AttributeValue +} + +// runIndividualLogTestCase is the common logic of passing trace data through a configured attributes processor. +func runIndividualLogTestCase(t *testing.T, tt logTestCase, tp component.LogsProcessor) { + t.Run(tt.name, func(t *testing.T) { + ld := generateLogData(tt.name, tt.inputAttributes) + assert.NoError(t, tp.ConsumeLogs(context.Background(), ld)) + // Ensure that the modified `ld` has the attributes sorted: + sortLogAttributes(ld) + require.Equal(t, generateLogData(tt.name, tt.expectedAttributes), ld) + }) +} + +func generateLogData(logName string, attrs map[string]pdata.AttributeValue) pdata.Logs { + td := pdata.NewLogs() + td.ResourceLogs().Resize(1) + rs := td.ResourceLogs().At(0) + rs.Resource().InitEmpty() + rs.InstrumentationLibraryLogs().Resize(1) + ils := rs.InstrumentationLibraryLogs().At(0) + lrs := ils.Logs() + lrs.Resize(1) + lrs.At(0).SetName(logName) + lrs.At(0).Attributes().InitFromMap(attrs).Sort() + return td +} + +func sortLogAttributes(ld pdata.Logs) { + rss := ld.ResourceLogs() + for i := 0; i < rss.Len(); i++ { + rs := rss.At(i) + if rs.IsNil() { + continue + } + if !rs.Resource().IsNil() { + rs.Resource().Attributes().Sort() + } + ilss := rss.At(i).InstrumentationLibraryLogs() + for j := 0; j < ilss.Len(); j++ { + ils := ilss.At(j) + if ils.IsNil() { + continue + } + logs := ils.Logs() + for k := 0; k < logs.Len(); k++ { + s := logs.At(k) + if !s.IsNil() { + s.Attributes().Sort() + } + } + } + } +} + +// TestLogProcessor_Values tests all possible value types. +func TestLogProcessor_NilEmptyData(t *testing.T) { + type nilEmptyTestCase struct { + name string + input pdata.Logs + output pdata.Logs + } + testCases := []nilEmptyTestCase{ + { + name: "empty", + input: testdata.GenerateLogDataEmpty(), + output: testdata.GenerateLogDataEmpty(), + }, + { + name: "one-empty-resource-logs", + input: testdata.GenerateLogDataOneEmptyResourceLogs(), + output: testdata.GenerateLogDataOneEmptyResourceLogs(), + }, + { + name: "one-empty-one-nil-resource-logs", + input: testdata.GenerateLogDataOneEmptyOneNilResourceLogs(), + output: testdata.GenerateLogDataOneEmptyOneNilResourceLogs(), + }, + { + name: "one-empty-one-nil-instrumentation-library", + input: testdata.GenerateLogDataOneEmptyOneNilInstrumentationLibrary(), + output: testdata.GenerateLogDataOneEmptyOneNilInstrumentationLibrary(), + }, + { + name: "one-empty-one-nil-log-record", + input: testdata.GenerateLogDataOneEmptyOneNilLogRecord(), + output: func() pdata.Logs { + lr := testdata.GenerateLogDataOneEmptyOneNilLogRecord() + lr.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0).Attributes().InitEmptyWithCapacity(0) + return lr + }(), + }, + { + name: "no-libraries", + input: testdata.GenerateLogDataOneEmptyResourceLogs(), + output: testdata.GenerateLogDataOneEmptyResourceLogs(), + }, + } + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.Settings.Actions = []processorhelper.ActionKeyValue{ + {Key: "attribute1", Action: processorhelper.INSERT, Value: 123}, + {Key: "attribute1", Action: processorhelper.DELETE}, + } + + tp, err := factory.CreateLogsProcessor( + context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, exportertest.NewNopLogsExporter()) + require.Nil(t, err) + require.NotNil(t, tp) + for i := range testCases { + tt := testCases[i] + t.Run(tt.name, func(t *testing.T) { + assert.NoError(t, tp.ConsumeLogs(context.Background(), tt.input)) + assert.EqualValues(t, tt.output, tt.input) + }) + } +} + +func TestAttributes_FilterLogs(t *testing.T) { + testCases := []logTestCase{ + { + name: "apply processor", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{ + "attribute1": pdata.NewAttributeValueInt(123), + }, + }, + { + name: "apply processor with different value for exclude property", + inputAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(false), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "attribute1": pdata.NewAttributeValueInt(123), + "NoModification": pdata.NewAttributeValueBool(false), + }, + }, + { + name: "incorrect name for include property", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{}, + }, + { + name: "attribute match for exclude property", + inputAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(true), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(true), + }, + }, + } + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.Actions = []processorhelper.ActionKeyValue{ + {Key: "attribute1", Action: processorhelper.INSERT, Value: 123}, + } + oCfg.Include = &filterconfig.MatchProperties{ + LogNames: []string{"^[^i].*"}, + Config: *createConfig(filterset.Regexp), + } + oCfg.Exclude = &filterconfig.MatchProperties{ + Attributes: []filterconfig.Attribute{ + {Key: "NoModification", Value: true}, + }, + Config: *createConfig(filterset.Strict), + } + tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + require.Nil(t, err) + require.NotNil(t, tp) + + for _, tt := range testCases { + runIndividualLogTestCase(t, tt, tp) + } +} + +func TestAttributes_FilterLogsByNameStrict(t *testing.T) { + testCases := []logTestCase{ + { + name: "apply", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{ + "attribute1": pdata.NewAttributeValueInt(123), + }, + }, + { + name: "apply", + inputAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(false), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "attribute1": pdata.NewAttributeValueInt(123), + "NoModification": pdata.NewAttributeValueBool(false), + }, + }, + { + name: "incorrect_log_name", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{}, + }, + { + name: "dont_apply", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{}, + }, + { + name: "incorrect_log_name_with_attr", + inputAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(true), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(true), + }, + }, + } + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.Actions = []processorhelper.ActionKeyValue{ + {Key: "attribute1", Action: processorhelper.INSERT, Value: 123}, + } + oCfg.Include = &filterconfig.MatchProperties{ + LogNames: []string{"apply", "dont_apply"}, + Config: *createConfig(filterset.Strict), + } + oCfg.Exclude = &filterconfig.MatchProperties{ + LogNames: []string{"dont_apply"}, + Config: *createConfig(filterset.Strict), + } + tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + require.Nil(t, err) + require.NotNil(t, tp) + + for _, tt := range testCases { + runIndividualLogTestCase(t, tt, tp) + } +} + +func TestAttributes_FilterLogsByNameRegexp(t *testing.T) { + testCases := []logTestCase{ + { + name: "apply_to_log_with_no_attrs", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{ + "attribute1": pdata.NewAttributeValueInt(123), + }, + }, + { + name: "apply_to_log_with_attr", + inputAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(false), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "attribute1": pdata.NewAttributeValueInt(123), + "NoModification": pdata.NewAttributeValueBool(false), + }, + }, + { + name: "incorrect_log_name", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{}, + }, + { + name: "apply_dont_apply", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{}, + }, + { + name: "incorrect_log_name_with_attr", + inputAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(true), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(true), + }, + }, + } + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.Actions = []processorhelper.ActionKeyValue{ + {Key: "attribute1", Action: processorhelper.INSERT, Value: 123}, + } + oCfg.Include = &filterconfig.MatchProperties{ + LogNames: []string{"^apply.*"}, + Config: *createConfig(filterset.Regexp), + } + oCfg.Exclude = &filterconfig.MatchProperties{ + LogNames: []string{".*dont_apply$"}, + Config: *createConfig(filterset.Regexp), + } + tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + require.Nil(t, err) + require.NotNil(t, tp) + + for _, tt := range testCases { + runIndividualLogTestCase(t, tt, tp) + } +} + +func TestLogAttributes_Hash(t *testing.T) { + testCases := []logTestCase{ + { + name: "String", + inputAttributes: map[string]pdata.AttributeValue{ + "user.email": pdata.NewAttributeValueString("john.doe@example.com"), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "user.email": pdata.NewAttributeValueString("73ec53c4ba1747d485ae2a0d7bfafa6cda80a5a9"), + }, + }, + { + name: "Int", + inputAttributes: map[string]pdata.AttributeValue{ + "user.id": pdata.NewAttributeValueInt(10), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "user.id": pdata.NewAttributeValueString("71aa908aff1548c8c6cdecf63545261584738a25"), + }, + }, + { + name: "Double", + inputAttributes: map[string]pdata.AttributeValue{ + "user.balance": pdata.NewAttributeValueDouble(99.1), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "user.balance": pdata.NewAttributeValueString("76429edab4855b03073f9429fd5d10313c28655e"), + }, + }, + { + name: "Bool", + inputAttributes: map[string]pdata.AttributeValue{ + "user.authenticated": pdata.NewAttributeValueBool(true), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "user.authenticated": pdata.NewAttributeValueString("bf8b4530d8d246dd74ac53a13471bba17941dff7"), + }, + }, + } + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.Actions = []processorhelper.ActionKeyValue{ + {Key: "user.email", Action: processorhelper.HASH}, + {Key: "user.id", Action: processorhelper.HASH}, + {Key: "user.balance", Action: processorhelper.HASH}, + {Key: "user.authenticated", Action: processorhelper.HASH}, + } + + tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + require.Nil(t, err) + require.NotNil(t, tp) + + for _, tt := range testCases { + runIndividualLogTestCase(t, tt, tp) + } +} + +func BenchmarkAttributes_FilterLogsByName(b *testing.B) { + testCases := []logTestCase{ + { + name: "apply_to_log_with_no_attrs", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{ + "attribute1": pdata.NewAttributeValueInt(123), + }, + }, + { + name: "apply_to_log_with_attr", + inputAttributes: map[string]pdata.AttributeValue{ + "NoModification": pdata.NewAttributeValueBool(false), + }, + expectedAttributes: map[string]pdata.AttributeValue{ + "attribute1": pdata.NewAttributeValueInt(123), + "NoModification": pdata.NewAttributeValueBool(false), + }, + }, + { + name: "dont_apply", + inputAttributes: map[string]pdata.AttributeValue{}, + expectedAttributes: map[string]pdata.AttributeValue{}, + }, + } + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.Actions = []processorhelper.ActionKeyValue{ + {Key: "attribute1", Action: processorhelper.INSERT, Value: 123}, + } + oCfg.Include = &filterconfig.MatchProperties{ + LogNames: []string{"^apply.*"}, + } + tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + require.Nil(b, err) + require.NotNil(b, tp) + + for _, tt := range testCases { + td := generateLogData(tt.name, tt.inputAttributes) + + b.Run(tt.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + assert.NoError(b, tp.ConsumeLogs(context.Background(), td)) + } + }) + + // Ensure that the modified `td` has the attributes sorted: + sortLogAttributes(td) + require.Equal(b, generateLogData(tt.name, tt.expectedAttributes), td) + } +} diff --git a/processor/attributesprocessor/attributes.go b/processor/attributesprocessor/attributes_trace.go similarity index 86% rename from processor/attributesprocessor/attributes.go rename to processor/attributesprocessor/attributes_trace.go index 34ef35f55cb..a2a987f2bf9 100644 --- a/processor/attributesprocessor/attributes.go +++ b/processor/attributesprocessor/attributes_trace.go @@ -22,7 +22,7 @@ import ( "go.opentelemetry.io/collector/processor/processorhelper" ) -type attributesProcessor struct { +type spanAttributesProcessor struct { attrProc *processorhelper.AttrProc include filterspan.Matcher exclude filterspan.Matcher @@ -31,8 +31,8 @@ type attributesProcessor struct { // newTraceProcessor returns a processor that modifies attributes of a span. // To construct the attributes processors, the use of the factory methods are required // in order to validate the inputs. -func newAttributesProcessor(attrProc *processorhelper.AttrProc, include, exclude filterspan.Matcher) *attributesProcessor { - return &attributesProcessor{ +func newSpanAttributesProcessor(attrProc *processorhelper.AttrProc, include, exclude filterspan.Matcher) *spanAttributesProcessor { + return &spanAttributesProcessor{ attrProc: attrProc, include: include, exclude: exclude, @@ -40,7 +40,7 @@ func newAttributesProcessor(attrProc *processorhelper.AttrProc, include, exclude } // ProcessTraces implements the TProcessor -func (a *attributesProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { +func (a *spanAttributesProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { rss := td.ResourceSpans() for i := 0; i < rss.Len(); i++ { rs := rss.At(i) diff --git a/processor/attributesprocessor/attributes_test.go b/processor/attributesprocessor/attributes_trace_test.go similarity index 100% rename from processor/attributesprocessor/attributes_test.go rename to processor/attributesprocessor/attributes_trace_test.go diff --git a/processor/attributesprocessor/factory.go b/processor/attributesprocessor/factory.go index baea6b72650..fbda372ef89 100644 --- a/processor/attributesprocessor/factory.go +++ b/processor/attributesprocessor/factory.go @@ -21,6 +21,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/internal/processor/filterlog" "go.opentelemetry.io/collector/internal/processor/filterspan" "go.opentelemetry.io/collector/processor/processorhelper" ) @@ -37,7 +38,8 @@ func NewFactory() component.ProcessorFactory { return processorhelper.NewFactory( typeStr, createDefaultConfig, - processorhelper.WithTraces(createTraceProcessor)) + processorhelper.WithTraces(createTraceProcessor), + processorhelper.WithLogs(createLogProcessor)) } // Note: This isn't a valid configuration because the processor would do no work. @@ -76,6 +78,36 @@ func createTraceProcessor( return processorhelper.NewTraceProcessor( cfg, nextConsumer, - newAttributesProcessor(attrProc, include, exclude), + newSpanAttributesProcessor(attrProc, include, exclude), + processorhelper.WithCapabilities(processorCapabilities)) +} + +func createLogProcessor( + _ context.Context, + _ component.ProcessorCreateParams, + cfg configmodels.Processor, + nextConsumer consumer.LogsConsumer, +) (component.LogsProcessor, error) { + oCfg := cfg.(*Config) + if len(oCfg.Actions) == 0 { + return nil, fmt.Errorf("error creating \"attributes\" processor due to missing required field \"actions\" of processor %q", cfg.Name()) + } + attrProc, err := processorhelper.NewAttrProc(&oCfg.Settings) + if err != nil { + return nil, fmt.Errorf("error creating \"attributes\" processor: %w of processor %q", err, cfg.Name()) + } + include, err := filterlog.NewMatcher(oCfg.Include) + if err != nil { + return nil, err + } + exclude, err := filterlog.NewMatcher(oCfg.Exclude) + if err != nil { + return nil, err + } + + return processorhelper.NewLogsProcessor( + cfg, + nextConsumer, + newLogAttributesProcessor(attrProc, include, exclude), processorhelper.WithCapabilities(processorCapabilities)) } diff --git a/processor/attributesprocessor/factory_test.go b/processor/attributesprocessor/factory_test.go index ee41c437776..3f214f70b6d 100644 --- a/processor/attributesprocessor/factory_test.go +++ b/processor/attributesprocessor/factory_test.go @@ -99,3 +99,51 @@ func TestFactory_CreateMetricsProcessor(t *testing.T) { require.Nil(t, mp) assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) } + +func TestFactoryCreateLogsProcessor_EmptyActions(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + ap, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + assert.Error(t, err) + assert.Nil(t, ap) +} + +func TestFactoryCreateLogsProcessor_InvalidActions(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + // Missing key + oCfg.Actions = []processorhelper.ActionKeyValue{ + {Key: "", Value: 123, Action: processorhelper.UPSERT}, + } + ap, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + assert.Error(t, err) + assert.Nil(t, ap) +} + +func TestFactoryCreateLogsProcessor(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.Actions = []processorhelper.ActionKeyValue{ + {Key: "a key", Action: processorhelper.DELETE}, + } + + tp, err := factory.CreateLogsProcessor( + context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + assert.NotNil(t, tp) + assert.NoError(t, err) + + tp, err = factory.CreateLogsProcessor( + context.Background(), component.ProcessorCreateParams{}, cfg, nil) + assert.Nil(t, tp) + assert.Error(t, err) + + oCfg.Actions = []processorhelper.ActionKeyValue{ + {Action: processorhelper.DELETE}, + } + tp, err = factory.CreateLogsProcessor( + context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + assert.Nil(t, tp) + assert.Error(t, err) +} diff --git a/processor/spanprocessor/span.go b/processor/spanprocessor/span.go index 386e47999d8..1f745e77caa 100644 --- a/processor/spanprocessor/span.go +++ b/processor/spanprocessor/span.go @@ -47,6 +47,7 @@ func newSpanProcessor(config Config) (*spanProcessor, error) { if err != nil { return nil, err } + exclude, err := filterspan.NewMatcher(config.Exclude) if err != nil { return nil, err From 0f669d2ce208d6042e4e92f9448e39b0eff763a2 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Thu, 22 Oct 2020 14:45:02 +0200 Subject: [PATCH 2/4] don't use deprecated methods --- processor/attributesprocessor/factory_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/processor/attributesprocessor/factory_test.go b/processor/attributesprocessor/factory_test.go index 3f214f70b6d..e32ed9c072e 100644 --- a/processor/attributesprocessor/factory_test.go +++ b/processor/attributesprocessor/factory_test.go @@ -103,7 +103,7 @@ func TestFactory_CreateMetricsProcessor(t *testing.T) { func TestFactoryCreateLogsProcessor_EmptyActions(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() - ap, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + ap, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop()) assert.Error(t, err) assert.Nil(t, ap) } @@ -116,7 +116,7 @@ func TestFactoryCreateLogsProcessor_InvalidActions(t *testing.T) { oCfg.Actions = []processorhelper.ActionKeyValue{ {Key: "", Value: 123, Action: processorhelper.UPSERT}, } - ap, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + ap, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop()) assert.Error(t, err) assert.Nil(t, ap) } @@ -130,7 +130,7 @@ func TestFactoryCreateLogsProcessor(t *testing.T) { } tp, err := factory.CreateLogsProcessor( - context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop()) assert.NotNil(t, tp) assert.NoError(t, err) @@ -143,7 +143,7 @@ func TestFactoryCreateLogsProcessor(t *testing.T) { {Action: processorhelper.DELETE}, } tp, err = factory.CreateLogsProcessor( - context.Background(), component.ProcessorCreateParams{}, cfg, exportertest.NewNopLogsExporter()) + context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop()) assert.Nil(t, tp) assert.Error(t, err) } From 37dc5d2e79931461358322665e5239ebf70c3f60 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Thu, 22 Oct 2020 14:45:40 +0200 Subject: [PATCH 3/4] log support for attribute processor --- processor/attributesprocessor/attributes_log.go | 12 +++++++----- processor/attributesprocessor/attributes_trace.go | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/processor/attributesprocessor/attributes_log.go b/processor/attributesprocessor/attributes_log.go index 5a2e2e41581..f6b964969a8 100644 --- a/processor/attributesprocessor/attributes_log.go +++ b/processor/attributesprocessor/attributes_log.go @@ -47,13 +47,15 @@ func (a *logAttributesProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) ( if rs.IsNil() { continue } - ilss := rls.At(i).InstrumentationLibraryLogs() + ilss := rs.InstrumentationLibraryLogs() + resource := rs.Resource() for j := 0; j < ilss.Len(); j++ { ils := ilss.At(j) if ils.IsNil() { continue } logs := ils.Logs() + library := ils.InstrumentationLibrary() for k := 0; k < logs.Len(); k++ { lr := logs.At(k) if lr.IsNil() { @@ -61,7 +63,7 @@ func (a *logAttributesProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) ( continue } - if a.skipLog(lr) { + if a.skipLog(lr, resource, library) { continue } @@ -78,17 +80,17 @@ func (a *logAttributesProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) ( // The logic determining if a log should be processed is set // in the attribute configuration with the include and exclude settings. // Include properties are checked before exclude settings are checked. -func (a *logAttributesProcessor) skipLog(lr pdata.LogRecord) bool { +func (a *logAttributesProcessor) skipLog(lr pdata.LogRecord, resource pdata.Resource, library pdata.InstrumentationLibrary) bool { if a.include != nil { // A false returned in this case means the log should not be processed. - if include := a.include.MatchLogRecord(lr); !include { + if include := a.include.MatchLogRecord(lr, resource, library); !include { return true } } if a.exclude != nil { // A true returned in this case means the log should not be processed. - if exclude := a.exclude.MatchLogRecord(lr); exclude { + if exclude := a.exclude.MatchLogRecord(lr, resource, library); exclude { return true } } diff --git a/processor/attributesprocessor/attributes_trace.go b/processor/attributesprocessor/attributes_trace.go index a2a987f2bf9..63e83c711cc 100644 --- a/processor/attributesprocessor/attributes_trace.go +++ b/processor/attributesprocessor/attributes_trace.go @@ -48,7 +48,7 @@ func (a *spanAttributesProcessor) ProcessTraces(_ context.Context, td pdata.Trac continue } resource := rs.Resource() - ilss := rss.At(i).InstrumentationLibrarySpans() + ilss := rs.InstrumentationLibrarySpans() for j := 0; j < ilss.Len(); j++ { ils := ilss.At(j) if ils.IsNil() { From a3b9e167c5d6914f673e28e3fe8a9abc5a363cbb Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Mon, 26 Oct 2020 17:24:44 +0100 Subject: [PATCH 4/4] remove unneeded space --- processor/spanprocessor/span.go | 1 - 1 file changed, 1 deletion(-) diff --git a/processor/spanprocessor/span.go b/processor/spanprocessor/span.go index 1f745e77caa..386e47999d8 100644 --- a/processor/spanprocessor/span.go +++ b/processor/spanprocessor/span.go @@ -47,7 +47,6 @@ func newSpanProcessor(config Config) (*spanProcessor, error) { if err != nil { return nil, err } - exclude, err := filterspan.NewMatcher(config.Exclude) if err != nil { return nil, err