Skip to content

Commit

Permalink
[exporterhelper] Refactor queue initialization
Browse files Browse the repository at this point in the history
Make the queue initialization process consistent for both queue types. Instead of different workflows for memory and persistent queues, this change breaks it into two generic steps:
1. Queue factory
2. Start method

This change:
- reduces coupling;
- allows future refactoring of queuedRetrySender;
- allows extracting of the queue package from the exporterhelper;
- makes it possible to have `WithRequestQueue` option for the new exporter helper API.
  • Loading branch information
dmitryax committed Aug 26, 2023
1 parent 92f5fe6 commit 23d1e4d
Show file tree
Hide file tree
Showing 14 changed files with 365 additions and 362 deletions.
37 changes: 18 additions & 19 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,39 +56,31 @@ func (req *baseRequest) OnProcessingFinished() {
}
}

type queueSettings struct {
config QueueSettings
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

func (qs *queueSettings) persistenceEnabled() bool {
return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
queueSettings
queue internal.ProducerConsumerQueue
RetrySettings
requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

// newBaseSettings returns the baseSettings starting from the default and applying all configured options.
// requestExporter indicates whether the base settings are for a new request exporter or not.
func newBaseSettings(requestExporter bool, options ...Option) *baseSettings {
// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones.
func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings {
bs := &baseSettings{
requestExporter: requestExporter,
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call DefaultQueueSettings)
queueSettings: queueSettings{
config: QueueSettings{Enabled: false},
},
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
marshaler: marshaler,
unmarshaler: unmarshaler,
}

for _, op := range options {
Expand Down Expand Up @@ -141,7 +133,14 @@ func WithQueue(config QueueSettings) Option {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
}
o.queueSettings.config = config
if !config.Enabled {
return
}
if config.StorageID == nil {
o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
return
}
o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
}

Expand Down Expand Up @@ -172,7 +171,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand All @@ -181,7 +180,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
}

// If no error then start the queuedRetrySender.
return be.qrSender.start(ctx, host)
return be.qrSender.start(ctx, host, set)
}
be.ShutdownFunc = func(ctx context.Context) error {
// First shutdown the queued retry sender
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ var (
)

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "")
be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "")
be, err = newBaseExporter(defaultSettings, newBaseSettings(true, nil, nil), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -47,7 +47,7 @@ func TestBaseExporterWithOptions(t *testing.T) {
be, err := newBaseExporter(
defaultSettings,
newBaseSettings(
false,
false, nil, nil,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
Expand Down
40 changes: 27 additions & 13 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,55 @@
package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"
"sync"
"sync/atomic"

"go.opentelemetry.io/collector/component"
)

// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer are dropped.
type boundedMemoryQueue struct {
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
capacity uint32
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
capacity uint32
numConsumers int
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue(capacity int) ProducerConsumerQueue {
func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
items: make(chan Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
numConsumers: numConsumers,
}
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *boundedMemoryQueue) StartConsumers(numWorkers int, callback func(item Request)) {
func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error {
var startWG sync.WaitGroup
for i := 0; i < numWorkers; i++ {
for i := 0; i < q.numConsumers; i++ {
q.stopWG.Add(1)
startWG.Add(1)
go func() {
startWG.Done()
defer q.stopWG.Done()
for item := range q.items {
q.size.Add(^uint32(0))
callback(item)
set.Callback(item)
}
}()
}
startWG.Wait()
return nil
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
Expand Down Expand Up @@ -87,3 +93,11 @@ func (q *boundedMemoryQueue) Stop() {
func (q *boundedMemoryQueue) Size() int {
return int(q.size.Load())
}

func (q *boundedMemoryQueue) Capacity() int {
return int(q.capacity)
}

func (q *boundedMemoryQueue) IsPersistent() bool {
return false
}
49 changes: 21 additions & 28 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package internal

import (
"context"
"reflect"
"sync"
"sync/atomic"
Expand All @@ -14,8 +15,20 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func newNopQueueSettings(callback func(item Request)) QueueSettings {
return QueueSettings{
CreateSettings: exportertest.NewNopCreateSettings(),
DataType: component.DataTypeMetrics,
Callback: callback,
}
}

type stringRequest struct {
Request
str string
Expand All @@ -29,7 +42,7 @@ func newStringRequest(str string) Request {
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerFn func(item Request))) {
q := NewBoundedMemoryQueue(1)
q := NewBoundedMemoryQueue(1, 1)

var startLock sync.Mutex

Expand Down Expand Up @@ -88,7 +101,7 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF

func TestBoundedQueue(t *testing.T) {
helper(t, func(q ProducerConsumerQueue, consumerFn func(item Request)) {
q.StartConsumers(1, consumerFn)
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(consumerFn)))
})
}

Expand All @@ -99,14 +112,14 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue(10)
q := NewBoundedMemoryQueue(10, 1)

consumerState := newConsumerState(t)

q.StartConsumers(1, func(item Request) {
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {
consumerState.record(item.(stringRequest).str)
time.Sleep(1 * time.Second)
})
})))

q.Produce(newStringRequest("a"))
q.Produce(newStringRequest("b"))
Expand Down Expand Up @@ -183,30 +196,10 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
}

func TestZeroSize(t *testing.T) {
q := NewBoundedMemoryQueue(0)
q := NewBoundedMemoryQueue(0, 1)

q.StartConsumers(1, func(item Request) {
})
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {}))
assert.NoError(t, err)

assert.False(t, q.Produce(newStringRequest("a"))) // in process
}

func BenchmarkBoundedQueue(b *testing.B) {
q := NewBoundedMemoryQueue(1000)

q.StartConsumers(10, func(item Request) {})

for n := 0; n < b.N; n++ {
q.Produce(newStringRequest("a"))
}
}

func BenchmarkBoundedQueueWithFactory(b *testing.B) {
q := NewBoundedMemoryQueue(1000)

q.StartConsumers(10, func(item Request) {})

for n := 0; n < b.N; n++ {
q.Produce(newStringRequest("a"))
}
}
Loading

0 comments on commit 23d1e4d

Please sign in to comment.