Skip to content

Commit

Permalink
fix: handle OTLPJSON unmarshal error (open-telemetry#34784)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
Handles the error that the Unmarshaler can return in case an invalid
OTLPJSON is provided, this avoids sending a nil signal to the
corresponding consumer. The fix logs the error and continues the
execution:

```go
t, err := tracesUnmarshaler.UnmarshalTraces([]byte(token.AsString()))
if err != nil {
	c.logger.Error("could extract traces from otlp json", zap.Error(err))
	continue
}
```

**Link to tracking Issue:** <Issue number if applicable>
open-telemetry#34782

**Testing:** <Describe what testing was performed and which tests were
added.> Factory tests moved to connector tests using the `golden` +
compare testing packages. Testdata includes a file with an invalid json
for each signal.

**Documentation:** <Describe the documentation added.> NA
  • Loading branch information
rogercoll authored and f7o committed Sep 12, 2024
1 parent 8d330ce commit b8fc8c2
Show file tree
Hide file tree
Showing 18 changed files with 406 additions and 108 deletions.
27 changes: 27 additions & 0 deletions .chloggen/handle_unmarshall_error.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otlpjsonconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Handle OTLPJSON unmarshal error

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34782]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
180 changes: 180 additions & 0 deletions connector/otlpjsonconnector/connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlpjsonconnector

import (
"context"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer/consumertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest"
)

func TestLogsToLogs2(t *testing.T) {
testCases := []struct {
name string
inputFile string
expectedFile string
expectedLogs int
}{
{
name: "correct log metric",
inputFile: "input-log.yaml",
expectedFile: "output-log.yaml",
expectedLogs: 1,
},
{
name: "invalid log",
inputFile: "input-invalid-log.yaml",
expectedLogs: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
factory := NewFactory()
sink := &consumertest.LogsSink{}
conn, err := factory.CreateLogsToLogs(context.Background(),

connectortest.NewNopSettings(), createDefaultConfig(), sink)
require.NoError(t, err)
require.NotNil(t, conn)
assert.False(t, conn.Capabilities().MutatesData)

require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

testLogs, err := golden.ReadLogs(filepath.Join("testdata", "logsToLogs", tc.inputFile))
assert.NoError(t, err)
assert.NoError(t, conn.ConsumeLogs(context.Background(), testLogs))

allLogs := sink.AllLogs()
assert.Len(t, allLogs, tc.expectedLogs)

if tc.expectedLogs > 0 {
// golden.WriteLogs(t, filepath.Join("testdata", "logsToLogs", tc.expectedFile), allLogs[0])
expected, err := golden.ReadLogs(filepath.Join("testdata", "logsToLogs", tc.expectedFile))
assert.NoError(t, err)
assert.NoError(t, plogtest.CompareLogs(expected, allLogs[0]))
}
})
}
}

func TestLogsToMetrics(t *testing.T) {
testCases := []struct {
name string
inputFile string
expectedFile string
expectedMetrics int
}{
{
name: "correct log metric",
inputFile: "input-metric.yaml",
expectedFile: "output-metric.yaml",
expectedMetrics: 1,
},
{
name: "invalid metric",
inputFile: "input-invalid-metric.yaml",
expectedMetrics: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
factory := NewFactory()
sink := &consumertest.MetricsSink{}
conn, err := factory.CreateLogsToMetrics(context.Background(),

connectortest.NewNopSettings(), createDefaultConfig(), sink)
require.NoError(t, err)
require.NotNil(t, conn)
assert.False(t, conn.Capabilities().MutatesData)

require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

testLogs, err := golden.ReadLogs(filepath.Join("testdata", "logsToMetrics", tc.inputFile))
assert.NoError(t, err)
assert.NoError(t, conn.ConsumeLogs(context.Background(), testLogs))

allMetrics := sink.AllMetrics()
assert.Len(t, allMetrics, tc.expectedMetrics)

if tc.expectedMetrics > 0 {
// golden.WriteMetrics(t, filepath.Join("testdata", "logsToMetrics", tc.expectedFile), allMetrics[0])
expected, err := golden.ReadMetrics(filepath.Join("testdata", "logsToMetrics", tc.expectedFile))
assert.NoError(t, err)
assert.NoError(t, pmetrictest.CompareMetrics(expected, allMetrics[0]))
}
})
}
}

func TestLogsToTraces(t *testing.T) {
testCases := []struct {
name string
inputFile string
expectedFile string
expectedTraces int
}{
{
name: "correct log trace",
inputFile: "input-trace.yaml",
expectedFile: "output-trace.yaml",
expectedTraces: 1,
},
{
name: "invalid trace",
inputFile: "input-invalid-trace.yaml",
expectedTraces: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
factory := NewFactory()
sink := &consumertest.TracesSink{}
conn, err := factory.CreateLogsToTraces(context.Background(),

connectortest.NewNopSettings(), createDefaultConfig(), sink)
require.NoError(t, err)
require.NotNil(t, conn)
assert.False(t, conn.Capabilities().MutatesData)

require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

testLogs, err := golden.ReadLogs(filepath.Join("testdata", "logsToTraces", tc.inputFile))
assert.NoError(t, err)
assert.NoError(t, conn.ConsumeLogs(context.Background(), testLogs))

allMetrics := sink.AllTraces()
assert.Len(t, allMetrics, tc.expectedTraces)

if tc.expectedTraces > 0 {
// golden.WriteTraces(t, filepath.Join("testdata", "logsToTraces", tc.expectedFile), allMetrics[0])
expected, err := golden.ReadTraces(filepath.Join("testdata", "logsToTraces", tc.expectedFile))
assert.NoError(t, err)
assert.NoError(t, ptracetest.CompareTraces(expected, allMetrics[0]))
}
})
}
}
97 changes: 0 additions & 97 deletions connector/otlpjsonconnector/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,11 @@ package otlpjsonconnector
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/testdata"
)

