From 767b95e73a643989fc1ff327376eaa8e536cbe75 Mon Sep 17 00:00:00 2001
From: Dmitrii Anoshin
Date: Wed, 13 Sep 2023 13:15:45 -0700
Subject: [PATCH] [chore] [exporterhelper] Split exporter senders into different files (#8398)

This is a follow-up to https://github.com/open-telemetry/opentelemetry-collector/pull/8369.
No functional changes, just moving the code around.
---
 exporter/exporterhelper/common.go             |  31 --
 exporter/exporterhelper/common_test.go        |  11 +
 exporter/exporterhelper/queue_sender.go       | 208 +++++++++++
 exporter/exporterhelper/queue_sender_test.go  | 349 ++++++++++++++++++
 .../{queued_retry.go => retry_sender.go}      | 190 ----------
 ...ued_retry_test.go => retry_sender_test.go} | 342 -----------------
 exporter/exporterhelper/timeout_sender.go     |  42 +++
 7 files changed, 610 insertions(+), 563 deletions(-)
 create mode 100644 exporter/exporterhelper/queue_sender.go
 create mode 100644 exporter/exporterhelper/queue_sender_test.go
 rename exporter/exporterhelper/{queued_retry.go => retry_sender.go} (52%)
 rename exporter/exporterhelper/{queued_retry_test.go => retry_sender_test.go} (50%)
 create mode 100644 exporter/exporterhelper/timeout_sender.go

diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index 443fc477865..f6277404619 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -17,19 +17,6 @@ import (
     "go.opentelemetry.io/collector/obsreport"
 )
 
-// TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend.
-type TimeoutSettings struct {
-    // Timeout is the timeout for every attempt to send data to the backend.
-    Timeout time.Duration `mapstructure:"timeout"`
-}
-
-// NewDefaultTimeoutSettings returns the default settings for TimeoutSettings.
-func NewDefaultTimeoutSettings() TimeoutSettings {
-    return TimeoutSettings{
-        Timeout: 5 * time.Second,
-    }
-}
-
 // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
 type requestSender interface {
     start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
@@ -251,24 +238,6 @@ func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandli
     }
 }
 
-// timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender.
-type timeoutSender struct {
-    baseRequestSender
-    cfg TimeoutSettings
-}
-
-func (ts *timeoutSender) send(req internal.Request) error {
-    // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
-    // updated because this deadline most likely is before the next one.
-    ctx := req.Context()
-    if ts.cfg.Timeout > 0 {
-        var cancelFunc func()
-        ctx, cancelFunc = context.WithTimeout(req.Context(), ts.cfg.Timeout)
-        defer cancelFunc()
-    }
-    return req.Export(ctx)
-}
-
 func createSampledLogger(logger *zap.Logger) *zap.Logger {
     if logger.Core().Enabled(zapcore.DebugLevel) {
         // Debugging is enabled. Don't do any sampling.
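The common.go hunk above removes the timeout pieces and keeps only the requestSender abstraction; the queue, retry, and timeout senders re-land as separate files later in this patch. As a reading aid, here is a deliberately simplified, standalone Go sketch of that sender-chain pattern. Every type, field, and value in it is illustrative only, not the collector's actual implementation; in the helper itself the queue sender hands items to the retry sender, which ultimately calls the timeout-wrapped export.

package main

import (
	"errors"
	"fmt"
	"time"
)

// request is a simplified stand-in for internal.Request in the patch.
type request struct{ items int }

// sender mirrors the requestSender idea from common.go: each sender handles
// one concern (queueing, retrying, timing out) and delegates to the next one.
type sender interface {
	send(req request) error
}

// exportSender is the terminal sender; it always fails here so the error path
// through the chain is visible.
type exportSender struct{}

func (exportSender) send(request) error { return errors.New("backend unavailable") }

// retrySender retries the next sender a fixed number of times. The real
// retrySender in retry_sender.go uses exponential backoff driven by RetrySettings.
type retrySender struct {
	next     sender
	attempts int
	interval time.Duration
}

func (rs retrySender) send(req request) error {
	var err error
	for i := 0; i < rs.attempts; i++ {
		if err = rs.next.send(req); err == nil {
			return nil
		}
		time.Sleep(rs.interval)
	}
	// Give up, reporting how many items are dropped, like the helper's logs do.
	return fmt.Errorf("dropping %d items: %w", req.items, err)
}

func main() {
	chain := retrySender{next: exportSender{}, attempts: 3, interval: 10 * time.Millisecond}
	fmt.Println(chain.send(request{items: 2}))
}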
diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index d505a3a35be..9889754e231 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -68,3 +68,14 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd) } } + +func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { + bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, + WithRetry(NewDefaultRetrySettings())) + require.Nil(t, err) + require.True(t, bs.requestExporter) + require.Panics(t, func() { + _, _ = newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, + WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) + }) +} diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go new file mode 100644 index 00000000000..d3da29500fe --- /dev/null +++ b/exporter/exporterhelper/queue_sender.go @@ -0,0 +1,208 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "errors" + "fmt" + "time" + + "go.opencensus.io/metric/metricdata" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" +) + +const defaultQueueSize = 1000 + +var errSendingQueueIsFull = errors.New("sending_queue is full") + +// QueueSettings defines configuration for queueing batches before sending to the consumerSender. +type QueueSettings struct { + // Enabled indicates whether to not enqueue batches before sending to the consumerSender. + Enabled bool `mapstructure:"enabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueSize is the maximum number of batches allowed in queue at a given time. + QueueSize int `mapstructure:"queue_size"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *component.ID `mapstructure:"storage"` +} + +// NewDefaultQueueSettings returns the default settings for QueueSettings. 
+func NewDefaultQueueSettings() QueueSettings { + return QueueSettings{ + Enabled: true, + NumConsumers: 10, + // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue + // This can be estimated at 1-4 GB worth of maximum memory usage + // This default is probably still too high, and may be adjusted further down in a future release + QueueSize: defaultQueueSize, + } +} + +// Validate checks if the QueueSettings configuration is valid +func (qCfg *QueueSettings) Validate() error { + if !qCfg.Enabled { + return nil + } + + if qCfg.QueueSize <= 0 { + return errors.New("queue size must be positive") + } + + return nil +} + +type queueSender struct { + baseRequestSender + fullName string + id component.ID + signal component.DataType + queue internal.ProducerConsumerQueue + traceAttribute attribute.KeyValue + logger *zap.Logger + requeuingEnabled bool +} + +func newQueueSender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, logger *zap.Logger) *queueSender { + return &queueSender{ + fullName: id.String(), + id: id, + signal: signal, + queue: queue, + traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), + logger: logger, + // TODO: this can be further exposed as a config param rather than relying on a type of queue + requeuingEnabled: queue != nil && queue.IsPersistent(), + } +} + +func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { + if !qs.requeuingEnabled || qs.queue == nil { + logger.Error( + "Exporting failed. No more retries left. Dropping data.", + zap.Error(err), + zap.Int("dropped_items", req.Count()), + ) + return err + } + + if qs.queue.Produce(req) { + logger.Error( + "Exporting failed. Putting back to the end of the queue.", + zap.Error(err), + ) + } else { + logger.Error( + "Exporting failed. Queue did not accept requeuing request. Dropping data.", + zap.Error(err), + zap.Int("dropped_items", req.Count()), + ) + } + return err +} + +// start is invoked during service startup. +func (qs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { + if qs.queue == nil { + return nil + } + + err := qs.queue.Start(ctx, host, internal.QueueSettings{ + CreateSettings: set, + DataType: qs.signal, + Callback: func(item internal.Request) { + _ = qs.nextSender.send(item) + item.OnProcessingFinished() + }, + }) + if err != nil { + return err + } + + // Start reporting queue length metric + err = globalInstruments.queueSize.UpsertEntry(func() int64 { + return int64(qs.queue.Size()) + }, metricdata.NewLabelValue(qs.fullName)) + if err != nil { + return fmt.Errorf("failed to create retry queue size metric: %w", err) + } + err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { + return int64(qs.queue.Capacity()) + }, metricdata.NewLabelValue(qs.fullName)) + if err != nil { + return fmt.Errorf("failed to create retry queue capacity metric: %w", err) + } + + return nil +} + +// shutdown is invoked during service shutdown. +func (qs *queueSender) shutdown() { + if qs.queue != nil { + // Cleanup queue metrics reporting + _ = globalInstruments.queueSize.UpsertEntry(func() int64 { + return int64(0) + }, metricdata.NewLabelValue(qs.fullName)) + + // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only + // try once every request. 
+ qs.queue.Stop() + } +} + +// send implements the requestSender interface +func (qs *queueSender) send(req internal.Request) error { + if qs.queue == nil { + err := qs.nextSender.send(req) + if err != nil { + qs.logger.Error( + "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", + zap.Int("dropped_items", req.Count()), + ) + } + return err + } + + // Prevent cancellation and deadline to propagate to the context stored in the queue. + // The grpc/http based receivers will cancel the request context after this function returns. + req.SetContext(noCancellationContext{Context: req.Context()}) + + span := trace.SpanFromContext(req.Context()) + if !qs.queue.Produce(req) { + qs.logger.Error( + "Dropping data because sending_queue is full. Try increasing queue_size.", + zap.Int("dropped_items", req.Count()), + ) + span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute)) + return errSendingQueueIsFull + } + + span.AddEvent("Enqueued item.", trace.WithAttributes(qs.traceAttribute)) + return nil +} + +type noCancellationContext struct { + context.Context +} + +func (noCancellationContext) Deadline() (deadline time.Time, ok bool) { + return +} + +func (noCancellationContext) Done() <-chan struct{} { + return nil +} + +func (noCancellationContext) Err() error { + return nil +} diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go new file mode 100644 index 00000000000..d5c7f00de6b --- /dev/null +++ b/exporter/exporterhelper/queue_sender_test.go @@ -0,0 +1,349 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/internal/testdata" + "go.opentelemetry.io/collector/obsreport/obsreporttest" +) + +func TestQueuedRetry_StopWhileWaiting(t *testing.T) { + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := NewDefaultRetrySettings() + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.obsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + firstMockR := newErrorRequest(context.Background()) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.send(firstMockR)) + }) + + // Enqueue another request to ensure when calling shutdown we drain the queue. + secondMockR := newMockRequest(context.Background(), 3, nil) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.NoError(t, be.send(secondMockR)) + }) + + require.LessOrEqual(t, 1, be.queueSender.(*queueSender).queue.Size()) + + assert.NoError(t, be.Shutdown(context.Background())) + + secondMockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 3) + ocs.checkDroppedItemsCount(t, 7) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) +} + +func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := NewDefaultRetrySettings() + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.obsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + ctx, cancelFunc := context.WithCancel(context.Background()) + cancelFunc() + mockR := newMockRequest(ctx, 2, nil) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.send(mockR)) + }) + ocs.awaitAsyncProcessing() + + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) +} + +func TestQueuedRetry_DropOnFull(t *testing.T) { + qCfg := NewDefaultQueueSettings() + qCfg.QueueSize = 0 + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + require.Error(t, be.send(newMockRequest(context.Background(), 2, nil))) +} + +func TestQueuedRetryHappyPath(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := NewDefaultQueueSettings() + rCfg := NewDefaultRetrySettings() + set := tt.ToExporterCreateSettings() + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.obsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + wantRequests := 10 + reqs := make([]*mockRequest, 0, 10) + for i := 0; i < wantRequests; i++ { + ocs.run(func() { + req := newMockRequest(context.Background(), 2, nil) + reqs = append(reqs, req) + require.NoError(t, be.send(req)) + }) + } + + // Wait until all batches received + ocs.awaitAsyncProcessing() + + require.Len(t, reqs, wantRequests) + for _, req := range reqs { + req.checkNumRequests(t, 1) + } + + ocs.checkSendItemsCount(t, 2*wantRequests) + ocs.checkDroppedItemsCount(t, 0) +} + +func TestQueuedRetry_QueueMetricsReported(t *testing.T) { + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 0 // to make every request go straight to the queue + rCfg := NewDefaultRetrySettings() + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + checkValueForGlobalManager(t, defaultExporterTags, int64(defaultQueueSize), 
"exporter/queue_capacity") + for i := 0; i < 7; i++ { + require.NoError(t, be.send(newErrorRequest(context.Background()))) + } + checkValueForGlobalManager(t, defaultExporterTags, int64(7), "exporter/queue_size") + + assert.NoError(t, be.Shutdown(context.Background())) + checkValueForGlobalManager(t, defaultExporterTags, int64(0), "exporter/queue_size") +} + +func TestNoCancellationContext(t *testing.T) { + deadline := time.Now().Add(1 * time.Second) + ctx, cancelFunc := context.WithDeadline(context.Background(), deadline) + cancelFunc() + require.Error(t, ctx.Err()) + d, ok := ctx.Deadline() + require.True(t, ok) + require.Equal(t, deadline, d) + + nctx := noCancellationContext{Context: ctx} + assert.NoError(t, nctx.Err()) + d, ok = nctx.Deadline() + assert.False(t, ok) + assert.True(t, d.IsZero()) +} + +func TestQueueSettings_Validate(t *testing.T) { + qCfg := NewDefaultQueueSettings() + assert.NoError(t, qCfg.Validate()) + + qCfg.QueueSize = 0 + assert.EqualError(t, qCfg.Validate(), "queue size must be positive") + + // Confirm Validate doesn't return error with invalid config when feature is disabled + qCfg.Enabled = false + assert.NoError(t, qCfg.Validate()) +} + +// if requeueing is enabled, we eventually retry even if we failed at first +func TestQueuedRetry_RequeuingEnabled(t *testing.T) { + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := NewDefaultRetrySettings() + rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.obsrepSender.(*observabilityConsumerSender) + be.queueSender.(*queueSender).requeuingEnabled = true + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) + mockR := newMockRequest(context.Background(), 1, traceErr) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.NoError(t, be.send(mockR)) + ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing + }) + ocs.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockR.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 1) + ocs.checkDroppedItemsCount(t, 1) // not actually dropped, but ocs counts each failed send here +} + +// if requeueing is enabled, but the queue is full, we get an error +func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 0 + qCfg.QueueSize = 0 + rCfg := NewDefaultRetrySettings() + rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + be.queueSender.(*queueSender).requeuingEnabled = true + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) + mockR := newMockRequest(context.Background(), 1, traceErr) + + require.Error(t, be.retrySender.send(mockR), "sending_queue is full") + mockR.checkNumRequests(t, 1) +} + +func TestQueueRetryWithDisabledQueue(t *testing.T) { + qs := NewDefaultQueueSettings() + qs.Enabled = false + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs)) + require.Nil(t, be.queueSender.(*queueSender).queue) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.obsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(context.Background(), 2, errors.New("some error")) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.Error(t, be.send(mockR)) + }) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestQueuedRetryPersistenceEnabled(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := NewDefaultQueueSettings() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence + rCfg := NewDefaultRetrySettings() + set := tt.ToExporterCreateSettings() + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + + var extensions = map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(nil), + } + host := &mockHost{ext: extensions} + + // we start correctly with a file storage extension + require.NoError(t, be.Start(context.Background(), host)) + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { + storageError := errors.New("could not get storage client") + tt, err := obsreporttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := NewDefaultQueueSettings() + storageID := component.NewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence + rCfg := NewDefaultRetrySettings() + set := tt.ToExporterCreateSettings() + be, err := newBaseExporter(set, "", false, mockRequestMarshaler, mockRequestUnmarshaler(&mockRequest{}), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + + var extensions = map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(storageError), + } + host := &mockHost{ext: extensions} + + // we fail to start if we get an error creating the storage client + require.Error(t, be.Start(context.Background(), host), "could not get storage client") +} + +func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { + + produceCounter := &atomic.Uint32{} + + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := NewDefaultRetrySettings() + rCfg.InitialInterval = time.Millisecond + rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered + + req := newMockRequest(context.Background(), 3, errors.New("some error")) + + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + + require.NoError(t, be.Start(context.Background(), &mockHost{})) + + // wraps original queue so we can count operations + be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{ + ProducerConsumerQueue: be.queueSender.(*queueSender).queue, + produceCounter: produceCounter, + } + be.queueSender.(*queueSender).requeuingEnabled = true + + // replace nextSender inside retrySender to always return error so it doesn't exit send loop + be.retrySender.setNextSender(&errorRequestSender{ + errToReturn: errors.New("some error"), + }) + + // Invoke queuedRetrySender so the producer will put the item for consumer to poll + require.NoError(t, be.send(req)) + + // first wait for the item to be produced to the queue initially + assert.Eventually(t, func() bool { + return 
produceCounter.Load() == uint32(1) + }, time.Second, 1*time.Millisecond) + + // shuts down and ensure the item is produced in the queue again + require.NoError(t, be.Shutdown(context.Background())) + assert.Eventually(t, func() bool { + return produceCounter.Load() == uint32(2) + }, time.Second, 1*time.Millisecond) +} + +type mockHost struct { + component.Host + ext map[component.ID]component.Component +} + +func (nh *mockHost) GetExtensions() map[component.ID]component.Component { + return nh.ext +} diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/retry_sender.go similarity index 52% rename from exporter/exporterhelper/queued_retry.go rename to exporter/exporterhelper/retry_sender.go index 3aceb411745..14a90a9c1e6 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/retry_sender.go @@ -4,164 +4,21 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" import ( - "context" "errors" "fmt" "time" "github.com/cenkalti/backoff/v4" - "go.opencensus.io/metric/metricdata" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) -const defaultQueueSize = 1000 - -var errSendingQueueIsFull = errors.New("sending_queue is full") - -// QueueSettings defines configuration for queueing batches before sending to the consumerSender. -type QueueSettings struct { - // Enabled indicates whether to not enqueue batches before sending to the consumerSender. - Enabled bool `mapstructure:"enabled"` - // NumConsumers is the number of consumers from the queue. - NumConsumers int `mapstructure:"num_consumers"` - // QueueSize is the maximum number of batches allowed in queue at a given time. - QueueSize int `mapstructure:"queue_size"` - // StorageID if not empty, enables the persistent storage and uses the component specified - // as a storage extension for the persistent queue - StorageID *component.ID `mapstructure:"storage"` -} - -// NewDefaultQueueSettings returns the default settings for QueueSettings. 
-func NewDefaultQueueSettings() QueueSettings { - return QueueSettings{ - Enabled: true, - NumConsumers: 10, - // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue - // This can be estimated at 1-4 GB worth of maximum memory usage - // This default is probably still too high, and may be adjusted further down in a future release - QueueSize: defaultQueueSize, - } -} - -// Validate checks if the QueueSettings configuration is valid -func (qCfg *QueueSettings) Validate() error { - if !qCfg.Enabled { - return nil - } - - if qCfg.QueueSize <= 0 { - return errors.New("queue size must be positive") - } - - return nil -} - -type queueSender struct { - baseRequestSender - fullName string - id component.ID - signal component.DataType - queue internal.ProducerConsumerQueue - traceAttribute attribute.KeyValue - logger *zap.Logger - requeuingEnabled bool -} - -func newQueueSender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, logger *zap.Logger) *queueSender { - return &queueSender{ - fullName: id.String(), - id: id, - signal: signal, - queue: queue, - traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), - logger: logger, - // TODO: this can be further exposed as a config param rather than relying on a type of queue - requeuingEnabled: queue != nil && queue.IsPersistent(), - } -} - -func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { - if !qs.requeuingEnabled || qs.queue == nil { - logger.Error( - "Exporting failed. No more retries left. Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.Count()), - ) - return err - } - - if qs.queue.Produce(req) { - logger.Error( - "Exporting failed. Putting back to the end of the queue.", - zap.Error(err), - ) - } else { - logger.Error( - "Exporting failed. Queue did not accept requeuing request. Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.Count()), - ) - } - return err -} - -// start is invoked during service startup. -func (qs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { - if qs.queue == nil { - return nil - } - - err := qs.queue.Start(ctx, host, internal.QueueSettings{ - CreateSettings: set, - DataType: qs.signal, - Callback: func(item internal.Request) { - _ = qs.nextSender.send(item) - item.OnProcessingFinished() - }, - }) - if err != nil { - return err - } - - // Start reporting queue length metric - err = globalInstruments.queueSize.UpsertEntry(func() int64 { - return int64(qs.queue.Size()) - }, metricdata.NewLabelValue(qs.fullName)) - if err != nil { - return fmt.Errorf("failed to create retry queue size metric: %w", err) - } - err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qs.queue.Capacity()) - }, metricdata.NewLabelValue(qs.fullName)) - if err != nil { - return fmt.Errorf("failed to create retry queue capacity metric: %w", err) - } - - return nil -} - -// shutdown is invoked during service shutdown. -func (qs *queueSender) shutdown() { - if qs.queue != nil { - // Cleanup queue metrics reporting - _ = globalInstruments.queueSize.UpsertEntry(func() int64 { - return int64(0) - }, metricdata.NewLabelValue(qs.fullName)) - - // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only - // try once every request. - qs.queue.Stop() - } -} - // RetrySettings defines configuration for retrying batches in case of export failure. 
// The current supported strategy is exponential backoff. type RetrySettings struct { @@ -194,37 +51,6 @@ func NewDefaultRetrySettings() RetrySettings { } } -// send implements the requestSender interface -func (qs *queueSender) send(req internal.Request) error { - if qs.queue == nil { - err := qs.nextSender.send(req) - if err != nil { - qs.logger.Error( - "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", - zap.Int("dropped_items", req.Count()), - ) - } - return err - } - - // Prevent cancellation and deadline to propagate to the context stored in the queue. - // The grpc/http based receivers will cancel the request context after this function returns. - req.SetContext(noCancellationContext{Context: req.Context()}) - - span := trace.SpanFromContext(req.Context()) - if !qs.queue.Produce(req) { - qs.logger.Error( - "Dropping data because sending_queue is full. Try increasing queue_size.", - zap.Int("dropped_items", req.Count()), - ) - span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute)) - return errSendingQueueIsFull - } - - span.AddEvent("Enqueued item.", trace.WithAttributes(qs.traceAttribute)) - return nil -} - // TODO: Clean this by forcing all exporters to return an internal error type that always include the information about retries. type throttleRetry struct { err error @@ -373,19 +199,3 @@ func max(x, y time.Duration) time.Duration { } return x } - -type noCancellationContext struct { - context.Context -} - -func (noCancellationContext) Deadline() (deadline time.Time, ok bool) { - return -} - -func (noCancellationContext) Done() <-chan struct{} { - return nil -} - -func (noCancellationContext) Err() error { - return nil -} diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/retry_sender_test.go similarity index 50% rename from exporter/exporterhelper/queued_retry_test.go rename to exporter/exporterhelper/retry_sender_test.go index 9c67b7f1873..1da5fb29ebd 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/obsreport/obsreporttest" ) func mockRequestUnmarshaler(mr *mockRequest) internal.RequestUnmarshaler { @@ -113,65 +112,6 @@ func TestQueuedRetry_OnError(t *testing.T) { ocs.checkDroppedItemsCount(t, 0) } -func TestQueuedRetry_StopWhileWaiting(t *testing.T) { - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 1 - rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - firstMockR := newErrorRequest(context.Background()) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(firstMockR)) - }) - - // Enqueue another request to ensure when calling shutdown we drain the queue. - secondMockR := newMockRequest(context.Background(), 3, nil) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.send(secondMockR)) - }) - - require.LessOrEqual(t, 1, be.queueSender.(*queueSender).queue.Size()) - - assert.NoError(t, be.Shutdown(context.Background())) - - secondMockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 3) - ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.queueSender.(*queueSender).queue.Size()) -} - -func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 1 - rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - ctx, cancelFunc := context.WithCancel(context.Background()) - cancelFunc() - mockR := newMockRequest(ctx, 2, nil) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(mockR)) - }) - ocs.awaitAsyncProcessing() - - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.queueSender.(*queueSender).queue.Size()) -} - func TestQueuedRetry_MaxElapsedTime(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 @@ -278,178 +218,6 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } -func TestQueuedRetry_DropOnFull(t *testing.T) { - qCfg := NewDefaultQueueSettings() - qCfg.QueueSize = 0 - rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - ocs.run(func() { - require.Error(t, be.send(newMockRequest(context.Background(), 2, errors.New("transient error")))) - }) -} - -func TestQueuedRetryHappyPath(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueSettings() - rCfg := NewDefaultRetrySettings() - set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - wantRequests := 10 - reqs := make([]*mockRequest, 0, 10) - for i := 0; i < wantRequests; i++ { - ocs.run(func() { - req := newMockRequest(context.Background(), 2, nil) - reqs = append(reqs, req) - require.NoError(t, be.send(req)) - }) - } - - // Wait until all batches received - ocs.awaitAsyncProcessing() - - require.Len(t, reqs, wantRequests) - for _, req := range reqs { - req.checkNumRequests(t, 1) - } - - ocs.checkSendItemsCount(t, 2*wantRequests) - ocs.checkDroppedItemsCount(t, 0) -} - -func TestQueuedRetry_QueueMetricsReported(t *testing.T) { - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 0 // to make every request go 
straight to the queue - rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - checkValueForGlobalManager(t, defaultExporterTags, int64(defaultQueueSize), "exporter/queue_capacity") - for i := 0; i < 7; i++ { - require.NoError(t, be.send(newErrorRequest(context.Background()))) - } - checkValueForGlobalManager(t, defaultExporterTags, int64(7), "exporter/queue_size") - - assert.NoError(t, be.Shutdown(context.Background())) - checkValueForGlobalManager(t, defaultExporterTags, int64(0), "exporter/queue_size") -} - -func TestNoCancellationContext(t *testing.T) { - deadline := time.Now().Add(1 * time.Second) - ctx, cancelFunc := context.WithDeadline(context.Background(), deadline) - cancelFunc() - require.Error(t, ctx.Err()) - d, ok := ctx.Deadline() - require.True(t, ok) - require.Equal(t, deadline, d) - - nctx := noCancellationContext{Context: ctx} - assert.NoError(t, nctx.Err()) - d, ok = nctx.Deadline() - assert.False(t, ok) - assert.True(t, d.IsZero()) -} - -func TestQueueSettings_Validate(t *testing.T) { - qCfg := NewDefaultQueueSettings() - assert.NoError(t, qCfg.Validate()) - - qCfg.QueueSize = 0 - assert.EqualError(t, qCfg.Validate(), "queue size must be positive") - - // Confirm Validate doesn't return error with invalid config when feature is disabled - qCfg.Enabled = false - assert.NoError(t, qCfg.Validate()) -} - -// if requeueing is enabled, we eventually retry even if we failed at first -func TestQueuedRetry_RequeuingEnabled(t *testing.T) { - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 1 - rCfg := NewDefaultRetrySettings() - rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - be.queueSender.(*queueSender).requeuingEnabled = true - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) - mockR := newMockRequest(context.Background(), 1, traceErr) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.send(mockR)) - ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing - }) - ocs.awaitAsyncProcessing() - - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 2) - ocs.checkSendItemsCount(t, 1) - ocs.checkDroppedItemsCount(t, 1) // not actually dropped, but ocs counts each failed send here -} - -// if requeueing is enabled, but the queue is full, we get an error -func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 0 - qCfg.QueueSize = 0 - rCfg := NewDefaultRetrySettings() - rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - be.queueSender.(*queueSender).requeuingEnabled = true - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) - mockR := newMockRequest(context.Background(), 1, traceErr) - - require.Error(t, be.retrySender.send(mockR), "sending_queue is full") - mockR.checkNumRequests(t, 1) -} - -func TestQueueRetryWithDisabledQueue(t *testing.T) { - qs := NewDefaultQueueSettings() - qs.Enabled = false - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs)) - require.Nil(t, be.queueSender.(*queueSender).queue) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.obsrepSender.(*observabilityConsumerSender) - mockR := newMockRequest(context.Background(), 2, errors.New("some error")) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. 
- require.Error(t, be.send(mockR)) - }) - ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) - require.NoError(t, be.Shutdown(context.Background())) -} - func TestQueueRetryWithNoQueue(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // fail fast @@ -469,107 +237,6 @@ func TestQueueRetryWithNoQueue(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) } -func TestQueuedRetryPersistenceEnabled(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueSettings() - storageID := component.NewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID // enable persistence - rCfg := NewDefaultRetrySettings() - set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - var extensions = map[component.ID]component.Component{ - storageID: internal.NewMockStorageExtension(nil), - } - host := &mockHost{ext: extensions} - - // we start correctly with a file storage extension - require.NoError(t, be.Start(context.Background(), host)) - require.NoError(t, be.Shutdown(context.Background())) -} - -func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { - storageError := errors.New("could not get storage client") - tt, err := obsreporttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueSettings() - storageID := component.NewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID // enable persistence - rCfg := NewDefaultRetrySettings() - set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, "", false, mockRequestMarshaler, mockRequestUnmarshaler(&mockRequest{}), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - var extensions = map[component.ID]component.Component{ - storageID: internal.NewMockStorageExtension(storageError), - } - host := &mockHost{ext: extensions} - - // we fail to start if we get an error creating the storage client - require.Error(t, be.Start(context.Background(), host), "could not get storage client") -} - -func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { - - produceCounter := &atomic.Uint32{} - - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 1 - rCfg := NewDefaultRetrySettings() - rCfg.InitialInterval = time.Millisecond - rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered - - req := newMockRequest(context.Background(), 3, errors.New("some error")) - - be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - require.NoError(t, be.Start(context.Background(), &mockHost{})) - - // wraps original queue so we can count operations - be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{ - ProducerConsumerQueue: be.queueSender.(*queueSender).queue, - produceCounter: produceCounter, - } - be.queueSender.(*queueSender).requeuingEnabled = true - - // replace nextSender inside retrySender to always return error so it doesn't exit send loop - be.retrySender.setNextSender(&errorRequestSender{ - errToReturn: errors.New("some error"), - }) 
- - // Invoke queuedRetrySender so the producer will put the item for consumer to poll - require.NoError(t, be.send(req)) - - // first wait for the item to be produced to the queue initially - assert.Eventually(t, func() bool { - return produceCounter.Load() == uint32(1) - }, time.Second, 1*time.Millisecond) - - // shuts down and ensure the item is produced in the queue again - require.NoError(t, be.Shutdown(context.Background())) - assert.Eventually(t, func() bool { - return produceCounter.Load() == uint32(2) - }, time.Second, 1*time.Millisecond) -} - -func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { - bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, - WithRetry(NewDefaultRetrySettings())) - assert.Nil(t, err) - assert.True(t, bs.requestExporter) - assert.Panics(t, func() { - _, _ = newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, - WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) - }) -} - type mockErrorRequest struct { baseRequest } @@ -728,15 +395,6 @@ func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []met return true } -type mockHost struct { - component.Host - ext map[component.ID]component.Component -} - -func (nh *mockHost) GetExtensions() map[component.ID]component.Component { - return nh.ext -} - type producerConsumerQueueWithCounter struct { internal.ProducerConsumerQueue produceCounter *atomic.Uint32 diff --git a/exporter/exporterhelper/timeout_sender.go b/exporter/exporterhelper/timeout_sender.go new file mode 100644 index 00000000000..11b85cf08be --- /dev/null +++ b/exporter/exporterhelper/timeout_sender.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +) + +// TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend. +type TimeoutSettings struct { + // Timeout is the timeout for every attempt to send data to the backend. + Timeout time.Duration `mapstructure:"timeout"` +} + +// NewDefaultTimeoutSettings returns the default settings for TimeoutSettings. +func NewDefaultTimeoutSettings() TimeoutSettings { + return TimeoutSettings{ + Timeout: 5 * time.Second, + } +} + +// timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender. +type timeoutSender struct { + baseRequestSender + cfg TimeoutSettings +} + +func (ts *timeoutSender) send(req internal.Request) error { + // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be + // updated because this deadline most likely is before the next one. + ctx := req.Context() + if ts.cfg.Timeout > 0 { + var cancelFunc func() + ctx, cancelFunc = context.WithTimeout(req.Context(), ts.cfg.Timeout) + defer cancelFunc() + } + return req.Export(ctx) +}
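For context on how the settings types split out by this patch are consumed, the sketch below builds the default TimeoutSettings, RetrySettings, and QueueSettings and turns them into exporterhelper options. WithRetry, WithQueue, and the NewDefault* constructors appear in the patch above; WithTimeout is assumed to be the corresponding option for TimeoutSettings, and the main function plus the override values are illustrative only (a real exporter would pass the options to an exporterhelper constructor instead).

package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func main() {
	// Package defaults: 5s per-attempt timeout, exponential backoff retry,
	// and a 1000-batch queue drained by 10 consumers.
	tCfg := exporterhelper.NewDefaultTimeoutSettings()
	rCfg := exporterhelper.NewDefaultRetrySettings()
	qCfg := exporterhelper.NewDefaultQueueSettings()

	// Override a few fields; the values here are arbitrary examples.
	tCfg.Timeout = 2 * time.Second
	rCfg.MaxElapsedTime = 2 * time.Minute
	qCfg.QueueSize = 500
	qCfg.NumConsumers = 4

	// Each option enables the sender implemented in the corresponding file
	// introduced by this patch: timeout_sender.go, retry_sender.go, queue_sender.go.
	opts := []exporterhelper.Option{
		exporterhelper.WithTimeout(tCfg),
		exporterhelper.WithRetry(rCfg),
		exporterhelper.WithQueue(qCfg),
	}
	fmt.Println(len(opts), "options configured")
}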