diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index 8077d465741..3dfe4c41311 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -56,39 +56,31 @@ func (req *baseRequest) OnProcessingFinished() {
 	}
 }
 
-type queueSettings struct {
-	config      QueueSettings
-	marshaler   internal.RequestMarshaler
-	unmarshaler internal.RequestUnmarshaler
-}
-
-func (qs *queueSettings) persistenceEnabled() bool {
-	return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil
-}
-
 // baseSettings represents all the options that users can configure.
 type baseSettings struct {
 	component.StartFunc
 	component.ShutdownFunc
 	consumerOptions []consumer.Option
 	TimeoutSettings
-	queueSettings
+	queue internal.ProducerConsumerQueue
 	RetrySettings
 	requestExporter bool
+	marshaler       internal.RequestMarshaler
+	unmarshaler     internal.RequestUnmarshaler
 }
 
 // newBaseSettings returns the baseSettings starting from the default and applying all configured options.
 // requestExporter indicates whether the base settings are for a new request exporter or not.
-func newBaseSettings(requestExporter bool, options ...Option) *baseSettings {
+// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones.
+func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler,
+	unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings {
 	bs := &baseSettings{
 		requestExporter: requestExporter,
 		TimeoutSettings: NewDefaultTimeoutSettings(),
-		// TODO: Enable queuing by default (call DefaultQueueSettings)
-		queueSettings: queueSettings{
-			config: QueueSettings{Enabled: false},
-		},
 		// TODO: Enable retry by default (call DefaultRetrySettings)
 		RetrySettings: RetrySettings{Enabled: false},
+		marshaler:     marshaler,
+		unmarshaler:   unmarshaler,
 	}
 
 	for _, op := range options {
@@ -141,7 +133,14 @@ func WithQueue(config QueueSettings) Option {
 		if o.requestExporter {
 			panic("queueing is not available for the new request exporters yet")
 		}
-		o.queueSettings.config = config
+		if !config.Enabled {
+			return
+		}
+		if config.StorageID == nil {
+			o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
+			return
+		}
+		o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
 	}
 }
 
@@ -172,7 +171,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
 		return nil, err
 	}
 
-	be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
+	be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
 	be.sender = be.qrSender
 	be.StartFunc = func(ctx context.Context, host component.Host) error {
 		// First start the wrapped exporter.
@@ -181,7 +180,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
 		}
 
 		// If no error then start the queuedRetrySender.
-		return be.qrSender.start(ctx, host)
+		return be.qrSender.start(ctx, host, set)
 	}
 	be.ShutdownFunc = func(ctx context.Context) error {
 		// First shutdown the queued retry sender
diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go
index 9c5bcbe2513..076cd55b00e 100644
--- a/exporter/exporterhelper/common_test.go
+++ b/exporter/exporterhelper/common_test.go
@@ -32,11 +32,11 @@ var (
 )
 
 func TestBaseExporter(t *testing.T) {
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil), "")
 	require.NoError(t, err)
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
 	require.NoError(t, be.Shutdown(context.Background()))
 
-	be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "")
+	be, err = newBaseExporter(defaultSettings, newBaseSettings(true, nil, nil), "")
 	require.NoError(t, err)
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
 	require.NoError(t, be.Shutdown(context.Background()))
@@ -47,7 +47,7 @@ func TestBaseExporterWithOptions(t *testing.T) {
 	be, err := newBaseExporter(
 		defaultSettings,
 		newBaseSettings(
-			false,
+			false, nil, nil,
 			WithStart(func(ctx context.Context, host component.Host) error { return want }),
 			WithShutdown(func(ctx context.Context) error { return want }),
 			WithTimeout(NewDefaultTimeoutSettings())),
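For orientation (illustration only, not part of the patch): with this change the queue implementation is selected eagerly inside WithQueue, using the marshaler and unmarshaler that newBaseSettings now receives up front, rather than being constructed later inside queuedRetrySender. A hypothetical call site inside the package, using only identifiers introduced above, would look roughly like:

	qCfg := NewDefaultQueueSettings() // in-memory queue, since StorageID is nil
	bs := newBaseSettings(false, tracesRequestMarshaler, newTraceRequestUnmarshalerFunc(pusher),
		WithQueue(qCfg)) // builds internal.NewBoundedMemoryQueue(qCfg.QueueSize, qCfg.NumConsumers)
	// If qCfg.StorageID were set, WithQueue would instead build internal.NewPersistentQueue(...),
	// reusing the marshaler/unmarshaler captured on bs.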
diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go
index 7425cd8548e..c7f8655338a 100644
--- a/exporter/exporterhelper/internal/bounded_memory_queue.go
+++ b/exporter/exporterhelper/internal/bounded_memory_queue.go
@@ -6,37 +6,42 @@
 package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 
 import (
+	"context"
 	"sync"
 	"sync/atomic"
+
+	"go.opentelemetry.io/collector/component"
 )
 
 // boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
 // where the queue is bounded and if it fills up due to slow consumers, the new items written by
 // the producer are dropped.
 type boundedMemoryQueue struct {
-	stopWG   sync.WaitGroup
-	size     *atomic.Uint32
-	stopped  *atomic.Bool
-	items    chan Request
-	capacity uint32
+	stopWG       sync.WaitGroup
+	size         *atomic.Uint32
+	stopped      *atomic.Bool
+	items        chan Request
+	capacity     uint32
+	numConsumers int
 }
 
 // NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
 // callback for dropped items (e.g. useful to emit metrics).
-func NewBoundedMemoryQueue(capacity int) ProducerConsumerQueue {
+func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
 	return &boundedMemoryQueue{
-		items:    make(chan Request, capacity),
-		stopped:  &atomic.Bool{},
-		size:     &atomic.Uint32{},
-		capacity: uint32(capacity),
+		items:        make(chan Request, capacity),
+		stopped:      &atomic.Bool{},
+		size:         &atomic.Uint32{},
+		capacity:     uint32(capacity),
+		numConsumers: numConsumers,
 	}
 }
 
 // StartConsumers starts a given number of goroutines consuming items from the queue
 // and passing them into the consumer callback.
-func (q *boundedMemoryQueue) StartConsumers(numWorkers int, callback func(item Request)) {
+func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error {
 	var startWG sync.WaitGroup
-	for i := 0; i < numWorkers; i++ {
+	for i := 0; i < q.numConsumers; i++ {
 		q.stopWG.Add(1)
 		startWG.Add(1)
 		go func() {
@@ -44,11 +49,12 @@ func (q *boundedMemoryQueue) StartConsumers(numWorkers int, callback func(item R
 			defer q.stopWG.Done()
 			for item := range q.items {
 				q.size.Add(^uint32(0))
-				callback(item)
+				set.Callback(item)
 			}
 		}()
 	}
 	startWG.Wait()
+	return nil
 }
 
 // Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
@@ -87,3 +93,11 @@ func (q *boundedMemoryQueue) Stop() {
 func (q *boundedMemoryQueue) Size() int {
 	return int(q.size.Load())
 }
+
+func (q *boundedMemoryQueue) Capacity() int {
+	return int(q.capacity)
+}
+
+func (q *boundedMemoryQueue) IsPersistent() bool {
+	return false
+}
diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go
index 43207104916..9fe809cf2a2 100644
--- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go
+++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go
@@ -6,6 +6,7 @@ package internal
 
 import (
+	"context"
 	"reflect"
 	"sync"
 	"sync/atomic"
@@ -14,8 +15,20 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/exporter/exportertest"
 )
 
+func newNopQueueSettings(callback func(item Request)) QueueSettings {
+	return QueueSettings{
+		CreateSettings: exportertest.NewNopCreateSettings(),
+		DataType:       component.DataTypeMetrics,
+		Callback:       callback,
+	}
+}
+
 type stringRequest struct {
 	Request
 	str string
@@ -29,7 +42,7 @@ func newStringRequest(str string) Request {
 // We want to test the overflow behavior, so we block the consumer
 // by holding a startLock before submitting items to the queue.
 func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerFn func(item Request))) {
-	q := NewBoundedMemoryQueue(1)
+	q := NewBoundedMemoryQueue(1, 1)
 
 	var startLock sync.Mutex
 
@@ -88,7 +101,7 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF
 
 func TestBoundedQueue(t *testing.T) {
 	helper(t, func(q ProducerConsumerQueue, consumerFn func(item Request)) {
-		q.StartConsumers(1, consumerFn)
+		assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(consumerFn)))
 	})
 }
 
@@ -99,14 +112,14 @@ func TestBoundedQueue(t *testing.T) {
 // only after Stop will mean the consumers are still locked while
 // trying to perform the final consumptions.
 func TestShutdownWhileNotEmpty(t *testing.T) {
-	q := NewBoundedMemoryQueue(10)
+	q := NewBoundedMemoryQueue(10, 1)
 
 	consumerState := newConsumerState(t)
 
-	q.StartConsumers(1, func(item Request) {
+	assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {
 		consumerState.record(item.(stringRequest).str)
 		time.Sleep(1 * time.Second)
-	})
+	})))
 
 	q.Produce(newStringRequest("a"))
 	q.Produce(newStringRequest("b"))
@@ -183,30 +196,10 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
 }
 
 func TestZeroSize(t *testing.T) {
-	q := NewBoundedMemoryQueue(0)
+	q := NewBoundedMemoryQueue(0, 1)
 
-	q.StartConsumers(1, func(item Request) {
-	})
+	err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {}))
+	assert.NoError(t, err)
 
 	assert.False(t, q.Produce(newStringRequest("a"))) // in process
 }
-
-func BenchmarkBoundedQueue(b *testing.B) {
-	q := NewBoundedMemoryQueue(1000)
-
-	q.StartConsumers(10, func(item Request) {})
-
-	for n := 0; n < b.N; n++ {
-		q.Produce(newStringRequest("a"))
-	}
-}
-
-func BenchmarkBoundedQueueWithFactory(b *testing.B) {
-	q := NewBoundedMemoryQueue(1000)
-
-	q.StartConsumers(10, func(item Request) {})
-
-	for n := 0; n < b.N; n++ {
-		q.Produce(newStringRequest("a"))
-	}
-}
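As the updated tests above show, starting a queue now requires a context, a host, and a QueueSettings value that carries the consumer callback. A minimal call site (sketch only, reusing the newNopQueueSettings helper defined in the test file) would be:

	q := NewBoundedMemoryQueue(100, 4) // capacity 100, four consumer goroutines
	err := q.Start(context.Background(), componenttest.NewNopHost(),
		newNopQueueSettings(func(item Request) {
			// handle one dequeued request
		}))
	// Produce items afterwards; Stop() closes the channel and waits for the consumers to drain it.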
diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go
index 63a617daebd..ba1dcc67230 100644
--- a/exporter/exporterhelper/internal/persistent_queue.go
+++ b/exporter/exporterhelper/internal/persistent_queue.go
@@ -5,28 +5,34 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 
-	"go.uber.org/zap"
-
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/extension/experimental/storage"
 )
 
-// Monkey patching for unit test
 var (
+	// Monkey patching for unit test
 	stopStorage = func(queue *persistentQueue) {
 		queue.storage.stop()
 	}
+	errNoStorageClient    = errors.New("no storage client extension found")
+	errWrongExtensionType = errors.New("requested extension is not a storage extension")
 )
 
 // persistentQueue holds the queue backed by file storage
 type persistentQueue struct {
-	stopWG   sync.WaitGroup
-	stopOnce sync.Once
-	stopChan chan struct{}
-	storage  *persistentContiguousStorage
+	stopWG       sync.WaitGroup
+	stopOnce     sync.Once
+	stopChan     chan struct{}
+	storageID    component.ID
+	storage      *persistentContiguousStorage
+	capacity     uint64
+	numConsumers int
+	marshaler    RequestMarshaler
+	unmarshaler  RequestUnmarshaler
 }
 
 // buildPersistentStorageName returns a name that is constructed out of queue name and signal type. This is done
@@ -35,40 +41,42 @@ func buildPersistentStorageName(name string, signal component.DataType) string {
 	return fmt.Sprintf("%s-%s", name, signal)
 }
 
-type PersistentQueueSettings struct {
-	Name        string
-	Signal      component.DataType
-	Capacity    uint64
-	Logger      *zap.Logger
-	Client      storage.Client
-	Unmarshaler RequestUnmarshaler
-	Marshaler   RequestMarshaler
-}
-
 // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
-func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue {
+func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler,
+	unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
 	return &persistentQueue{
-		stopChan: make(chan struct{}),
-		storage:  newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params),
+		capacity:     uint64(capacity),
+		numConsumers: numConsumers,
+		storageID:    storageID,
+		marshaler:    marshaler,
+		unmarshaler:  unmarshaler,
+		stopChan:     make(chan struct{}),
 	}
 }
 
-// StartConsumers starts the given number of consumers which will be consuming items
-func (pq *persistentQueue) StartConsumers(numWorkers int, callback func(item Request)) {
-	for i := 0; i < numWorkers; i++ {
+// Start starts the persistentQueue with the given number of consumers.
+func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set QueueSettings) error {
+	storageClient, err := toStorageClient(ctx, pq.storageID, host, set.ID, set.DataType)
+	if err != nil {
+		return err
+	}
+	storageName := buildPersistentStorageName(set.ID.Name(), set.DataType)
+	pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler)
+	for i := 0; i < pq.numConsumers; i++ {
 		pq.stopWG.Add(1)
 		go func() {
 			defer pq.stopWG.Done()
 			for {
 				select {
 				case req := <-pq.storage.get():
-					callback(req)
+					set.Callback(req)
 				case <-pq.stopChan:
 					return
 				}
 			}
 		}()
 	}
+	return nil
 }
 
 // Produce adds an item to the queue and returns true if it was accepted
@@ -91,3 +99,35 @@ func (pq *persistentQueue) Stop() {
 func (pq *persistentQueue) Size() int {
 	return int(pq.storage.size())
 }
+
+func (pq *persistentQueue) Capacity() int {
+	return int(pq.capacity)
+}
+
+func (pq *persistentQueue) IsPersistent() bool {
+	return true
+}
+
+func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {
+	extension, err := getStorageExtension(host.GetExtensions(), storageID)
+	if err != nil {
+		return nil, err
+	}
+
+	client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
+	if err != nil {
+		return nil, err
+	}
+
+	return client, err
+}
+
+func getStorageExtension(extensions map[component.ID]component.Component, storageID component.ID) (storage.Extension, error) {
+	if ext, found := extensions[storageID]; found {
+		if storageExt, ok := ext.(storage.Extension); ok {
+			return storageExt, nil
+		}
+		return nil, errWrongExtensionType
+	}
+	return nil, errNoStorageClient
+}
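The persistent variant now defers all storage wiring to Start: the storage extension is looked up among the host's extensions by pq.storageID and asked for a client scoped to this exporter and signal. Roughly, as a sketch of the flow implemented by getStorageExtension and toStorageClient above (not additional code in the patch):

	// Inside persistentQueue.Start, approximately:
	storageExt, _ := host.GetExtensions()[pq.storageID].(storage.Extension) // missing ID -> errNoStorageClient, wrong type -> errWrongExtensionType
	client, _ := storageExt.GetClient(ctx, component.KindExporter, set.ID, string(set.DataType))
	// client, set.Logger, pq.capacity and the (un)marshalers are then handed to newPersistentContiguousStorage,
	// under the name buildPersistentStorageName(set.ID.Name(), set.DataType), e.g. "foo-traces" for an ID named "foo".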
"fmt" + "strconv" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/experimental/storage" + "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" ) -func createTestQueue(extension storage.Extension, capacity int) *persistentQueue { - logger := zap.NewNop() +type mockHost struct { + component.Host + ext map[component.ID]component.Component +} - client, err := extension.GetClient(context.Background(), component.KindReceiver, component.ID{}, "") - if err != nil { - panic(err) - } +func (nh *mockHost) GetExtensions() map[component.ID]component.Component { + return nh.ext +} - wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{ - Name: "foo", - Signal: component.DataTypeTraces, - Capacity: uint64(capacity), - Logger: logger, - Client: client, - Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), - Marshaler: newFakeTracesRequestMarshalerFunc(), - }) - return wq.(*persistentQueue) +// createTestQueue creates and starts a fake queue with the given capacity and number of consumers. +func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item Request)) ProducerConsumerQueue { + pq := NewPersistentQueue(capacity, numConsumers, component.ID{}, newFakeTracesRequestMarshalerFunc(), + newFakeTracesRequestUnmarshalerFunc()) + host := &mockHost{ext: map[component.ID]component.Component{ + {}: createStorageExtension(t.TempDir()), + }} + err := pq.Start(context.Background(), host, newNopQueueSettings(callback)) + require.NoError(t, err) + t.Cleanup(pq.Stop) + return pq } func TestPersistentQueue_Capacity(t *testing.T) { path := t.TempDir() for i := 0; i < 100; i++ { - ext := createStorageExtension(path) - t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) }) + pq := NewPersistentQueue(5, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(), + newFakeTracesRequestUnmarshalerFunc()) + host := &mockHost{ext: map[component.ID]component.Component{ + {}: createStorageExtension(path), + }} + err := pq.Start(context.Background(), host, newNopQueueSettings(func(req Request) {})) + require.NoError(t, err) + + // Stop consumer to imitate queue overflow + close(pq.(*persistentQueue).stopChan) + pq.(*persistentQueue).stopWG.Wait() - wq := createTestQueue(ext, 5) - assert.Equal(t, 0, wq.Size()) + assert.Equal(t, 0, pq.Size()) traces := newTraces(1, 10) req := newFakeTracesRequest(traces) for i := 0; i < 10; i++ { - result := wq.Produce(req) + result := pq.Produce(req) if i < 6 { assert.True(t, result) } else { @@ -65,26 +77,20 @@ func TestPersistentQueue_Capacity(t *testing.T) { // so the capacity could be used in full if i == 0 { assert.Eventually(t, func() bool { - return wq.Size() == 0 + return pq.Size() == 0 }, 5*time.Second, 10*time.Millisecond) } } - assert.Equal(t, 5, wq.Size()) + assert.Equal(t, 5, pq.Size()) + stopStorage(pq.(*persistentQueue)) } } func TestPersistentQueue_Close(t *testing.T) { - path := t.TempDir() - - ext := createStorageExtension(path) - t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) }) - - wq := createTestQueue(ext, 1001) + wq := createTestQueue(t, 1001, 100, func(item Request) {}) traces := newTraces(1, 10) req := newFakeTracesRequest(traces) - wq.StartConsumers(100, func(item Request) {}) - for i := 0; i < 1000; i++ { wq.Produce(req) } @@ -100,12 
+106,7 @@ func TestPersistentQueue_Close(t *testing.T) { // Verify storage closes after queue consumers. If not in this order, successfully consumed items won't be updated in storage func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) { - path := t.TempDir() - - ext := createStorageExtension(path) - t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) }) - - wq := createTestQueue(ext, 1001) + wq := createTestQueue(t, 1001, 1, func(item Request) {}) traces := newTraces(1, 10) lastRequestProcessedTime := time.Now() @@ -121,8 +122,6 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) { queue.storage.stop() } - wq.StartConsumers(1, func(item Request) {}) - for i := 0; i < 1000; i++ { wq.Produce(req) } @@ -162,19 +161,11 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { for _, c := range cases { t.Run(fmt.Sprintf("#messages: %d #consumers: %d", c.numMessagesProduced, c.numConsumers), func(t *testing.T) { - path := t.TempDir() - traces := newTraces(1, 10) req := newFakeTracesRequest(traces) - ext := createStorageExtension(path) - tq := createTestQueue(ext, 1000) - - defer tq.Stop() - t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) }) - numMessagesConsumed := &atomic.Int32{} - tq.StartConsumers(c.numConsumers, func(item Request) { + tq := createTestQueue(t, 1000, c.numConsumers, func(item Request) { if item != nil { numMessagesConsumed.Add(int32(1)) } @@ -217,3 +208,90 @@ func newTraces(numTraces int, numSpans int) ptrace.Traces { return traces } + +func TestToStorageClient(t *testing.T) { + getStorageClientError := errors.New("unable to create storage client") + testCases := []struct { + desc string + storage storage.Extension + numStorages int + storageIndex int + expectedError error + getClientError error + }{ + { + desc: "obtain storage extension by name", + numStorages: 2, + storageIndex: 0, + expectedError: nil, + }, + { + desc: "fail on not existing storage extension", + numStorages: 2, + storageIndex: 100, + expectedError: errNoStorageClient, + }, + { + desc: "invalid extension type", + numStorages: 2, + storageIndex: 100, + expectedError: errNoStorageClient, + }, + { + desc: "fail on error getting storage client from extension", + numStorages: 1, + storageIndex: 0, + expectedError: getStorageClientError, + getClientError: getStorageClientError, + }, + } + + for _, tC := range testCases { + t.Run(tC.desc, func(t *testing.T) { + storageID := component.NewIDWithName("file_storage", strconv.Itoa(tC.storageIndex)) + + var extensions = map[component.ID]component.Component{} + for i := 0; i < tC.numStorages; i++ { + extensions[component.NewIDWithName("file_storage", + strconv.Itoa(i))] = &mockStorageExtension{getClientError: tC.getClientError} + } + host := &mockHost{ext: extensions} + ownerID := component.NewID("foo_exporter") + + // execute + client, err := toStorageClient(context.Background(), storageID, host, ownerID, component.DataTypeTraces) + + // verify + if tC.expectedError != nil { + assert.ErrorIs(t, err, tC.expectedError) + assert.Nil(t, client) + } else { + assert.NoError(t, err) + assert.NotNil(t, client) + } + }) + } +} + +func TestInvalidStorageExtensionType(t *testing.T) { + storageID := component.NewIDWithName("extension", "extension") + + // make a test extension + factory := extensiontest.NewNopFactory() + extConfig := factory.CreateDefaultConfig() + settings := extensiontest.NewNopCreateSettings() + extension, err := factory.CreateExtension(context.Background(), settings, 
extConfig) + assert.NoError(t, err) + var extensions = map[component.ID]component.Component{ + storageID: extension, + } + host := &mockHost{ext: extensions} + ownerID := component.NewID("foo_exporter") + + // execute + client, err := toStorageClient(context.Background(), storageID, host, ownerID, component.DataTypeTraces) + + // we should get an error about the extension type + assert.ErrorIs(t, err, errWrongExtensionType) + assert.Nil(t, client) +} diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index cbbcf2e03c5..49f204bbb0d 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -81,15 +81,16 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. -func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage { +func newPersistentContiguousStorage(ctx context.Context, queueName string, client storage.Client, + logger *zap.Logger, capacity uint64, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ - logger: set.Logger, - client: set.Client, + logger: logger, + client: client, queueName: queueName, - unmarshaler: set.Unmarshaler, - marshaler: set.Marshaler, - capacity: set.Capacity, - putChan: make(chan struct{}, set.Capacity), + unmarshaler: unmarshaler, + marshaler: marshaler, + capacity: capacity, + putChan: make(chan struct{}, capacity), reqChan: make(chan Request), stopChan: make(chan struct{}), itemsCount: &atomic.Uint64{}, diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index 7ddb6b456b1..693fe2d2ba2 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -36,13 +36,8 @@ func createTestClient(extension storage.Extension) storage.Client { } func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage { - return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{ - Capacity: capacity, - Logger: logger, - Client: client, - Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), - Marshaler: newFakeTracesRequestMarshalerFunc(), - }) + return newPersistentContiguousStorage(context.Background(), "foo", client, logger, capacity, + newFakeTracesRequestMarshalerFunc(), newFakeTracesRequestUnmarshalerFunc()) } func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage { @@ -355,7 +350,7 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) { // No more items ext := createStorageExtension(path) - wq := createTestQueue(ext, 1000) + wq := createTestQueue(t, 1000, 1, func(Request) {}) require.Equal(t, 0, wq.Size()) require.NoError(t, ext.Shutdown(context.Background())) } @@ -578,9 +573,13 @@ func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguou type mockStorageExtension struct { component.StartFunc component.ShutdownFunc + getClientError error } func (m mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _ component.ID, _ string) (storage.Client, error) { + if m.getClientError != nil { + return nil, m.getClientError + } return 
&mockStorageClient{st: map[string][]byte{}}, nil } diff --git a/exporter/exporterhelper/internal/producer_consumer_queue.go b/exporter/exporterhelper/internal/producer_consumer_queue.go index c6a1e3a23bd..749bd52807b 100644 --- a/exporter/exporterhelper/internal/producer_consumer_queue.go +++ b/exporter/exporterhelper/internal/producer_consumer_queue.go @@ -5,12 +5,25 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" +) + +type QueueSettings struct { + exporter.CreateSettings + DataType component.DataType + Callback func(item Request) +} + // ProducerConsumerQueue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue // (boundedMemoryQueue) or via a disk-based queue (persistentQueue) type ProducerConsumerQueue interface { - // StartConsumers starts a given number of goroutines consuming items from the queue + // Start starts the queue with a given number of goroutines consuming items from the queue // and passing them into the consumer callback. - StartConsumers(num int, callback func(item Request)) + Start(ctx context.Context, host component.Host, set QueueSettings) error // Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added // to the queue due to queue overflow. Produce(item Request) bool @@ -19,4 +32,8 @@ type ProducerConsumerQueue interface { // Stop stops all consumers, as well as the length reporter if started, // and releases the items channel. It blocks until all consumers have stopped. Stop() + // Capacity returns the capacity of the queue. + Capacity() int + // IsPersistent returns true if the queue is persistent. + IsPersistent() bool } diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index fd0d86fb9be..ef22d50dd02 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -89,9 +89,7 @@ func NewLogsExporter( return nil, errNilPushLogsData } - bs := newBaseSettings(false, options...) - bs.marshaler = logsRequestMarshaler - bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher) + bs := newBaseSettings(false, logsRequestMarshaler, newLogsRequestUnmarshalerFunc(pusher), options...) be, err := newBaseExporter(set, bs, component.DataTypeLogs) if err != nil { return nil, err @@ -143,7 +141,7 @@ func NewLogsRequestExporter( return nil, errNilLogsConverter } - bs := newBaseSettings(true, options...) + bs := newBaseSettings(true, nil, nil, options...) be, err := newBaseExporter(set, bs, component.DataTypeLogs) if err != nil { diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 2ea905b6321..a13b010e955 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -89,9 +89,7 @@ func NewMetricsExporter( return nil, errNilPushMetricsData } - bs := newBaseSettings(false, options...) - bs.marshaler = metricsRequestMarshaler - bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher) + bs := newBaseSettings(false, metricsRequestMarshaler, newMetricsRequestUnmarshalerFunc(pusher), options...) be, err := newBaseExporter(set, bs, component.DataTypeMetrics) if err != nil { return nil, err @@ -143,7 +141,7 @@ func NewMetricsRequestExporter( return nil, errNilMetricsConverter } - bs := newBaseSettings(true, options...) + bs := newBaseSettings(true, nil, nil, options...) 
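The interface now owns its whole lifecycle: Start (with host and settings), Produce, Stop, plus Capacity and IsPersistent for metrics reporting and requeuing decisions. The following self-contained toy program (illustrative only; plain strings instead of Request, no host or settings) shows the same start/consume/stop pattern that boundedMemoryQueue implements:

	package main

	import (
		"fmt"
		"sync"
	)

	// toyQueue mimics the Start/Produce/Stop shape of ProducerConsumerQueue with plain strings.
	type toyQueue struct {
		items  chan string
		stopWG sync.WaitGroup
	}

	func (q *toyQueue) Start(numConsumers int, callback func(string)) {
		for i := 0; i < numConsumers; i++ {
			q.stopWG.Add(1)
			go func() {
				defer q.stopWG.Done()
				for item := range q.items { // keeps draining until Stop closes the channel
					callback(item)
				}
			}()
		}
	}

	func (q *toyQueue) Produce(item string) bool {
		select {
		case q.items <- item:
			return true
		default:
			return false // queue overflow: drop, as boundedMemoryQueue does
		}
	}

	func (q *toyQueue) Stop() {
		close(q.items)
		q.stopWG.Wait()
	}

	func main() {
		q := &toyQueue{items: make(chan string, 2)}
		q.Start(1, func(s string) { fmt.Println("consumed", s) })
		q.Produce("a")
		q.Produce("b")
		q.Stop()
	}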
diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go
index fd0d86fb9be..ef22d50dd02 100644
--- a/exporter/exporterhelper/logs.go
+++ b/exporter/exporterhelper/logs.go
@@ -89,9 +89,7 @@ func NewLogsExporter(
 		return nil, errNilPushLogsData
 	}
 
-	bs := newBaseSettings(false, options...)
-	bs.marshaler = logsRequestMarshaler
-	bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher)
+	bs := newBaseSettings(false, logsRequestMarshaler, newLogsRequestUnmarshalerFunc(pusher), options...)
 	be, err := newBaseExporter(set, bs, component.DataTypeLogs)
 	if err != nil {
 		return nil, err
@@ -143,7 +141,7 @@ func NewLogsRequestExporter(
 		return nil, errNilLogsConverter
 	}
 
-	bs := newBaseSettings(true, options...)
+	bs := newBaseSettings(true, nil, nil, options...)
 
 	be, err := newBaseExporter(set, bs, component.DataTypeLogs)
 	if err != nil {
diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go
index 2ea905b6321..a13b010e955 100644
--- a/exporter/exporterhelper/metrics.go
+++ b/exporter/exporterhelper/metrics.go
@@ -89,9 +89,7 @@ func NewMetricsExporter(
 		return nil, errNilPushMetricsData
 	}
 
-	bs := newBaseSettings(false, options...)
-	bs.marshaler = metricsRequestMarshaler
-	bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher)
+	bs := newBaseSettings(false, metricsRequestMarshaler, newMetricsRequestUnmarshalerFunc(pusher), options...)
 	be, err := newBaseExporter(set, bs, component.DataTypeMetrics)
 	if err != nil {
 		return nil, err
@@ -143,7 +141,7 @@ func NewMetricsRequestExporter(
 		return nil, errNilMetricsConverter
 	}
 
-	bs := newBaseSettings(true, options...)
+	bs := newBaseSettings(true, nil, nil, options...)
 
 	be, err := newBaseExporter(set, bs, component.DataTypeMetrics)
 	if err != nil {
diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go
index 79505445b47..98d87f09ae0 100644
--- a/exporter/exporterhelper/queued_retry.go
+++ b/exporter/exporterhelper/queued_retry.go
@@ -18,18 +18,14 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer/consumererror"
+	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
-	"go.opentelemetry.io/collector/extension/experimental/storage"
 	"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
 )
 
 const defaultQueueSize = 1000
 
-var (
-	errSendingQueueIsFull = errors.New("sending_queue is full")
-	errNoStorageClient    = errors.New("no storage client extension found")
-	errWrongExtensionType = errors.New("requested extension is not a storage extension")
-)
+var errSendingQueueIsFull = errors.New("sending_queue is full")
 
 // QueueSettings defines configuration for queueing batches before sending to the consumerSender.
 type QueueSettings struct {
@@ -73,7 +69,6 @@ type queuedRetrySender struct {
 	fullName       string
 	id             component.ID
 	signal         component.DataType
-	queueSettings  queueSettings
 	consumerSender requestSender
 	queue          internal.ProducerConsumerQueue
 	retryStopCh    chan struct{}
@@ -82,8 +77,8 @@ type queuedRetrySender struct {
 	requeuingEnabled bool
 }
 
-func newQueuedRetrySender(id component.ID, signal component.DataType, qs queueSettings, rCfg RetrySettings,
-	nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
+func newQueuedRetrySender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue,
+	rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
 	retryStopCh := make(chan struct{})
 	sampledLogger := createSampledLogger(logger)
 	traceAttr := attribute.String(obsmetrics.ExporterKey, id.String())
@@ -92,10 +87,12 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qs queueSe
 		fullName:       id.String(),
 		id:             id,
 		signal:         signal,
-		queueSettings:  qs,
+		queue:          queue,
 		retryStopCh:    retryStopCh,
 		traceAttribute: traceAttr,
 		logger:         sampledLogger,
+		// TODO: this can be further exposed as a config param rather than relying on a type of queue
+		requeuingEnabled: queue != nil && queue.IsPersistent(),
 	}
 
 	qrs.consumerSender = &retrySender{
@@ -108,65 +105,9 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qs queueSe
 		onTemporaryFailure: qrs.onTemporaryFailure,
 	}
 
-	if !qs.persistenceEnabled() {
-		qrs.queue = internal.NewBoundedMemoryQueue(qs.config.QueueSize)
-	}
-	// The Persistent Queue is initialized separately as it needs extra information about the component
-
 	return qrs
 }
 
-func getStorageExtension(extensions map[component.ID]component.Component, storageID component.ID) (storage.Extension, error) {
-	if ext, found := extensions[storageID]; found {
-		if storageExt, ok := ext.(storage.Extension); ok {
-			return storageExt, nil
-		}
-		return nil, errWrongExtensionType
-	}
-	return nil, errNoStorageClient
-}
-
-func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {
-	extension, err := getStorageExtension(host.GetExtensions(), storageID)
-	if err != nil {
-		return nil, err
-	}
-
-	client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
-	if err != nil {
-		return nil, err
-	}
-
-	return client, err
-}
-
-// initializePersistentQueue uses extra information for initialization available from component.Host
-func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error {
-	if !qrs.queueSettings.persistenceEnabled() {
-		return nil
-	}
-
-	storageClient, err := toStorageClient(ctx, *qrs.queueSettings.config.StorageID, host, qrs.id, qrs.signal)
-	if err != nil {
-		return err
-	}
-
-	qrs.queue = internal.NewPersistentQueue(ctx, internal.PersistentQueueSettings{
-		Name:        qrs.fullName,
-		Signal:      qrs.signal,
-		Capacity:    uint64(qrs.queueSettings.config.QueueSize),
-		Logger:      qrs.logger,
-		Client:      storageClient,
-		Marshaler:   qrs.queueSettings.marshaler,
-		Unmarshaler: qrs.queueSettings.unmarshaler,
-	})
-
-	// TODO: this can be further exposed as a config param rather than relying on a type of queue
-	qrs.requeuingEnabled = true
-
-	return nil
-}
-
 func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error {
 	if !qrs.requeuingEnabled || qrs.queue == nil {
 		logger.Error(
@@ -193,30 +134,35 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna
 }
 
 // start is invoked during service startup.
-func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) error {
-	if err := qrs.initializePersistentQueue(ctx, host); err != nil {
-		return err
+func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error {
+	if qrs.queue == nil {
+		return nil
 	}
 
-	qrs.queue.StartConsumers(qrs.queueSettings.config.NumConsumers, func(item internal.Request) {
-		_ = qrs.consumerSender.send(item)
-		item.OnProcessingFinished()
+	err := qrs.queue.Start(ctx, host, internal.QueueSettings{
+		CreateSettings: set,
+		DataType:       qrs.signal,
+		Callback: func(item internal.Request) {
+			_ = qrs.consumerSender.send(item)
+			item.OnProcessingFinished()
+		},
 	})
+	if err != nil {
+		return err
+	}
 
 	// Start reporting queue length metric
-	if qrs.queueSettings.config.Enabled {
-		err := globalInstruments.queueSize.UpsertEntry(func() int64 {
-			return int64(qrs.queue.Size())
-		}, metricdata.NewLabelValue(qrs.fullName))
-		if err != nil {
-			return fmt.Errorf("failed to create retry queue size metric: %w", err)
-		}
-		err = globalInstruments.queueCapacity.UpsertEntry(func() int64 {
-			return int64(qrs.queueSettings.config.QueueSize)
-		}, metricdata.NewLabelValue(qrs.fullName))
-		if err != nil {
-			return fmt.Errorf("failed to create retry queue capacity metric: %w", err)
-		}
+	err = globalInstruments.queueSize.UpsertEntry(func() int64 {
+		return int64(qrs.queue.Size())
+	}, metricdata.NewLabelValue(qrs.fullName))
+	if err != nil {
+		return fmt.Errorf("failed to create retry queue size metric: %w", err)
+	}
+	err = globalInstruments.queueCapacity.UpsertEntry(func() int64 {
+		return int64(qrs.queue.Capacity())
+	}, metricdata.NewLabelValue(qrs.fullName))
+	if err != nil {
+		return fmt.Errorf("failed to create retry queue capacity metric: %w", err)
 	}
 
 	return nil
@@ -225,7 +171,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er
 // shutdown is invoked during service shutdown.
 func (qrs *queuedRetrySender) shutdown() {
 	// Cleanup queue metrics reporting
-	if qrs.queueSettings.config.Enabled {
+	if qrs.queue != nil {
 		_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
 			return int64(0)
 		}, metricdata.NewLabelValue(qrs.fullName))
@@ -294,7 +240,7 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger {
 
 // send implements the requestSender interface
 func (qrs *queuedRetrySender) send(req internal.Request) error {
-	if !qrs.queueSettings.config.Enabled {
+	if qrs.queue == nil {
 		err := qrs.consumerSender.send(req)
 		if err != nil {
 			qrs.logger.Error(
diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go
index 2858478f9a0..81403cd8b56 100644
--- a/exporter/exporterhelper/queued_retry_test.go
+++ b/exporter/exporterhelper/queued_retry_test.go
@@ -7,7 +7,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -23,8 +22,8 @@ import (
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/extension/experimental/storage"
-	"go.opentelemetry.io/collector/extension/extensiontest"
 	"go.opentelemetry.io/collector/internal/testdata"
 	"go.opentelemetry.io/collector/obsreport/obsreporttest"
 )
@@ -43,9 +42,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	rCfg := NewDefaultRetrySettings()
 	mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data")))
-	bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg))
-	bs.marshaler = mockRequestMarshaler
-	bs.unmarshaler = mockRequestUnmarshaler(mockR)
+	bs := newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg))
 	be, err := newBaseExporter(defaultSettings, bs, "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
@@ -70,9 +67,9 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	rCfg := NewDefaultRetrySettings()
 	rCfg.Enabled = false
-	bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg))
-	bs.marshaler = mockRequestMarshaler
-	bs.unmarshaler = mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error")))
+	bs := newBaseSettings(false, mockRequestMarshaler,
+		mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))),
+		WithRetry(rCfg), WithQueue(qCfg))
 	be, err := newBaseExporter(defaultSettings, bs, "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
@@ -99,7 +96,7 @@ func TestQueuedRetry_OnError(t *testing.T) {
 	qCfg.NumConsumers = 1
 	rCfg := NewDefaultRetrySettings()
 	rCfg.InitialInterval = 0
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
 	be.qrSender.consumerSender = ocs
@@ -126,7 +123,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	qCfg.NumConsumers = 1
 	rCfg := NewDefaultRetrySettings()
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
 	be.qrSender.consumerSender = ocs
@@ -160,7 +157,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	qCfg.NumConsumers = 1
 	rCfg := NewDefaultRetrySettings()
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
 	be.qrSender.consumerSender = ocs
@@ -190,7 +187,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) {
 	rCfg := NewDefaultRetrySettings()
 	rCfg.InitialInterval = time.Millisecond
 	rCfg.MaxElapsedTime = 100 * time.Millisecond
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
 	be.qrSender.consumerSender = ocs
@@ -237,7 +234,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) {
 	qCfg.NumConsumers = 1
 	rCfg := NewDefaultRetrySettings()
 	rCfg.InitialInterval = 10 * time.Millisecond
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
 	be.qrSender.consumerSender = ocs
@@ -270,7 +267,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) {
 	qCfg.QueueSize = 1
 	rCfg := NewDefaultRetrySettings()
 	rCfg.InitialInterval = 0
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
 	be.qrSender.consumerSender = ocs
@@ -297,7 +294,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	qCfg.QueueSize = 0
 	rCfg := NewDefaultRetrySettings()
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
 	be.qrSender.consumerSender = ocs
@@ -318,7 +315,7 @@ func TestQueuedRetryHappyPath(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	rCfg := NewDefaultRetrySettings()
 	set := tt.ToExporterCreateSettings()
-	be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(set, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
 	be.qrSender.consumerSender = ocs
@@ -353,7 +350,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	qCfg.NumConsumers = 0 // to make every request go straight to the queue
 	rCfg := NewDefaultRetrySettings()
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -395,99 +392,13 @@ func TestQueueSettings_Validate(t *testing.T) {
 	assert.NoError(t, qCfg.Validate())
 }
 
-func TestGetRetrySettings(t *testing.T) {
-	getStorageClientError := errors.New("unable to create storage client")
-	testCases := []struct {
-		desc           string
-		storage        storage.Extension
-		numStorages    int
-		storageIndex   int
-		expectedError  error
-		getClientError error
-	}{
-		{
-			desc:          "obtain storage extension by name",
-			numStorages:   2,
-			storageIndex:  0,
-			expectedError: nil,
-		},
-		{
-			desc:          "fail on not existing storage extension",
-			numStorages:   2,
-			storageIndex:  100,
-			expectedError: errNoStorageClient,
-		},
-		{
-			desc:          "invalid extension type",
-			numStorages:   2,
-			storageIndex:  100,
-			expectedError: errNoStorageClient,
-		},
-		{
-			desc:           "fail on error getting storage client from extension",
-			numStorages:    1,
-			storageIndex:   0,
-			expectedError:  getStorageClientError,
-			getClientError: getStorageClientError,
-		},
-	}
-
-	for _, tC := range testCases {
-		t.Run(tC.desc, func(t *testing.T) {
-			storageID := component.NewIDWithName("file_storage", strconv.Itoa(tC.storageIndex))
-
-			var extensions = map[component.ID]component.Component{}
-			for i := 0; i < tC.numStorages; i++ {
-				extensions[component.NewIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError}
-			}
-			host := &mockHost{ext: extensions}
-			ownerID := component.NewID("foo_exporter")
-
-			// execute
-			client, err := toStorageClient(context.Background(), storageID, host, ownerID, component.DataTypeTraces)
-
-			// verify
-			if tC.expectedError != nil {
-				assert.ErrorIs(t, err, tC.expectedError)
-				assert.Nil(t, client)
-			} else {
-				assert.NoError(t, err)
-				assert.NotNil(t, client)
-			}
-		})
-	}
-}
-
-func TestInvalidStorageExtensionType(t *testing.T) {
-	storageID := component.NewIDWithName("extension", "extension")
-
-	// make a test extension
-	factory := extensiontest.NewNopFactory()
-	extConfig := factory.CreateDefaultConfig()
-	settings := extensiontest.NewNopCreateSettings()
-	extension, err := factory.CreateExtension(context.Background(), settings, extConfig)
-	assert.NoError(t, err)
-	var extensions = map[component.ID]component.Component{
-		storageID: extension,
-	}
-	host := &mockHost{ext: extensions}
-	ownerID := component.NewID("foo_exporter")
-
-	// execute
-	client, err := toStorageClient(context.Background(), storageID, host, ownerID, component.DataTypeTraces)
-
-	// we should get an error about the extension type
-	assert.ErrorIs(t, err, errWrongExtensionType)
-	assert.Nil(t, client)
-}
-
 // if requeueing is enabled, we eventually retry even if we failed at first
 func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
 	qCfg := NewDefaultQueueSettings()
 	qCfg.NumConsumers = 1
 	rCfg := NewDefaultRetrySettings()
 	rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
 	be.qrSender.consumerSender = ocs
@@ -519,7 +430,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
 	qCfg.QueueSize = 0
 	rCfg := NewDefaultRetrySettings()
 	rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	be.qrSender.requeuingEnabled = true
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -534,6 +445,17 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
 	mockR.checkNumRequests(t, 1)
 }
 
+func TestQueueRetryWithDisabledQueue(t *testing.T) {
+	qs := NewDefaultQueueSettings()
+	qs.Enabled = false
+	bs := newBaseSettings(false, nil, nil, WithQueue(qs))
+	require.Nil(t, bs.queue)
+	be, err := newBaseExporter(exportertest.NewNopCreateSettings(), bs, component.DataTypeLogs)
+	require.NoError(t, err)
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	require.NoError(t, be.Shutdown(context.Background()))
+}
+
 func TestQueuedRetryPersistenceEnabled(t *testing.T) {
 	tt, err := obsreporttest.SetupTelemetry(defaultID)
 	require.NoError(t, err)
@@ -544,7 +466,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
 	qCfg.StorageID = &storageID // enable persistence
 	rCfg := NewDefaultRetrySettings()
 	set := tt.ToExporterCreateSettings()
-	be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(set, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 
 	var extensions = map[component.ID]component.Component{
@@ -568,7 +490,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
 	qCfg.StorageID = &storageID // enable persistence
 	rCfg := NewDefaultRetrySettings()
 	set := tt.ToExporterCreateSettings()
-	bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg))
+	bs := newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg))
 	bs.marshaler = mockRequestMarshaler
 	bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{})
 	be, err := newBaseExporter(set, bs, "")
@@ -595,7 +517,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
 
 	req := newMockRequest(context.Background(), 3, errors.New("some error"))
 
-	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
+	be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "")
 	require.NoError(t, err)
 	require.NoError(t, be.Start(context.Background(), &mockHost{}))
@@ -630,10 +552,10 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
 }
 
 func TestQueueRetryOptionsWithRequestExporter(t *testing.T) {
-	bs := newBaseSettings(true, WithRetry(NewDefaultRetrySettings()))
+	bs := newBaseSettings(true, nil, nil, WithRetry(NewDefaultRetrySettings()))
 	assert.True(t, bs.requestExporter)
 	assert.Panics(t, func() {
-		_ = newBaseSettings(true, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings()))
+		_ = newBaseSettings(true, nil, nil, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings()))
 	})
 }
diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go
index eaec59217e5..03b0fefb95e 100644
--- a/exporter/exporterhelper/traces.go
+++ b/exporter/exporterhelper/traces.go
@@ -89,9 +89,7 @@ func NewTracesExporter(
 		return nil, errNilPushTraceData
 	}
 
-	bs := newBaseSettings(false, options...)
-	bs.marshaler = tracesRequestMarshaler
-	bs.unmarshaler = newTraceRequestUnmarshalerFunc(pusher)
+	bs := newBaseSettings(false, tracesRequestMarshaler, newTraceRequestUnmarshalerFunc(pusher), options...)
 	be, err := newBaseExporter(set, bs, component.DataTypeTraces)
 	if err != nil {
 		return nil, err
@@ -143,7 +141,7 @@ func NewTracesRequestExporter(
 		return nil, errNilTracesConverter
 	}
 
-	bs := newBaseSettings(true, options...)
+	bs := newBaseSettings(true, nil, nil, options...)
 
 	be, err := newBaseExporter(set, bs, component.DataTypeTraces)
 	if err != nil {
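Taken together, the user-facing behaviour is unchanged: queuing is still opted into through QueueSettings. A rough end-to-end sketch of how an exporter author would enable a persistent sending queue through the public helpers touched in this diff (the "file_storage" extension ID is an assumption for illustration; a matching storage extension must be configured on the host):

	storageID := component.NewID("file_storage")
	qCfg := exporterhelper.NewDefaultQueueSettings()
	qCfg.StorageID = &storageID // switches WithQueue from the in-memory to the persistent queue

	exp, err := exporterhelper.NewTracesExporter(ctx, set, cfg, pushTraces,
		exporterhelper.WithQueue(qCfg),
		exporterhelper.WithRetry(exporterhelper.NewDefaultRetrySettings()),
	)
	// The persistent queue itself is only started in exp.Start, where the storage client is
	// resolved from host.GetExtensions() as shown in persistent_queue.go above.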