func TestNewFactory(t *testing.T) {
Expand All @@ -35,93 +28,3 @@ func TestNewFactory(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, conn)
}

func TestLogsToLogs(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)

sink := &consumertest.LogsSink{}
conn, err := factory.CreateLogsToLogs(context.Background(),
connectortest.NewNopSettings(), cfg, sink)
require.NoError(t, err)
require.NotNil(t, conn)
assert.False(t, conn.Capabilities().MutatesData)

require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

lp := testdata.GenerateLogs(1)
marshaler := &plog.JSONMarshaler{}
b, err := marshaler.MarshalLogs(lp)
require.NoError(t, err)

testLogs := testdata.GenerateLogs(1)
testLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(string(b))
assert.NoError(t, conn.ConsumeLogs(context.Background(), testLogs))

time.Sleep(1 * time.Second)
require.Len(t, sink.AllLogs(), 1)
assert.EqualValues(t, lp, sink.AllLogs()[0])
}

func TestLogsToMetrics(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)

sink := &consumertest.MetricsSink{}
conn, err := factory.CreateLogsToMetrics(context.Background(),
connectortest.NewNopSettings(), cfg, sink)
require.NoError(t, err)
require.NotNil(t, conn)
assert.False(t, conn.Capabilities().MutatesData)

require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

mt := testdata.GenerateMetrics(1)
marshaler := &pmetric.JSONMarshaler{}
b, err := marshaler.MarshalMetrics(mt)
require.NoError(t, err)

testLogs := testdata.GenerateLogs(1)
testLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(string(b))
assert.NoError(t, conn.ConsumeLogs(context.Background(), testLogs))

time.Sleep(1 * time.Second)
require.Len(t, sink.AllMetrics(), 1)
assert.EqualValues(t, mt, sink.AllMetrics()[0])
}

func TestLogsToTraces(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)

sink := &consumertest.TracesSink{}
conn, err := factory.CreateLogsToTraces(context.Background(),
connectortest.NewNopSettings(), cfg, sink)
require.NoError(t, err)
require.NotNil(t, conn)
assert.False(t, conn.Capabilities().MutatesData)

require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

td := testdata.GenerateTraces(1)
marshaler := &ptrace.JSONMarshaler{}
b, err := marshaler.MarshalTraces(td)
require.NoError(t, err)

testLogs := testdata.GenerateLogs(1)
testLogs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(string(b))
assert.NoError(t, conn.ConsumeLogs(context.Background(), testLogs))

time.Sleep(1 * time.Second)
require.Len(t, sink.AllTraces(), 1)
assert.EqualValues(t, td, sink.AllTraces()[0])
}
10 changes: 9 additions & 1 deletion connector/otlpjsonconnector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/connector/otlpj
go 1.22.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.108.2-0.20240904075637-48b11ba1c5f8
go.opentelemetry.io/collector/confmap v1.14.2-0.20240904075637-48b11ba1c5f8
go.opentelemetry.io/collector/connector v0.108.2-0.20240904075637-48b11ba1c5f8
go.opentelemetry.io/collector/consumer v0.108.2-0.20240904075637-48b11ba1c5f8
go.opentelemetry.io/collector/consumer/consumertest v0.108.2-0.20240904075637-48b11ba1c5f8
go.opentelemetry.io/collector/pdata v1.14.2-0.20240904075637-48b11ba1c5f8
go.opentelemetry.io/collector/pdata/testdata v0.108.2-0.20240904075637-48b11ba1c5f8
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)
Expand All @@ -34,6 +35,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.108.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.20.2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand All @@ -60,3 +62,9 @@ require (
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
4 changes: 2 additions & 2 deletions connector/otlpjsonconnector/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions connector/otlpjsonconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@ func (c *connectorLogs) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var l plog.Logs
l, _ = logsUnmarshaler.UnmarshalLogs([]byte(token.AsString()))
err := c.logsConsumer.ConsumeLogs(ctx, l)
l, err := logsUnmarshaler.UnmarshalLogs([]byte(token.AsString()))
if err != nil {
c.logger.Error("could not extract logs from otlp json", zap.Error(err))
continue
}
err = c.logsConsumer.ConsumeLogs(ctx, l)
if err != nil {
c.logger.Error("could not consume logs from otlp json", zap.Error(err))
}
}
}
Expand Down
Loading

0 comments on commit b8fc8c2

Please sign in to comment.