Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk/log: SimpleProcessor synchronizes OnEmit calls #5666

Merged
merged 6 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636)
- `SimpleProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log` now returns `false` if the exporter is `nil`. (#5665)
- Update the concurrency requirements of `Exporter` in `go.opentelemetry.io/otel/sdk/log`. (#5666)
- `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666)

### Fixed

Expand Down
11 changes: 7 additions & 4 deletions sdk/log/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ import (
)

// Exporter handles the delivery of log records to external receivers.
//
// Any of the Exporter's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Exporter to manage
// this concurrency.
type Exporter interface {
// Export transmits log records to a receiver.
//
Expand All @@ -34,6 +30,9 @@ type Exporter interface {
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
//
// Export should never be called concurrently with other Export calls.
// However, it may be called concurrently with other methods.
Export(ctx context.Context, records []Record) error

// Shutdown is called when the SDK shuts down. Any cleanup or release of
Expand All @@ -44,13 +43,17 @@ type Exporter interface {
//
// After Shutdown is called, calls to Export, Shutdown, or ForceFlush
// should perform no operation and return nil error.
//
// Shutdown may be called concurrently with itself or with other methods.
Shutdown(ctx context.Context) error

// ForceFlush exports log records to the configured Exporter that have not yet
// been exported.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// ForceFlush may be called concurrently with itself or with other methods.
ForceFlush(ctx context.Context) error
}

Expand Down
4 changes: 4 additions & 0 deletions sdk/log/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var _ Processor = (*SimpleProcessor)(nil)
//
// Use [NewSimpleProcessor] to create a SimpleProcessor.
type SimpleProcessor struct {
mu sync.Mutex
exporter Exporter
}

Expand Down Expand Up @@ -43,6 +44,9 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
return nil
}

s.mu.Lock()
defer s.mu.Unlock()

records := simpleProcRecordsPool.Get().(*[]Record)
(*records)[0] = *r
defer func() {
Expand Down
24 changes: 23 additions & 1 deletion sdk/log/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package log_test

import (
"context"
"io"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -70,6 +72,25 @@ func TestSimpleProcessorForceFlush(t *testing.T) {
require.True(t, e.forceFlushCalled, "exporter ForceFlush not called")
}

type writerExporter struct {
io.Writer
}

func (e *writerExporter) Export(_ context.Context, records []log.Record) error {
for _, r := range records {
_, _ = io.WriteString(e.Writer, r.Body().String())
}
return nil
}

func (e *writerExporter) Shutdown(context.Context) error {
return nil
}

func (e *writerExporter) ForceFlush(context.Context) error {
return nil
}

func TestSimpleProcessorEmpty(t *testing.T) {
assert.NotPanics(t, func() {
var s log.SimpleProcessor
Expand All @@ -91,7 +112,8 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
r := new(log.Record)
r.SetSeverityText("test")
ctx := context.Background()
s := log.NewSimpleProcessor(nil)
e := &writerExporter{new(strings.Builder)}
s := log.NewSimpleProcessor(e)
for i := 0; i < goRoutineN; i++ {
go func() {
defer wg.Done()
Expand Down