Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
olamideTiana committed Oct 3, 2024
2 parents 9fbf9b0 + d6cd593 commit f337073
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 12 deletions.
20 changes: 20 additions & 0 deletions .chloggen/fix-ub-proc-helper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix data race condition, concurrent writes to the err variable, causes UB (Undefined Behavior)

# One or more tracking issues or pull requests related to the change
issues: [11350]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 5 additions & 4 deletions processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ func NewLogs(
span.AddEvent("Start processing.", eventOptions)
recordsIn := ld.LogRecordCount()

ld, err = logsFunc(ctx, ld)
var errFunc error
ld, errFunc = logsFunc(ctx, ld)
span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
if errFunc != nil {
if errors.Is(errFunc, ErrSkipProcessingData) {
return nil
}
return err
return errFunc
}
recordsOut := ld.LogRecordCount()
obs.recordInOut(ctx, recordsIn, recordsOut)
Expand Down
32 changes: 32 additions & 0 deletions processor/processorhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package processorhelper
import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -70,6 +71,37 @@ func newTestLProcessor(retError error) ProcessLogsFunc {
}
}

func TestLogsConcurrency(t *testing.T) {
logsFunc := func(_ context.Context, ld plog.Logs) (plog.Logs, error) {
return ld, nil
}

incomingLogs := plog.NewLogs()
incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()

// Add 3 records to the incoming
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()

lp, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), logsFunc)
require.NoError(t, err)
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10000; j++ {
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
}
}()
}
wg.Wait()
assert.NoError(t, lp.Shutdown(context.Background()))
}

func TestLogs_RecordInOut(t *testing.T) {
// Regardless of how many logs are ingested, emit just one
mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) {
Expand Down
9 changes: 5 additions & 4 deletions processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ func NewMetrics(
span.AddEvent("Start processing.", eventOptions)
pointsIn := md.DataPointCount()

md, err = metricsFunc(ctx, md)
var errFunc error
md, errFunc = metricsFunc(ctx, md)
span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
if errFunc != nil {
if errors.Is(errFunc, ErrSkipProcessingData) {
return nil
}
return err
return errFunc
}
pointsOut := md.DataPointCount()
obs.recordInOut(ctx, pointsIn, pointsOut)
Expand Down
31 changes: 31 additions & 0 deletions processor/processorhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package processorhelper
import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -70,6 +71,36 @@ func newTestMProcessor(retError error) ProcessMetricsFunc {
}
}

func TestMetricsConcurrency(t *testing.T) {
metricsFunc := func(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
return md, nil
}

incomingMetrics := pmetric.NewMetrics()
dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints()

// Add 2 data points to the incoming
dps.AppendEmpty()
dps.AppendEmpty()

mp, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), metricsFunc)
require.NoError(t, err)
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10000; j++ {
assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics))
}
}()
}
wg.Wait()
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestMetrics_RecordInOut(t *testing.T) {
// Regardless of how many data points are ingested, emit 3
mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) {
Expand Down
9 changes: 5 additions & 4 deletions processor/processorhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ func NewTraces(
span.AddEvent("Start processing.", eventOptions)
spansIn := td.SpanCount()

td, err = tracesFunc(ctx, td)
var errFunc error
td, errFunc = tracesFunc(ctx, td)
span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
if errFunc != nil {
if errors.Is(errFunc, ErrSkipProcessingData) {
return nil
}
return err
return errFunc
}
spansOut := td.SpanCount()
obs.recordInOut(ctx, spansIn, spansOut)
Expand Down
33 changes: 33 additions & 0 deletions processor/processorhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package processorhelper
import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -70,6 +71,38 @@ func newTestTProcessor(retError error) ProcessTracesFunc {
}
}

func TestTracesConcurrency(t *testing.T) {
tracesFunc := func(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
return td, nil
}

incomingTraces := ptrace.NewTraces()
incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()

// Add 4 records to the incoming
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()

mp, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), tracesFunc)
require.NoError(t, err)
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10000; j++ {
assert.NoError(t, mp.ConsumeTraces(context.Background(), incomingTraces))
}
}()
}
wg.Wait()
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestTraces_RecordInOut(t *testing.T) {
// Regardless of how many spans are ingested, emit just one
mockAggregate := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) {
Expand Down

0 comments on commit f337073

Please sign in to comment.